I made an extension method:
    public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                              TimeSpan minDelay)
    {
        return
            source.TimeInterval()
                .Select(
                    (x, i) =>
                        Observable.Return(x.Value)
                            .Delay(i == 0
                                ? TimeSpan.Zero
                                : TimeSpan.FromTicks(
                                    Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
                .Concat();
    }
This creates a new observable that only lets elements through with a minimum time separation between them.
To avoid delaying the first element, it has to be handled differently.
As you can see, the code detects the first element by testing i == 0. The problem is that this will not work if we process more than int.MaxValue elements.
Instead, I thought of the following sequence:
    var trueThenFalse = Observable.Return(true)
        .Concat(Observable.Repeat(false));
and zipping it alongside my source:
    source.TimeInterval().Zip(trueThenFalse, ...
but when this infinite sequence is passed to Zip, we seem to enter a tight loop in which trueThenFalse emits all of its elements in one go (ad infinitum). Failure.
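I could work around this with mutable state (a bool flag, for instance), but I would prefer to avoid that.
Any suggestions?
For comparison, consider the following code: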
    var trueThenFalse = Observable.Return(true)
        .Concat(Observable.Repeat(false));
    var src = Observable.Interval(TimeSpan.FromSeconds(1));
    src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));
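This eventually dies with an OOME (OutOfMemoryException). It seems that trueThenFalse emits its values as fast as it can, and they are not consumed by Zip in a timely fashion.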
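One direction that might avoid both the int index and the runaway infinite observable is to zip the source with a lazily evaluated IEnumerable<bool> instead of an IObservable<bool>. The sketch below assumes the Zip overload that takes an IEnumerable as its second argument, which, as far as I understand, pulls one element from the enumerable per source notification. The class name RateLimitSketch and the helper TrueThenFalse are hypothetical names introduced only for illustration. This is a sketch under those assumptions, not a tested answer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical container class, named only for this sketch.
public static class RateLimitSketch
{
    // Hypothetical helper: yields true once, then false forever.
    // As an iterator it produces a value only when one is requested,
    // so nothing is emitted ahead of the consumer.
    private static IEnumerable<bool> TrueThenFalse()
    {
        yield return true;
        while (true)
        {
            yield return false;
        }
    }

    public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                              TimeSpan minDelay)
    {
        return source
            .TimeInterval()
            // Assumption: Zip overload whose second sequence is an IEnumerable.
            .Zip(TrueThenFalse(), (x, isFirst) =>
                Observable.Return(x.Value)
                    .Delay(isFirst
                        ? TimeSpan.Zero
                        : TimeSpan.FromTicks(
                            Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
            .Concat();
    }
}
```

The delay logic is copied unchanged from the original extension method. Only the first-element test differs: it no longer relies on a counter, so the int.MaxValue limit should not apply.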