Inconsistent exceptions in an observable LINQ query

In my quest to write an IObserver stock-market trader, I came across three errors, most of which are thrown from inside the Reactive Extensions library.

I have the following CompanyInfo class:

    public class CompanyInfo
    {
        public string Name { get; set; }
        public double Value { get; set; }
    }

The IObservable<CompanyInfo> is called StockMarket:

 public class StockMarket : IObservable<CompanyInfo> 

My observer is as follows:

    public class StockTrader : IObserver<CompanyInfo>
    {
        public void OnCompleted()
        {
            Console.WriteLine("Market Closed");
        }

        public void OnError(Exception error)
        {
            Console.WriteLine(error);
        }

        public void OnNext(CompanyInfo value)
        {
            WriteStock(value);
        }

        private void WriteStock(CompanyInfo value) { ... }
    }

I run the following code:

    StockMarket market = GetStockMarket();
    StockTrader trader = new StockTrader();

    IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
        .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
        .SelectMany(x => x                          //4, 8, 2, 3
            .Buffer(2, 1)                           //(4, 8), (8, 2), (2, 3), (3)
            .SkipLast(1)                            //(4, 8), (8, 2), (2, 3)
            .Select(y => new CompanyInfo            //(+100%), (-75%), (+50%)
            {
                Name = x.Key,
                Value = (y[1].Value - y[0].Value) / y[0].Value
            })                                      //[F, +100%]; [S, -20%]
        );

    using (IDisposable subscription = differential.Subscribe(trader))
    {
        Observable.Wait(market);
    }

One of three errors occurs:

  • The following ArgumentException is thrown from inside Reactive Extensions:

    System.ArgumentException: An item with the same key has already been added.
       at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource)
       at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
       at System.Reactive.Linq.Observable.GroupBy`3._.OnNext(TSource value)

  • The following ArgumentOutOfRangeException:

    Parameter name: index
       at System.ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument argument, ExceptionResource resource)
       at System.Collections.Generic.List`1.get_Item(Int32 index)
       at StockMarketTests.<>c__DisplayClass0_0.b__2(IList`1 y)
       at System.Reactive.Linq.Observable.Select`2._.OnNext(TSource value)

  • The console text is sporadically garbled (the colors should be consistent):

    [Image: console output]

What can cause these bizarre symptoms?

2 answers

One of the greatest things about Reactive Extensions is the ability to subscribe to an IObservable that is produced "somewhere" and apply object-oriented concepts to it, without knowing where that "somewhere" is.

This approach greatly simplifies event-oriented programming and producer-consumer problems.

Great power

The ability to subscribe to an IObservable without knowing the source of the observed data forces the subscriber to treat the arrival of notifications as unpredictable. In other words, when observing an IObservable you should assume that notifications can be delivered concurrently.

Under the behavioral contract of Reactive Extensions, an IObservable must produce one item at a time. That is usually what happens, but external implementations sometimes do not follow this contract.

Let's look at each of the three issues:

GroupBy is not thread-safe


GroupBy works by returning an IObservable<IGroupedObservable<TKey, TElement>>. Its OnNext method calls OnNext on the outer observer with the IGroupedObservable that corresponds to the current notification. It does this by keeping one IGroupedObservable (more precisely, one Subject<T>) per key in a Dictionary, which, unsurprisingly, is not a ConcurrentDictionary. This means that two notifications arriving close together can cause a double insertion.
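The double insertion can be replayed deterministically on a single thread. The sketch below is not Rx's source code: it assumes the per-key lookup has a "check, then add" shape (which the Dictionary.Insert frame in the stack trace suggests), and DoubleInsertSketch and ReplayInterleaving are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class DoubleInsertSketch
{
    // Replays, on one thread, the interleaving that two racing OnNext calls
    // could produce against a plain Dictionary.
    // Returns true if the second Add throws the duplicate-key ArgumentException.
    public static bool ReplayInterleaving()
    {
        var groups = new Dictionary<string, List<double>>();

        bool seenByA = groups.ContainsKey("F"); // call A checks: key is missing
        bool seenByB = groups.ContainsKey("F"); // call B checks before A has added

        try
        {
            if (!seenByA) groups.Add("F", new List<double>()); // A inserts the key
            if (!seenByB) groups.Add("F", new List<double>()); // B inserts the same key
            return false;
        }
        catch (ArgumentException)
        {
            // Same exception type as in the GroupBy stack trace.
            return true;
        }
    }
}
```

With real threads the interleaving is a matter of timing, which would explain why the exception only shows up sporadically.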

Select is not safe either


The thread safety of Select is determined by its delegate. In the case above, the delegate passed to Select relies on Buffer(2, 1) to supply a list of size 2. Buffer holds a Queue, which is not a concurrent collection, so when it is accessed from multiple threads the Buffer's Queue may give us unexpected results.

Other exceptions may be thrown for the same reason: a NullReferenceException if y is null, or an InvalidOperationException if the Queue is modified while it is being enumerated.

Even basic observation is unsafe


Last but not least, even if you only perform basic observation, the StockTrader OnNext method modifies the console in a non-atomic operation, which is what garbles the text.

So what can you do?


Threads

There is a Synchronize method that lets you ensure you are subscribing to a serialized IObservable<T>, meaning that no more than one call to OnNext can occur at a time.

Since even the GroupBy extension is not thread-safe, you must call Synchronize at the beginning of the chain:

    IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
        .Synchronize()
        .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
        .SelectMany(x => x                          //4, 8, 2, 3
            .Buffer(2, 1)                           //(4, 8), (8, 2), (2, 3), (3)
            .SkipLast(1)                            //(4, 8), (8, 2), (2, 3)
            .Select(y => new CompanyInfo            //(+100%), (-75%), (+50%)
            {
                Name = x.Key,
                Value = (y[1].Value - y[0].Value) / y[0].Value
            })
        );                                          //[F, +100%]; [S, -20%]

Note that Synchronize adds another observable proxy to your query, which makes the query slightly slower, so avoid it when it is not needed.
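One rough way to see what Synchronize buys you is to count how many OnNext calls overlap. The sketch below assumes the System.Reactive package is referenced. It deliberately breaks the Rx contract by pushing into a Subject<int> from Parallel.For. SynchronizeSketch and MaxOverlap are hypothetical names, not part of Rx.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SynchronizeSketch
{
    // Pushes values into a Subject from several threads at once (a deliberate
    // contract violation) and returns the highest number of OnNext calls that
    // were running inside the observer at the same moment.
    public static int MaxOverlap(bool synchronize)
    {
        var source = new Subject<int>();
        IObservable<int> stream = synchronize ? source.Synchronize() : source;

        int inside = 0;
        int maxInside = 0;

        using (stream.Subscribe(_ =>
        {
            int now = Interlocked.Increment(ref inside);

            // Record the high-water mark without taking a lock.
            int seen;
            while (now > (seen = Volatile.Read(ref maxInside)))
            {
                Interlocked.CompareExchange(ref maxInside, now, seen);
            }

            Thread.SpinWait(1000); // widen the window for overlaps
            Interlocked.Decrement(ref inside);
        }))
        {
            Parallel.For(0, 1000, i => source.OnNext(i));
        }

        return maxInside;
    }
}
```

MaxOverlap(true) returns 1 because Synchronize serializes the calls. MaxOverlap(false) depends on timing and core count, so it may well be greater than 1, although that is not guaranteed on any given run.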


The problem with your code lies neither in the query nor in Rx as such. It is most likely in your actual StockMarket or StockTrader implementation.

More specifically, the problem is probably caused by the fact that you are creating two subscriptions to your market observable.

When you write this:

    using (IDisposable subscription = differential.Subscribe(trader))
    {
        Observable.Wait(market);
    }

... you get two subscriptions to market: one from differential.Subscribe(trader), and the other from Observable.Wait(market);.

I suspect that two concurrent subscriptions are causing problems, but without seeing the StockMarket implementation, we cannot say why this throws.

This is the danger of writing your own observable and observer implementations. You should avoid it. It would be better to expose a property IObservable<CompanyInfo> CompanyValues { get; } that is built using standard Rx operators.
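A minimal sketch of that shape, assuming the System.Reactive package and reusing the sample values from the test code in this answer. Putting the property on StockMarket is an assumption made for illustration, and a real market would wrap its actual feed instead of an array.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public class CompanyInfo
{
    public string Name { get; set; }
    public double Value { get; set; }
}

// StockMarket no longer implements IObservable<CompanyInfo> itself.
public class StockMarket
{
    // Sample data only (assumption): stands in for the real price feed.
    private static readonly CompanyInfo[] SampleValues =
    {
        new CompanyInfo { Name = "F", Value = 1 },
        new CompanyInfo { Name = "S", Value = 5 },
        new CompanyInfo { Name = "S", Value = 4 },
        new CompanyInfo { Name = "F", Value = 2 },
    };

    // Built only from standard operators, so Rx enforces the
    // one-notification-at-a-time contract for each subscription.
    public IObservable<CompanyInfo> CompanyValues { get; } =
        SampleValues.ToObservable().ObserveOn(Scheduler.Default);
}
```

The query would then start from market.CompanyValues rather than from market itself.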

And you should always avoid blocking operations such as .Wait(...).

As a quick test, I would replace your current Observable.Wait(market); with Thread.Sleep(?), using a wait period long enough to find out whether your code works. Of course, you need to make sure that you are producing values on a background scheduler (e.g. Scheduler.Default).

I ran this code to test your query:
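Beyond the Thread.Sleep quick test, one possible way to wait for completion with a single subscription and no blocking call is to await the query. This is a sketch under assumptions, not something taken from the original code: it requires the System.Reactive package, and SingleSubscriptionSketch and RunToCompletionAsync are hypothetical names.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SingleSubscriptionSketch
{
    // Subscribes to the query exactly once: Do forwards every notification to
    // the observer, and awaiting LastOrDefaultAsync completes when the query
    // completes (it also tolerates an empty sequence).
    public static async Task RunToCompletionAsync<T>(
        IObservable<T> query, IObserver<T> observer)
    {
        await query.Do(observer).LastOrDefaultAsync();
    }
}
```

Called as await SingleSubscriptionSketch.RunToCompletionAsync(differential, trader); from an async method, this replaces both the Subscribe call and the Wait call. If the query fails, the observer's OnError is called and the awaited task then throws the same exception.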

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    public class CompanyInfo
    {
        public string Name { get; set; }
        public double Value { get; set; }
    }

    public class StockTrader : IObserver<CompanyInfo>
    {
        public void OnCompleted()
        {
            Console.WriteLine("Market Closed");
        }

        public void OnError(Exception error)
        {
            Console.WriteLine(error);
        }

        public void OnNext(CompanyInfo value)
        {
            WriteStock(value);
        }

        private void WriteStock(CompanyInfo value)
        {
            Console.WriteLine($"{value.Name} = {value.Value}");
        }
    }

    public class StockMarket : IObservable<CompanyInfo>
    {
        private CompanyInfo[] _values = new CompanyInfo[]
        {
            new CompanyInfo() { Name = "F", Value = 1 },
            new CompanyInfo() { Name = "S", Value = 5 },
            new CompanyInfo() { Name = "S", Value = 4 },
            new CompanyInfo() { Name = "F", Value = 2 },
        };

        // Each subscriber gets its own replay of the values on a background scheduler.
        public IDisposable Subscribe(IObserver<CompanyInfo> observer)
        {
            return _values.ToObservable().ObserveOn(Scheduler.Default).Subscribe(observer);
        }
    }

... with this:

    StockMarket market = new StockMarket();
    StockTrader trader = new StockTrader();

    IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
        .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
        .SelectMany(x => x                          //4, 8, 2, 3
            .Buffer(2, 1)                           //(4, 8), (8, 2), (2, 3), (3)
            .SkipLast(1)                            //(4, 8), (8, 2), (2, 3)
            .Select(y => new CompanyInfo            //(+100%), (-75%), (+50%)
            {
                Name = x.Key,
                Value = (y[1].Value - y[0].Value) / y[0].Value
            })                                      //[F, +100%]; [S, -20%]
        );

    IDisposable subscription = differential.Subscribe(trader);

    Thread.Sleep(10000);

I never got it to crash or throw any exception.


Source: https://habr.com/ru/post/1242048/
