One of the greatest things about Reactive Extensions is the ability to subscribe to an IObservable that originates "somewhere" and apply object-oriented concepts to it, without knowing where that "somewhere" is.
This approach greatly simplifies event-oriented programming and producer-consumer problems.

Subscribing to an IObservable without knowing the source of the observed data forces the subscriber to treat the arrival of notifications as unpredictable. In other words, when observing an IObservable you should assume that notifications can be delivered concurrently.
According to the behavioral contract of Reactive Extensions, an IObservable must produce one item at a time. That is usually what happens, but external implementations sometimes do not follow this contract.
Let's look at each of the three issues:
GroupBy is not thread-safe
GroupBy returns an IObservable<IGroupedObservable<T>>. Its OnNext method passes each notification on through the IGroupedObservable<T> that corresponds to that notification's key. It does this by storing one IGroupedObservable<T> (more precisely, one Subject<T>) per key inside a Dictionary and, unsurprisingly, not a ConcurrentDictionary. This means that two notifications arriving close together can cause a double insertion.
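The double insertion described above follows a check-then-add pattern. The sketch below is a simplified stand-in, not the Rx source: NaiveGrouper is a hypothetical name, and a List<TValue> takes the place of the per-key Subject<T>. It only illustrates where two concurrent callers could collide.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for GroupBy's per-key bookkeeping.
public sealed class NaiveGrouper<TKey, TValue>
{
    // A plain Dictionary is not safe for concurrent writers.
    private readonly Dictionary<TKey, List<TValue>> _groups =
        new Dictionary<TKey, List<TValue>>();

    public int GroupCount => _groups.Count;

    public void OnNext(TKey key, TValue value)
    {
        // Check-then-act: two threads can both find the key missing here...
        if (!_groups.TryGetValue(key, out List<TValue> group))
        {
            group = new List<TValue>();
            // ...and both reach Add. The later Add can throw ArgumentException
            // for the duplicate key, or the unsynchronized writes can corrupt
            // the dictionary's internal state.
            _groups.Add(key, group);
        }
        group.Add(value);
    }
}
```

Called from a single thread, this behaves as expected; the problem only appears when OnNext is entered by more than one thread at once.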
Select is not safe either
The thread safety of Select is determined by its delegate. In the above case, the delegate passed to Select relies on Buffer(2, 1) to provide a list of size 2. Buffer contains a Queue, which is not thread-safe, so when it is accessed from multiple threads, the Buffer's Queue may give us unexpected results.
Other exceptions may be thrown for the same reason: a NullReferenceException if y is null, or an InvalidOperationException if the Queue is modified while it is being enumerated.
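The InvalidOperationException case can be reproduced deterministically on a single thread, which makes the failure mode easier to see. The sketch below is not Buffer's code: QueueDemo is a hypothetical name, and the Enqueue inside the loop merely stands in for a second thread calling OnNext while the first is still enumerating.

```csharp
using System;
using System.Collections.Generic;

public static class QueueDemo
{
    // Returns true if changing the queue during enumeration throws.
    public static bool ModifyWhileEnumeratingThrows()
    {
        var queue = new Queue<int>(new[] { 4, 8 });
        try
        {
            foreach (int item in queue)
            {
                // Stand-in for a concurrent OnNext adding to the same queue.
                queue.Enqueue(item);
            }
        }
        catch (InvalidOperationException)
        {
            // The enumerator detected that the collection was modified.
            return true;
        }
        return false;
    }
}
```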
Even basic observation is unsafe
Last but not least, even if you perform only basic observation, the StockTrader OnNext method modifies the console in a non-atomic operation, which can corrupt the appearance of the text.
So what can you do?

There is a Synchronize method that ensures you are subscribing to a serialized IObservable<T>, meaning that no more than one call to the OnNext method can occur at a time.
Since even the GroupBy extension is not thread-safe, you must call Synchronize at the beginning of the chain:
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
    .Synchronize()
    .GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x //4, 8, 2, 3
        .Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1) //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })
    ); //[F, +100%]; [S, -20%]
Please note that Synchronize adds another Observable proxy to your query, which makes the query slightly slower, so you should avoid using it when it is not needed.
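To picture what such a proxy costs, here is a minimal sketch of the idea using only the base class library. It is an assumption about the general technique, not the Rx implementation: SerializingObserver is a hypothetical name, and it simply wraps an inner observer and takes a lock around every notification.

```csharp
using System;

// Hypothetical proxy that lets only one notification through at a time.
public sealed class SerializingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new object();

    public SerializingObserver(IObserver<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    // Each call waits for the lock, so calls to the inner observer never overlap.
    public void OnNext(T value)
    {
        lock (_gate) { _inner.OnNext(value); }
    }

    public void OnError(Exception error)
    {
        lock (_gate) { _inner.OnError(error); }
    }

    public void OnCompleted()
    {
        lock (_gate) { _inner.OnCompleted(); }
    }
}
```

Every notification pays for one extra call and one lock acquisition, which is the kind of overhead worth avoiding when the source is already known to be serialized.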