Sync issue in multiple event subscriptions and getting the latest snapshot

What is the best way to fix the sync problem below by enhancing the OrderManager? OrderForm needs to get the latest snapshot of orders and trades and subscribe to these events, while OrderManager generates orders and trades on another thread.

    public class OrderManager
    {
        public event EventHandler<OrderEventArgs> OrderAdded;
        public event EventHandler<OrderEventArgs> OrderUpdated;
        public event EventHandler<OrderEventArgs> OrderDeleted;
        public event EventHandler<TradeEventArgs> TradeAdded;

        public List<Order> Orders { get; private set; }
        public List<Trade> Trades { get; private set; }
        ...
    }

    public class OrderForm
    {
        public OrderForm(OrderManager manager)
        {
            manager.OrderAdded += manager_OrderAdded;
            manager.OrderUpdated += manager_OrderUpdated;
            manager.OrderDeleted += manager_OrderDeleted;
            manager.TradeAdded += manager_TradeAdded;

            Populate(manager.Orders);
            Populate(manager.Trades);
        }
        ...
    }

Should I drop the event pattern and implement it as follows? Is there a better way?

    public class OrderListener
    {
        public Action<Order> OrderAdded { get; set; }
        public Action<Order> OrderUpdated { get; set; }
        public Action<Order> OrderDeleted { get; set; }
        public Action<Trade> TradeAdded { get; set; }
    }

    public class OrderManager
    {
        ...
        List<Order> orders;
        List<Trade> trades;
        List<OrderListener> listeners;

        public IDisposable Subscribe(OrderListener listener)
        {
            lock (orderTradeLock)
            {
                listeners.Add(listener);
                // Replay the current snapshot to the new listener.
                orders.ForEach(listener.OrderAdded);
                trades.ForEach(listener.TradeAdded);

                // Allow the caller to dispose the returned object to unsubscribe.
                return Disposable.Create(() =>
                {
                    lock (orderTradeLock) { listeners.Remove(listener); }
                });
            }
        }

        void OnOrderAdded(Order order)
        {
            lock (orderTradeLock)
            {
                orders.Add(order);
                listeners.ForEach(x => x.OrderAdded(order));
            }
        }

        void OnTradeAdded(Trade trade)
        {
            lock (orderTradeLock)
            {
                trades.Add(trade);
                listeners.ForEach(x => x.TradeAdded(trade));
            }
        }
        ...
    }

    public class OrderForm
    {
        IDisposable subscriptionToken;

        public OrderForm(OrderManager manager)
        {
            subscriptionToken = manager.Subscribe(new OrderListener
            {
                OrderAdded = manager_OrderAdded,
                OrderUpdated = manager_OrderUpdated,
                OrderDeleted = manager_OrderDeleted,
                TradeAdded = manager_TradeAdded
            });
        }
        ...
    }
1 answer

There are several variations of the scenario you describe. I will try to go through them:

  • The best way to solve your problem:

Avoid using events in a concurrent scenario. If you are dealing with multithreading, there is no ideal solution based on .NET events.

This is especially true if you have a high level of concurrency among consumers / subscribers (many threads adding / removing delegates). You can find more information in this article: http://www.codeproject.com/Articles/37474/Threadsafe-Events

By the way, I wrote an article a few days ago that covers everything you need to know about events in .NET, along with some solutions to their problems / limitations, how to keep things simple, and how to write good code: http://www.codeproject.com/Articles/864690/Simplifying-Events-in-NET

  1. Introducing the synchronization mechanism

You just need to use the same synchronization mechanism, either one per event or one shared by all of them (ManualResetEventSlim, Semaphore, or a lock, for example), between the producers (which raise the event) and the consumers (which add / remove delegates). That way you can guarantee that you will not lose information.

The biggest problem with this solution is raising the event. If you invoke the delegates inside the synchronization mechanism, you risk deadlocks or poor performance, because everything then depends on what the subscribers' / consumers' delegates do, which is generally poor design.
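As a rough illustration of the point above, here is a minimal sketch (not the OrderManager from the question) that takes the snapshot and registers the handler under one lock, but invokes the handlers only after the lock is released. `SnapshotPublisher<T>`, its members, and the `Demo` class are hypothetical names introduced for this example, and it assumes a plain `lock` is an acceptable synchronization mechanism.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for the manager in the question.
public sealed class SnapshotPublisher<T>
{
    private readonly object _gate = new object();
    private readonly List<T> _items = new List<T>();
    private readonly List<Action<T>> _handlers = new List<Action<T>>();

    // Registers the handler and copies the current items in one critical
    // section, so every item ends up either in the returned snapshot or in
    // a later notification, never both and never neither.
    public IReadOnlyList<T> Subscribe(Action<T> handler)
    {
        lock (_gate)
        {
            _handlers.Add(handler);
            return _items.ToArray();
        }
    }

    public void Unsubscribe(Action<T> handler)
    {
        lock (_gate) { _handlers.Remove(handler); }
    }

    public void Add(T item)
    {
        Action<T>[] targets;
        lock (_gate)
        {
            _items.Add(item);
            targets = _handlers.ToArray(); // copy while holding the lock
        }

        // Invoke outside the lock: a slow or re-entrant handler cannot
        // block other producers or deadlock against _gate.
        foreach (var handler in targets)
        {
            handler(item);
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var publisher = new SnapshotPublisher<string>();
        publisher.Add("order-1");

        var seen = new List<string>();
        var snapshot = publisher.Subscribe(seen.Add);
        publisher.Add("order-2");

        Console.WriteLine(string.Join(",", snapshot)); // prints "order-1"
        Console.WriteLine(string.Join(",", seen));     // prints "order-2"
    }
}
```

The trade-off is that, with handlers running outside the lock, notifications from concurrent producers may arrive out of order, a handler may be called before the subscriber has finished processing the returned snapshot, and a handler may still be invoked once shortly after `Unsubscribe`. Subscribers need to tolerate that, or the calls have to be marshalled onto a single thread.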

  2. Using Reactive Extensions

In this case, with Rx, you can use the same scheduler both to subscribe and to raise the event, which introduces the synchronization. Here it is important to switch to another scheduler (or thread) for the handler's execution, to avoid the same problems I mentioned for the manual synchronization mechanism. For instance:

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static EventHandler _handler;
        // Synchronization for the consumers/subscribers and the producers
        static EventLoopScheduler _eventLoopScheduler = new EventLoopScheduler();
        // Only used to detect when this test ends
        static ManualResetEventSlim _finish = new ManualResetEventSlim();
        static int samples = 500; // number of times the event will be raised
        static int count = 0;     // number of times the subscriber received the event

        static void Main()
        {
            var subscription = Observable
                // Create the event composition
                .FromEventPattern(add => _handler += add, rem => _handler -= rem)
                // Introduces synchronization when someone subscribes to the composition
                .SubscribeOn(_eventLoopScheduler)
                // Alternative: switch threads so _eventLoopScheduler stays free for new consumers/subscribers
                //.ObserveOn(Scheduler.Default).Do(arg => DoSomething())
                // Switch threads so _eventLoopScheduler stays free; also runs the handlers concurrently
                .SelectMany(arg => Observable.FromAsync(a => Task.Run(() => DoSomething())))
                // Subscribe to receive the event
                .Subscribe();

            Parallel.For(0, samples, new ParallelOptions { MaxDegreeOfParallelism = 4 }, a =>
            {
                Console.WriteLine(a.ToString("D4") + " | Trying to raise the event in thread:" + Thread.CurrentThread.ManagedThreadId);

                _eventLoopScheduler.Schedule(() =>
                {
                    Console.WriteLine(a.ToString("D4") + " | Raising event in thread:" + Thread.CurrentThread.ManagedThreadId);
                    _handler(null, EventArgs.Empty);
                });
            });

            _finish.Wait();
            subscription.Dispose();
            _eventLoopScheduler.Dispose();
            Console.WriteLine("Completed");
        }

        static void DoSomething()
        {
            // Use this line ONLY if you are not introducing concurrency
            // (that is, not wrapping the execution in another thread):
            //var current = count++;

            // Needed to keep the count consistent when the handler runs on
            // multiple threads (such as with Task.Run or the ThreadPool)
            var current = Interlocked.Increment(ref count);

            var random = new Random(current);
            Console.WriteLine(current.ToString("D4") + " | Doing Something in thread:" + Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(random.Next(0, 500)); // simulate some processing

            if (current == samples)
            {
                _finish.Set();
            }
        }
    }

In the example above I used only one subscriber, but it works just as well with more than one. I know this is not always easy to follow, because multithreading is a difficult subject, so feel free to add details or ask questions.


Source: https://habr.com/ru/post/1202393/

