How to process messages asynchronously, discarding new messages during processing?

I have a C # application that subscribes to a topic in our messaging system to update values. When a new value comes in, I do some processing and then continue. The problem is that updates can happen faster than the application can handle them. What I want to do is just keep the last value, so I don't want the queue. For example, the source publishes a value of "1" and my application receives it; during processing, the source publishes the sequence (2, 3, 4, 5) before my application is processed; my application then processes the value "5" and the previous values ​​are thrown away.

It’s hard to publish a sample working code because it is based on proprietary message libraries, but I think it’s a common template, I just can’t understand what it called ... It seems that the processing function should work in a separate thread than the exchange callback messaging, but I'm not sure how to organize it, for example, how this thread is notified of a change in value. Any general tips on what I need to do?

+3
source share
5 answers

A very simple way might be something like this:

private IMessage _next;

public void ReceiveMessage(IMessage message)
{
    Interlocked.Exchange(ref _next, message);
}

public void Process()
{
    IMessage next = Interlocked.Exchange(ref _next, null);

    if (next != null)
    {
        //...
    }
}
+2
source

, . , , , .

+1

"", . , . - ( ).

, : , / . , , . , , , .

, , .

0

, . , , , Threadpool, .

:

Mutlithreaded

Threading Threading

0

- . - . , Stack.Pop(), Stack.Clear():

public static class Incoming
{
    private static object locker = new object();
    private static object lastMessage = null;

    public static object GetMessage()
    {
        lock (locker)
        {
            object tempMessage = lastMessage;
            lastMessage = null;
            return tempMessage;
        }
    }
    public static void SetMessage(object messageArg)
    {
        lock (locker)
        {
            lastMessage = messageArg;
        }
    }

    private static Stack<object> messageStack = new Stack<object>();
    public static object GetMessageStack()
    {
        lock (locker)
        {
            object tempMessage = messageStack.Count > 0 ? messageStack.Pop() : null;
            messageStack.Clear();
            return tempMessage;
        }
    }
    public static void SetMessageStack(object messageArg)
    {
        lock (locker)
        {
            messageStack.Push(messageArg);
        }
    }
}

Putting processing functions in a separate thread is a good idea. Either use the callback method from the processing thread to signal that it is ready for another message, or it signals that it is being executed, and then has a main thread that starts a new processor thread when the message is received (via the above SetMessage ...).

0
source

Source: https://habr.com/ru/post/1735065/


All Articles