I am trying to build a proof of concept (POC) for numerous independent data feeds, a sort of classic observer-style application. The number of data feeds can vary from a few hundred to a few thousand, and the number of observers can vary from 2 to 20,000. Here is a quick example of a simple observable data feed mock-up:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public class FeedMockUp
{
    private readonly IScheduler observerScheduler;
    private readonly Random rnd = new Random((int)DateTime.Now.Ticks);
    private readonly Subject<double> sourceObservable;
    private readonly IObservable<double> testedObservable;

    public FeedMockUp(IScheduler observerScheduler)
    {
        this.observerScheduler = observerScheduler;
        sourceObservable = new Subject<double>();

        // Each subscriber gets its own subscription to the underlying subject
        // and receives a fresh random value for every source notification.
        testedObservable = Observable.Create<double>(x =>
        {
            var underlyingSourceDisposable = sourceObservable
                .Subscribe(_ => x.OnNext(rnd.NextDouble()));
            return underlyingSourceDisposable;
        });
    }

    public IDisposable SubscribeToUnderlyingFeed(int numberOfSubscribers)
    {
        int counter = 0;
        var disposable = new CompositeDisposable();

        for (int i = 0; i < numberOfSubscribers; i++)
        {
            // Notifications are delivered on the scheduler under test.
            disposable.Add(testedObservable
                .ObserveOn(observerScheduler)
                .Subscribe(_ => Interlocked.Increment(ref counter)));
        }

        return disposable;
    }

    public void PushNewFeed()
    {
        sourceObservable.OnNext(rnd.NextDouble());
    }
}
While experimenting with schedulers to improve the throughput of updates to the observers, I noticed that with EventLoopScheduler the memory consumption of an application with 100 data feeds was quite stable: with 1,000 observers it was about 100 MB, and it grew linearly as more observers were added.
However, when I tried TaskPoolScheduler, I started getting OutOfMemoryException in an x86 process, and in a 64-bit process memory consumption exploded, or rather became unpredictable, ranging from 1 GB to 2 GB with just 500 observers and growing almost exponentially as more observers were added.
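To make the comparison concrete, here is a minimal sketch of how the two schedulers can be swapped in and the managed heap growth compared. This is not the test code from this question: the `SchedulerMemoryProbe` class, the `Measure` helper, and the use of `GC.GetTotalMemory` are assumptions made for illustration, and the numbers it prints will vary between runs and machines.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SchedulerMemoryProbe
{
    // Hypothetical helper (not from the question): subscribes `observers`
    // observers to a single subject through ObserveOn(scheduler), pushes
    // `pushes` values, and returns the managed heap growth in bytes.
    public static long Measure(IScheduler scheduler, int observers, int pushes)
    {
        long before = GC.GetTotalMemory(forceFullCollection: true);

        int counter = 0;
        var source = new Subject<double>();
        var subscriptions = new CompositeDisposable();

        for (int i = 0; i < observers; i++)
        {
            subscriptions.Add(source
                .ObserveOn(scheduler)
                .Subscribe(_ => Interlocked.Increment(ref counter)));
        }

        for (int i = 0; i < pushes; i++)
        {
            source.OnNext(i);
        }

        // Sampled right after pushing, so notifications still queued on the
        // scheduler are included in the measurement.
        long after = GC.GetTotalMemory(forceFullCollection: false);

        subscriptions.Dispose();
        return after - before;
    }

    public static void Main()
    {
        using (var eventLoop = new EventLoopScheduler())
        {
            Console.WriteLine("EventLoopScheduler: {0:N0} bytes",
                Measure(eventLoop, 1000, 100));
        }

        Console.WriteLine("TaskPoolScheduler:  {0:N0} bytes",
            Measure(TaskPoolScheduler.Default, 1000, 100));
    }
}
```

The only thing that changes between the two runs is the `IScheduler` instance passed to `ObserveOn`, which is the same variable the `FeedMockUp` constructor takes.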
Here is the code I used for testing. Can you see anything wrong with it? Why is there such a difference in memory consumption? My guess is that some kind of internal copying or prioritization is going on, but that is only a guess. Ideally, I would like to understand what is happening here without diving into the Rx code base.
private static void Main(string[] args)
{
    const int displayItemCount = 100;
    const int callbackCount = 500;