Custom Rx operator for throttling only when there has been a recent value

I am trying to create an Rx operator that seems very useful, but surprisingly I have not found any Stack Overflow questions that match exactly. I would like to create a variation on Throttle that lets values through immediately if there has been a period of inactivity. My intended use case looks something like this:

I have a dropdown that triggers a web request when the value changes. If the user holds down an arrow key and cycles rapidly through the values, I do not want to fire off a request for each value. But if I throttle the stream, the user has to wait out the throttle duration every time they simply select a value from the dropdown in the normal way.

So, whereas a normal Throttle looks like this:

[Marble diagram: normal Throttle]

I want to create a ThrottleSubsequent that looks like this:

[Marble diagram: ThrottleSubsequent]

Note that marbles 1, 2, and 6 pass through without delay, because each of them follows a period of inactivity.

My attempt is as follows:

    public static IObservable<TSource> ThrottleSubsequent<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
    {
        // Create a timer that resets with each new source value
        var cooldownTimer = source
            .Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer
            .Switch(); // Switch to the most recent timer

        var cooldownWindow = source.Window(() => cooldownTimer);

        // Pass along the first value of each cooldown window immediately
        var firstAfterCooldown = cooldownWindow.SelectMany(o => o.Take(1));

        // Throttle the rest of the values
        var throttledRest = cooldownWindow
            .SelectMany(o => o.Skip(1))
            .Throttle(dueTime, scheduler);

        return Observable.Merge(firstAfterCooldown, throttledRest);
    }

This seems to work, but I find it hard to reason about, and I suspect there are edge cases where things could go wrong, with duplicate values or something similar. I would like feedback from more experienced Rx-ers on whether this code is correct and/or whether there is a more idiomatic way to do this.

1 answer

Well, here is a test suite (using the NuGet package Microsoft.Reactive.Testing):

    var ts = new TestScheduler();

    var source = ts.CreateHotObservable<char>(
        new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
        new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
        new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
        new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('D')),
        new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('E')),
        new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('F')),
        new Recorded<Notification<char>>(760.MsTicks(), Notification.CreateOnNext('G'))
    );

    var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts);

    var expectedResults = ts.CreateHotObservable<char>(
        new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
        new Recorded<Notification<char>>(450.MsTicks(), Notification.CreateOnNext('B')),
        new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
        new Recorded<Notification<char>>(910.MsTicks(), Notification.CreateOnNext('G'))
    );

    var observer = ts.CreateObserver<char>();
    target.Subscribe(observer);
    ts.Start();

    ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);

together with this helper:

    public static class TestingHelpers
    {
        // Converts a millisecond count to ticks, the unit used by TestScheduler
        public static long MsTicks(this int i)
        {
            return TimeSpan.FromMilliseconds(i).Ticks;
        }
    }

It seems to pass. If you want to condense it, you can turn it into this:

    public static IObservable<TSource> ThrottleSubsequent2<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
    {
        return source.Publish(_source => _source
                .Window(() => _source
                    .Select(x => Observable.Interval(dueTime, scheduler))
                    .Switch()
                ))
            .Publish(cooldownWindow =>
                Observable.Merge(
                    cooldownWindow
                        .SelectMany(o => o.Take(1)),
                    cooldownWindow
                        .SelectMany(o => o.Skip(1))
                        .Throttle(dueTime, scheduler)
                )
            );
    }

EDIT

Publish forces the subscription to be shared. If you have a badly behaved (or expensive) source observable with subscription side effects, Publish ensures that you subscribe to it only once. Here is an example where Publish helps:

    void Main()
    {
        var source = UglyRange(10);
        var target = source
            .SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i)))
            .ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) // Works with ThrottleSubsequent2, fails with ThrottleSubsequent
            .Subscribe(i => Console.WriteLine(i));
    }

    static int counter = 0;

    public IObservable<int> UglyRange(int limit)
    {
        var uglySource = Observable.Create<int>(o =>
        {
            if (counter++ == 0)
            {
                Console.WriteLine("Ugly observable should only be created once.");
                Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i));
            }
            else
            {
                Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created.");
                o.OnError(new Exception($"observable invoked {counter} times."));
            }
            return Disposable.Empty;
        });
        return uglySource;
    }
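
The same effect can be seen in isolation with a smaller sketch. This is only an illustration, assuming the System.Reactive package is referenced; the names PublishDemo and subscriptions are made up for this example and do not come from the code above. It counts how often a cold source is subscribed to when two branches are merged, first directly and then through the selector overload of Publish:

```csharp
using System;
using System.Reactive.Linq;

static class PublishDemo
{
    static void Main()
    {
        var subscriptions = 0;

        // Cold source (hypothetical) that counts every subscription it receives.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        // Without Publish, each branch of the merge subscribes to the source separately.
        source.Take(1).Merge(source.Skip(1)).Subscribe(_ => { });
        Console.WriteLine($"Without Publish: {subscriptions} subscription(s)");

        subscriptions = 0;

        // With Publish, both branches share a single underlying subscription.
        source.Publish(shared => shared.Take(1).Merge(shared.Skip(1))).Subscribe(_ => { });
        Console.WriteLine($"With Publish: {subscriptions} subscription(s)");
    }
}
```

The first count should be 2 and the second 1, which is the difference that makes ThrottleSubsequent2 safe to use with a source like UglyRange.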

Source: https://habr.com/ru/post/1265005/
