Concurrent Event Subscriber in .NET 4.5

I am trying to create a parallel event subscriber. This is my first attempt:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using EventStore.ClientAPI;

    namespace Sandbox
    {
        public class SomeEventSubscriber
        {
            private Position? _latestPosition;
            private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
            private IEventStoreConnection _connection;

            public Dictionary<Type, Action<object>> EventHandlerMapping
            {
                get { return _eventHandlerMapping; }
            }

            public SomeEventSubscriber()
            {
                _eventHandlerMapping = CreateEventHandlerMapping();
                _latestPosition = Position.Start;
            }

            public void Start()
            {
                ConnectToEventstore();
            }

            private void ConnectToEventstore()
            {
                _connection = EventStoreConnectionWrapper.Connect();
                _connection.Connected += (sender, args) =>
                    _connection.SubscribeToAllFrom(_latestPosition, false, EventOccured,
                        LiveProcessingStarted, HandleSubscriptionDropped);
            }

            private Dictionary<Type, Action<object>> CreateEventHandlerMapping()
            {
                return new Dictionary<Type, Action<object>>
                {
                    {typeof (FakeEvent1), o => Handle(o as FakeEvent1)},
                    {typeof (FakeEvent2), o => Handle(o as FakeEvent2)},
                };
            }

            private async Task Handle(FakeEvent1 eventToHandle)
            {
                SomethingLongRunning(eventToHandle);
            }

            private async Task Handle(FakeEvent2 eventToHandle)
            {
                SomethingLongRunning(eventToHandle);
            }

            private async Task SomethingLongRunning(BaseFakeEvent eventToHandle)
            {
                Console.WriteLine("Start Handling: " + eventToHandle.GetType());
                var task = Task.Delay(10000);
                await task;
                Console.WriteLine("Finished Handling: " + eventToHandle.GetType());
            }

            private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
                ResolvedEvent resolvedEvent)
            {
                // Skip system events and system streams.
                if (resolvedEvent.OriginalEvent.EventType.StartsWith("$") ||
                    resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$"))
                    return;

                var @event = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent);
                if (@event != null)
                {
                    var eventType = @event.GetType();
                    if (_eventHandlerMapping.ContainsKey(eventType))
                    {
                        var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](@event));
                        Console.WriteLine("The task is running asynchronously...");
                    }
                }

                if (resolvedEvent.OriginalPosition != null)
                    _latestPosition = resolvedEvent.OriginalPosition.Value;
            }

            private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription,
                SubscriptionDropReason dropReason, Exception ex)
            {
                if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
                {
                    //TODO: Wait and reconnect probably with back off
                }

                if (dropReason == SubscriptionDropReason.UserInitiated)
                    return;

                if (SubscriptionDropMayBeRecoverable(dropReason))
                {
                    Start();
                }
            }

            private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
            {
                return dropReason == SubscriptionDropReason.Unknown ||
                       dropReason == SubscriptionDropReason.SubscribingError ||
                       dropReason == SubscriptionDropReason.ServerError ||
                       dropReason == SubscriptionDropReason.ConnectionClosed;
            }

            private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
            {
            }
        }
    }

In your expert opinion, is this a valid approach? Could you suggest any improvements?

PS:

Maybe:

 Task.Run(() => _eventHandlerMapping[eventType](@event)); 

would be better?

2 answers

You have a single EventOccured delegate through which you are notified of every event that occurs in the EventStore.
First, consider running the code inside EventOccured on a different dispatcher than the one on which the events are raised.
Secondly, could you change this into an abstract class with an implementation for FakeEventBase, then extend it and create a separate instance for each type of FakeEvent? That would be a much cleaner solution.
Third, consider creating a custom TaskScheduler to queue and run these Handle tasks. http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx
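
To make the third suggestion more concrete, here is a minimal sketch of queueing handler invocations on a scheduler with a concurrency limit. Rather than deriving a custom TaskScheduler, it uses the built-in ConcurrentExclusiveSchedulerPair (available since .NET 4.5). The class name BoundedEventDispatcher and its members are hypothetical and not part of the EventStore client API. It also assumes synchronous handlers: an async handler would give up its slot at its first await, so the limit would only cover the synchronous part.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the EventStore client API.
public class BoundedEventDispatcher
{
    private readonly Dictionary<Type, Action<object>> _handlers;
    private readonly TaskFactory _factory;

    public BoundedEventDispatcher(Dictionary<Type, Action<object>> handlers, int maxConcurrency)
    {
        _handlers = handlers;

        // The pair's ConcurrentScheduler runs at most maxConcurrency tasks at a time
        // on top of the default (thread pool) scheduler; further tasks wait in its queue.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        _factory = new TaskFactory(pair.ConcurrentScheduler);
    }

    // Queues the handler registered for the event's runtime type.
    // Returns an already completed task when no handler is registered.
    public Task Dispatch(object @event)
    {
        Action<object> handler;
        if (!_handlers.TryGetValue(@event.GetType(), out handler))
            return Task.FromResult(0);

        return _factory.StartNew(() => handler(@event));
    }
}
```

In the subscriber from the question, EventOccured could call Dispatch(@event) instead of Task.Factory.StartNew and keep the returned task if it needs to observe failures.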

EDIT:
I would have a broadcaster class, as shown below, that knows when the operation is complete and raises the finished event.

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class EventBroadcaster
    {
        public event EventHandler SomeEventOccured;

        public async void DoLongRunningOperationAndRaiseFinishedEvent()
        {
            // Simulate a long-running operation, then raise the event once it has completed.
            var waitingTask = Task.Delay(TimeSpan.FromSeconds(2));
            await waitingTask.ContinueWith(
                t => RaiseSomeEventOccured(),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnRanToCompletion,
                TaskScheduler.Current);
        }

        private void RaiseSomeEventOccured()
        {
            EventHandler handler = SomeEventOccured;
            if (handler != null)
                handler(this, EventArgs.Empty);
        }
    }

and then the EventListner:

    public class EventListner
    {
        private readonly string _id;

        public EventListner(string id)
        {
            _id = id;
        }

        public void ListenTo(EventBroadcaster broadcaster)
        {
            broadcaster.SomeEventOccured += OnSomeEventOccured;
        }

        private async void OnSomeEventOccured(object sender, EventArgs eventArgs)
        {
            var currentTime = DateTime.Now;
            Console.WriteLine("EventListner {0} received at {1}", _id,
                currentTime.ToString("dd-MM-yyyy HH:mm:ss.fffffff"));

            // Not required; just to show this does not affect other instances.
            // await Task.Delay(TimeSpan.FromSeconds(5));
        }
    }

and then Program.cs for testing:

    using System;
    using System.Collections.Generic;
    using System.Globalization;

    public static class Program
    {
        public static void Main(string[] args)
        {
            var broadcaster = new EventBroadcaster();
            var listners = new List<EventListner>();

            for (int i = 1; i < 10; i++)
            {
                var listner = new EventListner(i.ToString(CultureInfo.InvariantCulture));
                listner.ListenTo(broadcaster);
                listners.Add(listner);
            }

            broadcaster.DoLongRunningOperationAndRaiseFinishedEvent();
            Console.WriteLine("Waiting for operation to complete");
            Console.ReadLine();
        }
    }

In this example, the handler delegates run one after another, in the order in which they were subscribed.

Now change the code in the broadcaster to something like the code below. Note: I changed the event's delegate type from EventHandler to Action for coding convenience.

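    // Assumes the event is now declared as: public event Action SomeEventOccured;
    private void RaiseSomeEventOccured()
    {
        Action handler = SomeEventOccured;
        if (handler != null)
        {
            var parallelOption = new ParallelOptions()
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };

            // Invoke every subscriber in parallel. Calling handler() here as well
            // would run each subscriber a second time, sequentially.
            Parallel.Invoke(parallelOption,
                Array.ConvertAll(handler.GetInvocationList(), ConvertToAction));
        }
    }

    private Action ConvertToAction(Delegate del)
    {
        return (Action)del;
    }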

Now you will see that the handlers are invoked in no particular order.
In my test I got better performance with the first (sequential) version.
Note: as always with the TPL and parallel programming, make sure there is an actual advantage before going for it.
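
One rough way to check whether parallel invocation pays off for a given set of handlers is to time both variants with a Stopwatch. The following is only a sketch: the InvocationTiming class and its method names are hypothetical, and a single measurement like this is noisy, so treat the numbers as an indication rather than a benchmark.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class InvocationTiming
{
    // Invokes all subscribers one after another, as a plain delegate call does.
    public static TimeSpan Sequential(Action handlers)
    {
        var watch = Stopwatch.StartNew();
        handlers();
        watch.Stop();
        return watch.Elapsed;
    }

    // Invokes each subscriber from the invocation list via Parallel.Invoke.
    public static TimeSpan Parallelized(Action handlers)
    {
        var actions = Array.ConvertAll<Delegate, Action>(
            handlers.GetInvocationList(), d => (Action)d);

        var watch = Stopwatch.StartNew();
        Parallel.Invoke(actions);
        watch.Stop();
        return watch.Elapsed;
    }
}
```

With very short handlers, the scheduling overhead of Parallel.Invoke can outweigh any gain, which would be consistent with the observation above that the sequential version was faster.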


I really don't see the point of creating a parallel event subscriber (if I understand your intent correctly: to run event handlers in parallel rather than one after another, as with ordinary events).

The intention to run in parallel is expressed much more clearly if the event handler itself shows it.

Something like this (very primitive):

    void SomeEventHandler(object sender, EventArgs e)
    {
        Task.Run(() =>
        {
            ... // some code to run in parallel
        });
    }

You might want to create your own dispatcher (to be honest, I don't know how to make use of all the cores, but I don't think it's difficult; I've just never needed to do it), but please stick with ordinary events.
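
A slightly fuller sketch of this approach, assuming a hypothetical Publisher with an ordinary event and a hypothetical OffloadingSubscriber: the handler returns immediately and pushes its work to the thread pool with Task.Run, keeping the resulting tasks so that completion and failures can still be observed. Thread.Sleep stands in for the real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical publisher with an ordinary .NET event.
public class Publisher
{
    public event EventHandler SomethingHappened;

    public void Raise()
    {
        EventHandler handler = SomethingHappened;
        if (handler != null)
            handler(this, EventArgs.Empty);
    }
}

// Hypothetical subscriber whose handler offloads its own work.
public class OffloadingSubscriber
{
    private readonly ConcurrentBag<Task> _pending = new ConcurrentBag<Task>();
    private int _handled;

    public int Handled
    {
        get { return Volatile.Read(ref _handled); }
    }

    public void Subscribe(Publisher publisher)
    {
        publisher.SomethingHappened += OnSomethingHappened;
    }

    private void OnSomethingHappened(object sender, EventArgs e)
    {
        // Returns to the publisher right away; the work runs on the thread pool.
        _pending.Add(Task.Run(() =>
        {
            Thread.Sleep(100); // placeholder for the real work
            Interlocked.Increment(ref _handled);
        }));
    }

    // Completes when all work started so far has finished; faults if any of it threw.
    public Task WhenIdle()
    {
        return Task.WhenAll(_pending);
    }
}
```

The publisher stays a plain event source, and each subscriber decides for itself whether its work is worth running in parallel.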


Source: https://habr.com/ru/post/979904/

