How can I observe values without blocking with Rx?

I am trying to observe a timer whose handler takes longer than the timer interval. To do this, I want to schedule the observation on some kind of thread pool, task pool, or something similar.

I tried the thread pool, task pool, and new-thread schedulers, and none of them worked. Does anyone know how to do this? Example:

    var count = 0;
    var disposable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
        .ObserveOn(Scheduler.NewThread)
        .Subscribe(x =>
        {
            count++;
            Thread.Sleep(TimeSpan.FromMilliseconds(1000));
        });

    Thread.Sleep(TimeSpan.FromSeconds(5));
    disposable.Dispose();

    if (count > 10)
    {
        // hurray...
    }
+4
3 answers

What you are asking for is a bad idea, because you will eventually run out of resources (threads are being created faster than they finish). Instead, why not schedule a new item when the previous one has finished?
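One way to sketch the "schedule the next item when the previous one has finished" idea (this is my own hedged illustration, not the answerer's code; DoWork is a placeholder for your slow handler):

```csharp
using System;
using System.Reactive.Linq;

// Sketch only: run the handler, then wait 100 ms, then repeat.
// Because Delay and Repeat only fire after the previous cycle completes,
// a slow handler never causes work items to pile up.
var subscription = Observable
    .Defer(() => Observable.Start(() => DoWork()))   // run the (slow) handler
    .Delay(TimeSpan.FromMilliseconds(100))           // then pause
    .Repeat()                                        // then start the next cycle
    .Subscribe();
```

With this shape, the 100 ms gap is measured from the end of one handler run to the start of the next, rather than being a fixed tick rate the handler cannot keep up with.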

In your specific example, you need to pass the IScheduler to Observable.Timer instead of trying to use ObserveOn.
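For reference, Observable.Timer has an overload that takes the scheduler directly; a sketch of that suggestion, reusing the handler from the question (Scheduler.ThreadPool is the Rx v1 name; newer Rx versions expose ThreadPoolScheduler.Instance instead):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Concurrency;

// Pass the scheduler to Timer itself rather than via ObserveOn.
var disposable = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100), Scheduler.ThreadPool)
    .Subscribe(x =>
    {
        count++;
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    });
```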

+4

Paul is right that this is a bad idea. You are creating a situation where queued actions can exhaust system resources. You may even find that it works on your machine but fails on a client's machine: available memory, a 32- vs. 64-bit processor, and so on can all affect the code.

However, it's easy to change the code so that it does what you want.

First, though, note that the Timer method will schedule timer events correctly as long as the observer finishes before the next scheduled event; if the observer has not finished, the timer waits. Remember that timer observables are "cold", so each subscribed observer actually gets its own timer. It is a one-to-one relationship.

This behavior prevents the timer from unintentionally exhausting your resources.

So, with the code as you have defined it, OnNext is called every 1000 milliseconds, not every 100.

Now, to schedule the code to run every 100 milliseconds, do the following:

    Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
        .Select(x => Scheduler.NewThread.Schedule(() =>
        {
            count++;
            Thread.Sleep(TimeSpan.FromMilliseconds(1000));
        }))
        .Subscribe(x => { });

Effectively, this code is an IObservable<IDisposable>, where each disposable represents a scheduled action that runs for 1000 milliseconds.

In my tests, this performed well and incremented the count correctly.

I also tried to blow out my resources: with the timer set to fire once every millisecond, I quickly got a System.OutOfMemoryException, but the code survived when I changed the interval to once every two milliseconds. Even then, it used more than 500 MB of RAM and created about 500 new threads while it ran. Not very nice at all.

Be careful!

+2

If you are genuinely producing values faster than you can consume them then, as noted, you are heading for trouble. If you cannot slow down the rate of production, you need to look at consuming values faster. Perhaps you want a multi-threaded observer to make use of multiple cores?

If you do go multi-threaded in your observer, you may need to be careful about events being handled out of order. You will be processing several notifications at once, and all bets are off as to which is processed first (or which first wins the race into a critical section).

If you don't need to handle every event in the stream, take a look at the couple of different ObserveLatestOn implementations floating around. The topic is discussed here and here.

ObserveLatestOn drops everything except the latest notification that arrives while the observer is processing a previous one. When the observer finishes processing, it receives that latest notification and skips all the notifications that occurred in between.

The advantage is that it prevents pressure from building up from a producer that is faster than its consumer. If the consumer is slow because it is under load, making it process even more notifications only makes things worse; dropping unnecessary notifications can let the load recede to the point where the consumer can keep up.
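For illustration, a minimal sketch of the ObserveLatestOn idea (my own simplified version, not one of the linked implementations; error and completion handling are omitted):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Concurrency;

public static class ObservableExtensions
{
    // Sketch: while the observer is busy, keep only the most recent value.
    public static IObservable<T> ObserveLatestOn<T>(
        this IObservable<T> source, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            bool busy = false, hasPending = false;
            T pending = default(T);

            return source.Subscribe(value =>
            {
                lock (gate)
                {
                    pending = value;      // overwrite any undelivered value
                    hasPending = true;
                    if (busy) return;     // a drain loop is already running
                    busy = true;
                }
                scheduler.Schedule(() =>
                {
                    while (true)
                    {
                        T next;
                        lock (gate)
                        {
                            if (!hasPending) { busy = false; return; }
                            next = pending;
                            hasPending = false;
                        }
                        observer.OnNext(next);  // slow observers simply skip values
                    }
                });
            });
        });
    }
}
```

The key design point is that the producer only ever overwrites a one-slot buffer under the lock, so no queue can build up no matter how slow the observer is.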

+1

Source: https://habr.com/ru/post/1014984/

