Backpressure handling in Rx.NET without onBackpressureLatest

I need to implement the following algorithm in Rx.NET:

  1. Take the latest element from the stream or, if there are no new elements, wait without blocking until one appears. Only the latest item matters; older ones can be discarded.
  2. Pass the element to SlowFunction and print the output.
  3. Repeat from step 1.

Naive solution:

 let PrintLatestData (stream: IObservable<_>) = stream.Select(SlowFunction).Subscribe(printfn "%A") 

However, this solution does not work, because on average the stream emits elements faster than SlowFunction can consume them. Select does not discard items; it processes every item in order, from oldest to newest, so the delay between an item being emitted and being printed grows without bound as the program runs. Only the latest element should be taken from the stream to avoid this ever-growing backlog.

I searched the documentation and found a method called onBackpressureLatest in RxJava that, as far as I know, does what I described above. However, the method does not exist in Rx.NET. How can I implement this in Rx.NET?

+6
4 answers

I think you want to use something like ObserveLatestOn. It effectively replaces the incoming event queue with a single value and a flag.

James World wrote a blog post about it here: http://www.zerobugbuild.com/?p=192

The concept is widely used in GUI applications that cannot control how fast the server pushes data to them.

You can also see an implementation in Reactive Trader (the ObservableExtensions file under https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/) and in the accompanying ReactConfLondon2014 presentation.

To be clear, this is a load shedding algorithm, not a backpressure algorithm.
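The "single value and a flag" idea can be sketched without any Rx library at all. The following is only an illustration in plain Java (the language of the sample example elsewhere in this thread), not the ObserveLatestOn code from the blog post or from Reactive Trader. The class name LatestSlot and its methods are made up for this sketch, and it assumes items are never null.

```java
import java.util.Objects;

/** Hypothetical single-slot mailbox: one stored value plus a "has value" flag. */
final class LatestSlot<T> {
    private T value;          // the only buffered item
    private boolean hasValue; // true while `value` has not been consumed yet

    /** Producer side: never blocks; a pending older item is overwritten (shed). */
    synchronized void offer(T item) {
        value = Objects.requireNonNull(item);
        hasValue = true;
        notifyAll(); // wake a consumer waiting in take()
    }

    /** Consumer side: waits until an item is pending, then removes and returns it. */
    synchronized T take() throws InterruptedException {
        while (!hasValue) {
            wait();
        }
        return remove();
    }

    /** Non-blocking variant: returns null when nothing is pending. */
    synchronized T poll() {
        return hasValue ? remove() : null;
    }

    // Clears the slot and resets the flag; only called while holding the lock.
    private T remove() {
        T item = value;
        value = null;
        hasValue = false;
        return item;
    }
}
```

A consumer thread would loop on take(), run the slow function on the result, and print it, while the producer calls offer() for every incoming event. However many events arrive during one slow call, only the newest one is waiting when the consumer comes back, which is why this sheds load rather than applying backpressure.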

+7

The sync / async suggestion may help a little, but given that the slow function is always slower than the event stream, making it asynchronous only lets you parallelize the processing (for example on the thread pool) at the cost of eventually running out of threads or adding more latency through context switching. That does not sound like a solution to me.

I suggest you take a look at the open-source Rxx 'Introspective' operators written by Dave Sexton. They can vary the buffer / throttle period from which you take the latest item as the queue backs up behind a slow consumer. If the slow function suddenly speeds up, they do not buffer at all. If it slows down, they buffer more. You will need to check whether a "latest from" variant exists, or simply modify an existing operator to suit your needs. For instance, use the buffering one and just take the last item in each buffer, or improve it further so that it stores only the very latest item internally. Google "Rxx" and you will find it somewhere on GitHub.

A simpler approach, if the duration of the "slow function" is predictable enough, is to throttle the stream by an amount that exceeds this duration. Obviously, I do not mean the standard Rx 'Throttle', but one that lets the most recent update through instead of an old one. There are many readily available solutions to this kind of problem.

+1

I had the same question some time ago, and I did not find a built-in operator that does exactly that. So I wrote my own, which I called Latest. It is not trivial to implement, but I have found it very useful in my current project.

It works as follows: while the observer is busy processing the previous notification (on its own thread, of course), the operator queues up to the last n notifications (n >= 0) and passes them to the observer's OnNext as soon as the observer becomes idle. So:

  • Latest(0): observe only items that arrive while the observer is idle
  • Latest(1): always observe the latest item
  • Latest(1000) (for example): usually process all items, but if something gets stuck downstream, skip some rather than get an OutOfMemoryException
  • Latest(int.MaxValue): never miss an item, but balance the load between producer and consumer

Thus, your code will look like this: stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")

The signature is as follows:

    /// <summary>
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
    /// <remarks>
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
    /// </remarks>
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)

The implementation is too large to publish here, but if anyone is interested, I would gladly share it. Let me know.
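Since that implementation is not shown, here is only a rough sketch of the queueing policy described above (keep at most the last n items, discard the oldest on overflow). It is plain Java with no Rx involved, it is not the Latest operator itself, and the name DropOldestQueue and its methods are invented for illustration. Scheduling, completion and error handling are left out, and items are assumed to be non-null.

```java
import java.util.ArrayDeque;

/** Hypothetical bounded buffer that keeps only the most recent maxSize items. */
final class DropOldestQueue<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int maxSize;

    DropOldestQueue(int maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("maxSize must be >= 0");
        }
        this.maxSize = maxSize;
    }

    /** Called while the consumer is busy: stores the item, evicting the oldest on overflow. */
    synchronized void enqueue(T item) {
        if (maxSize == 0) {
            return; // nothing is kept while the consumer is busy
        }
        if (items.size() == maxSize) {
            items.pollFirst(); // discard the least recent item
        }
        items.addLast(item);
    }

    /** Called when the consumer becomes idle: oldest pending item, or null if none. */
    synchronized T dequeue() {
        return items.pollFirst();
    }
}
```

With maxSize = 1 this keeps only the newest pending item, and with a very large maxSize it behaves like an ordinary queue, which mirrors the range of behaviours listed for Latest(n).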

+1

You can sample the stream at an interval that you know SlowFunction can keep up with. Here is an example in Java:

    TestScheduler ts = new TestScheduler();
    Observable<Long> stream = Observable.interval(1, TimeUnit.MILLISECONDS, ts).take(500);
    stream.sample(100, TimeUnit.MILLISECONDS, ts).subscribe(System.out::println);
    ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

Output:

    98
    198
    298
    398
    498
    499

sample does not cause backpressure and always captures the last value in the stream, so it meets your requirement. In addition, sample will not send the same value twice (as seen from above, since 499 is printed only once).

I think this would be a valid C# / F# solution:

    static IDisposable PrintLatestData<T>(IObservable<T> stream)
    {
        return stream.Sample(TimeSpan.FromMilliseconds(100))
                     .Select(SlowFunction)
                     .Subscribe(Console.WriteLine);
    }

    let PrintLatestData (stream: IObservable<_>) =
        stream.Sample(TimeSpan.FromMilliseconds(100))
              .Select(SlowFunction)
              .Subscribe(printfn "%A")

+1

Source: https://habr.com/ru/post/1014976/

