Zipping with an infinite sequence that is true, then always false

I made an extension method:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
                                          TimeSpan minDelay)
{
    return
        source.TimeInterval()
            .Select(
                (x, i) =>
                    Observable.Return(x.Value)
                        .Delay(i == 0
                            ? TimeSpan.Zero
                            : TimeSpan.FromTicks(
                                  Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
            .Concat();
}

This creates a new observable that only lets elements through with a minimum separation in time.

To remove the initial delay, you need to handle the first element differently.

As you can see, the code checks whether it is dealing with the first element by testing i == 0. The problem is that if we process more than int.MaxValue elements, this will fail.

Instead, I thought of the following sequence:

var trueThenFalse = Observable.Return(true)
                    .Concat(Observable.Repeat(false));

and zipping it alongside my source:

source.TimeInterval().Zip(trueThenFalse, ...

but when this infinite sequence is passed to Zip, we seem to enter a tight loop in which trueThenFalse emits all of its elements in one go (ad infinitum). Fail.

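I could work around this with a side effect (a bool in the outer scope, for example), but I would rather avoid that.

Any suggestions?

The following code is not quite the same, but it shows the problem: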

var trueThenFalse = Observable.Return(true)
    .Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x));

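This eventually dies with an OOME. It appears that trueThenFalse unspools all of its values, but Zip does not consume them in a timely fashion.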


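It turns out that Zip has another overload, one that zips an IObservable sequence with an IEnumerable sequence.

By combining the push semantics of IObservable with the pull semantics of IEnumerable, the test case can be made to work.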
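To illustrate what pull semantics means here, the following minimal sketch uses only the base class library (no Rx). The names PullDemo, EndlessFalse, Produced, and FirstThree are hypothetical and exist only for this illustration. It shows that an endless IEnumerable produces a value only when the consumer asks for one, so it cannot run ahead of its consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PullDemo
{
    // Counts how many items the generator has actually produced.
    public static int Produced;

    // Endless lazy sequence: each item is produced only when the consumer asks for it.
    public static IEnumerable<bool> EndlessFalse()
    {
        while (true)
        {
            Produced++;
            yield return false;
        }
    }

    // Pulls exactly three items from the endless sequence and stops.
    public static bool[] FirstThree()
    {
        Produced = 0;
        return EndlessFalse().Take(3).ToArray();
    }
}
```

After calling PullDemo.FirstThree(), the Produced counter is 3, even though the sequence itself never ends.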

So, with the following method:

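// Endless lazy sequence that repeats a single item; values are produced only on demand.
// Declared static so that the static RateLimit extension method can call it.
private static IEnumerable<T> Inf<T>(T item)
{
    for (;;)
    {
        yield return item;
    }
}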

which can be used to build an IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));

and then Zip it with the source observable:

var src = Observable.Interval(TimeSpan.FromSeconds(1));
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));

... and everything works as expected.
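A rough sketch of why this pairing stays bounded, written without Rx so that it is self-contained. PullZipSketch, PairWith, and Demo are hypothetical names, and this is not how Rx implements Zip internally; it only mimics the idea that each pushed value pulls exactly one value from the enumerable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PullZipSketch
{
    // Endless lazy sequence repeating one item (same idea as Inf above).
    static IEnumerable<T> Inf<T>(T item)
    {
        while (true) yield return item;
    }

    // Hypothetical helper: returns a callback that pairs each pushed value
    // with exactly one value pulled from the enumerable.
    // The enumerator is never disposed here, to keep the sketch short.
    public static Action<TLeft> PairWith<TLeft, TRight>(
        IEnumerable<TRight> right, Action<TLeft, TRight> onPair)
    {
        var enumerator = right.GetEnumerator();
        return left =>
        {
            if (enumerator.MoveNext()) onPair(left, enumerator.Current);
        };
    }

    // Simulates three pushed values and records each pairing.
    public static List<string> Demo()
    {
        var log = new List<string>();
        var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
        var onNext = PairWith<long, bool>(
            trueThenFalse, (i, first) => log.Add($"{i}:{first}"));
        for (long i = 0; i < 3; i++) onNext(i);
        return log;
    }
}
```

Demo() returns the entries 0:True, 1:False, 2:False. Only the first pushed value sees true, and no values are buffered ahead of the pushes.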

I now have the following implementation of my RateLimit method:

public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                          TimeSpan minDelay)
{
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
    return
        source.TimeInterval()
            .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value)
                .Delay(firstTime
                    ? TimeSpan.Zero
                    : TimeSpan.FromTicks(
                        Math.Max(minDelay.Ticks - item.Interval.Ticks, 0))))
            .Concat();
}

Source: https://habr.com/ru/post/1661571/

