How to implement timeout buffering in RX

I need to implement event processing that is delayed when there are no new events arriving for a specific period. (I had to queue the parsing task when changing the text buffer, but I don't want to start parsing when the user is still typing.)

I'm new to RX, but as far as I can see, I will need a combination of BufferWithTime and Timeout methods. I believe this works like this: it buffers events until they are received regularly for a certain period of time between subsequent events. If there is a space in the event stream (longer than time), it should return events that have been buffered so far.

Having a look at how Buffer and Timeout are executed, I could probably implement my BufferWithTimeout method (if everyone has one, please share with me), but I wonder if this can be achieved simply by combining the existing methods. Any ideas?

+4
source share
4 answers

I think BufferWithTime is what you need.

There is nothing built-in, but something like this should work:

Note. If an error occurs from the source, the buffer will not be flushed. This corresponds to the current (or current last check) functionality of BufferWith*

 public static IObservable<TSource[]> BufferWithTimeout<TSource>( this IObservable<TSource> source, TimeSpan timeout) { return source.BufferWithTimeout(timeout, Scheduler.TaskPool); } public static IObservable<TSource[]> BufferWithTimeout<TSource>( this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler) { return Observable.CreateWithDisposable<TSource[]>(observer => { object lockObject = new object(); List<TSource> buffer = new List<TSource>(); MutableDisposable timeoutDisposable = new MutableDisposable(); Action flushBuffer = () => { TSource[] values; lock(lockObject) { values = buffer.ToArray(); buffer.Clear(); } observer.OnNext(values); }; var sourceSubscription = source.Subscribe( value => { lock(lockObject) { buffer.Add(value); } timeoutDisposable.Disposable = scheduler.Schedule(flushBuffer, timeout); }, observer.OnError, () => { flushBuffer(); observer.OnCompleted(); }); return new CompositeDisposable(sourceSubscription, timeoutDisposable); }); } 
+3
source

This is a rather old question, but I think the next answer is worth mentioning, since all other solutions force the user to sign up manually, track changes, etc.

I propose the following as a "Rx-y" solution.

 var buffers = source .GroupByUntil( // yes. yes. all items belong to the same group. x => true, g => Observable.Amb<int>( // close the group after 5 seconds of inactivity g.Throttle(TimeSpan.FromSeconds(5)), // close the group after 10 items g.Skip(9) )) // Turn those groups into buffers .SelectMany(x => x.ToArray()); 

Basically, the source is finished to some observable, defined in terms of the newest window. A new window (grouped observable) is created, and we use this window to determine when the window should close. In this case, I close the window after 5 seconds of inactivity or a maximum length of 10 (9 + 1).

+4
source

In addition to Richard Salay's answer, I just learned the new Window statement from the latest version of rx. This "solves" the problem, since you can "buffer with a timeout", i.e. Retrieve the result for a time that lasts until a timeout is reached, but instead of retrieving the results as IEnumerable, you actually get them as IObservable.

Here is a brief example of what I mean:

 private void SetupStream() { var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>( h => new MouseButtonEventHandler(h), h => MouseDown += h, h => MouseDown -= h); var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher)) .Switch(); inputStream.Window(() => timeout) .Subscribe(OnWindowOpen); } private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window) { Trace.WriteLine(string.Format("Window open")); var buffer = new List<IEvent<MouseButtonEventArgs>>(); window.Subscribe(click => { Trace.WriteLine(string.Format("Click")); buffer.Add(click); }, () => ProcessEvents(buffer)); } private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks) { Trace.WriteLine(string.Format("Window closed")); //... } 

Each time a window opens, you get all the events as they arrive, save them in the buffer and process them when the window finishes (which actually happens when you open the next window).

I'm not sure that Richard will change his example to use Window now available, but thought it might be worth the alternative.

+2
source

If you just need to start the operation when the user stops printing for a certain amount of time and does not necessarily need intermediate events, then Throttle is the operation you perform. Check here for an example of its use in this scenario.

+1
source

Source: https://habr.com/ru/post/1335047/


All Articles