Raising events without blocking and receiving them in the correct order

This requires some explanation. There is a worker thread that should raise an event:

Task.Run(() =>
{
    for(int i = 0; i < 123456789; i++)
    {
        ... // some job
        OnSomeEvent(i);
    }
});

Raising the event synchronously blocks the task until all event handlers finish:

void OnSomeEvent(int i) => SomeEvent?.Invoke(this, new SomeEventArgs(i));

Raising the event asynchronously no longer blocks the work (yay!)

void OnSomeEvent(int i) => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i)));

but now there is another problem: events are not received in the correct order:

OnSomeEvent(1);
OnSomeEvent(2);
OnSomeEvent(3);
...

// event handler
SomeEvent += (s, e) => Console.WriteLine(e.I);

// possible output
1
3
2

Question: how can I implement asynchronous events that are received in the correct order?

Dispatcher.InvokeAsync and chaining tasks with ContinueWith both come to mind.

2 answers

You can use a TaskQueue like the following, which starts each task only after the previous one has finished:

public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private object key = new object();

    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
}

Usage:

private TaskQueue taskQueue = new TaskQueue();
private void OnSomeEvent(int i) => 
    taskQueue.Enqueue(() => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i))));
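
As a quick check of the ordering claim, here is a minimal sketch that pushes a series of numbers through the same kind of queue and records the order in which they run. The TaskQueue is repeated (non-generic overload only) so the example is self-contained; TaskQueueDemo and RunInOrder are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Same idea as the TaskQueue above, reduced to the non-generic overload.
public class TaskQueue
{
    private Task previous = Task.CompletedTask;
    private readonly object key = new object();

    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            // The new task starts only after the previous one has completed.
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
}

public static class TaskQueueDemo
{
    // Hypothetical helper: enqueues `count` items and returns the order in which they ran.
    public static List<int> RunInOrder(int count)
    {
        var queue = new TaskQueue();
        var seen = new List<int>();
        Task last = Task.CompletedTask;

        for (int i = 0; i < count; i++)
        {
            int value = i; // copy the loop variable so each lambda sees its own value
            last = queue.Enqueue(() => Task.Run(() => seen.Add(value)));
        }

        last.Wait(); // all earlier items have run once the last one completes
        return seen;
    }
}
```

Because each item is chained onto the previous task, the items never run concurrently, so the plain List<int> is only ever touched by one item at a time. The caller of Enqueue is not blocked; only the final Wait in this demo blocks.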

Another option is to use an ActionBlock from TPL Dataflow.

Create the queue like this:

queue = new ActionBlock<SomeEventArgs>(item => SomeEvent?.Invoke(item));

Then post an item to it:

queue.Post(new SomeEventArgs(value));

When you have finished posting items, signal that the queue is complete:

queue.Complete();

If you want to wait until every item in the queue has been processed, you can block on the completion task:

queue.Completion.Wait();

Note that queue.Completion is a Task, so you can also await it.
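
A minimal sketch of the awaiting variant, assuming an async caller. ActionBlockDemo and ProcessAsync are hypothetical names used only for this illustration, and a plain int stands in for SomeEventArgs to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // Hypothetical helper: posts `count` items and returns them in the order they were handled.
    public static async Task<List<int>> ProcessAsync(int count)
    {
        var handled = new List<int>();

        // With default options the block handles one item at a time, in posting order.
        var queue = new ActionBlock<int>(item => handled.Add(item));

        for (int i = 0; i < count; i++)
        {
            queue.Post(i); // returns immediately; the item is handled later by the block
        }

        queue.Complete();       // no more items will be posted
        await queue.Completion; // asynchronously wait until everything has been handled

        return handled;
    }
}
```

Awaiting Completion instead of calling Wait() keeps the calling thread free, which matters if the caller is a UI thread or a request handler.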

Here is a complete console application that demonstrates this:

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    public class SomeEventArgs : EventArgs
    {
        public SomeEventArgs(int value)
        {
            Value = value;
        }

        public int Value { get; }
    }

    internal class Program
    {
        public delegate void SomeEventHandler(SomeEventArgs e);

        public event SomeEventHandler SomeEvent;

        ActionBlock<SomeEventArgs> queue;

        private void run()
        {
            queue = new ActionBlock<SomeEventArgs>(item => SomeEvent?.Invoke(item));

            // Subscribe to my own event (this just for demonstration purposes!)

            this.SomeEvent += Program_SomeEvent;

            // Raise 100 events.

            for (int i = 0; i < 100; ++i)
            {
                OnSomeEvent(i);
                Console.WriteLine("Raised event " + i);
            }

            Console.WriteLine("Signalling that queue is complete.");
            queue.Complete();

            Console.WriteLine("Waiting for queue to be processed.");
            queue.Completion.Wait();

            Console.WriteLine("Done.");
        }

        private void Program_SomeEvent(SomeEventArgs e)
        {
            Console.WriteLine("Handled " + e.Value);
            Thread.Sleep(1); // Simulate load.
        }

        private void OnSomeEvent(int value)
        {
            queue.Post(new SomeEventArgs(value));
        }

        private static void Main()
        {
            new Program().run();
        }
    }
}

Source: https://habr.com/ru/post/1668542/

