What is the best way to “limit the speed” of consumption observed?

I have a bunch of events and I have to execute ALL of them without loss, but I want to make sure that they are buffered and consumed at the appropriate time intervals. Anyone have a solution?

I can not find any operators in Rx that can do this without losing events (Throttle - loses events). I also considered Buffered, Delay, etc. I can not find a good solution.

I tried to set the timer in the middle, but somehow it doesn't work at all:

GetInitSequence() .IntervalThrottle(TimeSpan.FromSeconds(5)) .Subscribe( item => { Console.WriteLine(DateTime.Now); // Process item } ); public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime) { return Observable.Create<T>(o => { return source.Subscribe(x => { new Timer(state => o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1)); }, o.OnError, o.OnCompleted); }); } 
+6
source share
4 answers

The question is not 100% clear, so I am making some presumptions.

Observable.Delay is not what you want, because it will create a delay from the moment each event arrives, and will not even create time intervals for processing.

Observable.Buffer not what you want, because it will cause all events in each given interval to be transmitted to you, and not one at a time.

So, I believe that you are looking for a solution that creates some kind of metronome that goes out and gives you an event beyond the mark. This may be naive , built using Observable.Interval for the metronome and Zip to connect it to your source:

 var source = GetInitSequence(); var trigger = Observable.Interval(TimeSpan.FromSeconds(5)); var triggeredSource = source.Zip(trigger, (s,_) => s); triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now)); 

This will run every 5 seconds (in the above example) and you will get the original elements in sequence.

The only problem with this solution is that if you no longer have the source elements (say) 10 seconds when the source elements arrive, they will be sent immediately, since some of the trigger events sit there waiting for them. Marble diagram for this scenario:

 source: -abc----------------------defg trigger: ----o----o----o----o----o----o----o result: ----a----b----c-------------defg 

This is a very reasonable problem. There are already two questions here:

Rx IObservable Buffering for Smoothing Event Bursts

Method for uniformly shifting buffered events

The provided solution is the main Drain extension method and the Buffered extension. I changed them a lot easier (no need to Drain , just use Concat ). Using:

 var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5)); 

StepInterval Extension StepInterval :

 public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) { return source.Select(x => Observable.Empty<T>() .Delay(minDelay) .StartWith(x) ).Concat(); } 
+10
source

I know this may just be too easy, but will it work?

 var intervaled = source.Do(x => { Thread.Sleep(100); }); 

This basically just sets the minimum delay between values. Too simplistic?

+1
source

According to the answers of Enigmativity, if all you want to do is just to delay all the values ​​with TimeSpan, I cannot understand why Delay not the operator you want

  GetInitSequence() .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here .Subscribe( item => { Console.WriteLine(DateTime.Now); // Process item } ); 
+1
source

What about Observable.Buffer ? This should return all events in window 1s as a single event.

 var xs = Observable.Interval(TimeSpan.FromMilliseconds(100)); var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5)); bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); }); 

Perhaps this is what you are asking for, not so clearly. What should your code do? It looks like you are just delaying the creation of a timer for each event. It also violates the semantics of the observable, since the next and the full can happen before the next.

Please note that this is also accurate only when using a timer. As a rule, timers have an accuracy of no more than 16 ms.

Edit:

your example will be, and the element contains all the events in the window:

 GetInitSequence() .Buffer(TimeSpan.FromSeconds(5)) .Subscribe( item => { Console.WriteLine(DateTime.Now); // Process item } ); 
0
source

Source: https://habr.com/ru/post/919437/


All Articles