Using Observable.FromEventPattern to perform an action after inactivity or a count

I have an observable stream created from an event pattern, as shown below.

var keyspaceStream = Observable.FromEventPattern<RedisSubscriptionReceivedEventArgs>(
            h => keyspaceMonitor.KeySpaceChanged += h,
            h => keyspaceMonitor.KeySpaceChanged -= h);

What I want to do is subscribe to the stream and execute a method when either there have been 10 seconds of inactivity (no events occurred) or 100 events have fired without the method executing. This is to avoid the scenario where events fire every 5 seconds and the onNext method is never called.

How can I do this? I know how to do the first part (see below), but I can't figure out how to do the counting logic. Please note that I already know how to subscribe to the stream.

var throttledStream = keyspaceStream.Throttle(TimeSpan.FromSeconds(10));

Any help would be greatly appreciated! Thanks.


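Use Buffer with a bufferClosingSelector. The buffer should close when either maxDuration has passed since the last event or maxCount events have arrived, whichever comes first.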

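var maxDuration = TimeSpan.FromSeconds(10);
var maxCount = 100;
// Publish shares a single subscription to keyspaceStream among the uses of 'o' below.
var throttledStream = keyspaceStream.Publish(o =>
{
    // Each event restarts a timer; Switch keeps only the latest one, so this
    // fires once maxDuration has passed without a new event.
    // 'scheduler' is an IScheduler supplied by the caller.
    var reachedMaxDuration = o
        .Select(_ => Observable.Timer(maxDuration, scheduler))
        .Switch();
    // Close the current buffer on inactivity or after maxCount events.
    return o.Buffer(() => o
        .TakeUntil(reachedMaxDuration)
        .Take(maxCount)
        .LastOrDefaultAsync());
});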

Here, scheduler is an IScheduler. The resulting throttledStream is an IObservable<IList<EventPattern<RedisSubscriptionReceivedEventArgs>>>.
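To try the behaviour without waiting real seconds, the same operator chain can be run in virtual time. The following is only a sketch under a few assumptions: the extension method name BufferUntilIdleOrCount, the class BufferDemo and its Run method are hypothetical names introduced here; a Subject<int> stands in for the keyspace event stream; maxCount is lowered to 3 to keep the example short; and the System.Reactive package is referenced. HistoricalScheduler is used as the IScheduler so that time can be advanced manually.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferDemo
{
    // Hypothetical helper: the answer's operator chain, made generic so it
    // can be applied to any stream.
    public static IObservable<IList<T>> BufferUntilIdleOrCount<T>(
        this IObservable<T> source, TimeSpan maxDuration, int maxCount, IScheduler scheduler)
    {
        return source.Publish(o =>
        {
            // Fires once maxDuration has passed since the latest event.
            var reachedMaxDuration = o
                .Select(_ => Observable.Timer(maxDuration, scheduler))
                .Switch();
            // Close the buffer on inactivity or after maxCount events.
            return o.Buffer(() => o
                .TakeUntil(reachedMaxDuration)
                .Take(maxCount)
                .LastOrDefaultAsync());
        });
    }

    // Hypothetical driver: pushes four events and returns the batches received.
    public static List<IList<int>> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual time, advanced by hand
        var events = new Subject<int>();           // stand-in for keyspaceStream
        var batches = new List<IList<int>>();

        using (events
            .BufferUntilIdleOrCount(TimeSpan.FromSeconds(10), 3, scheduler)
            .Subscribe(batch => batches.Add(batch)))
        {
            // Three quick events reach the count limit (3 in this sketch).
            events.OnNext(1);
            events.OnNext(2);
            events.OnNext(3);

            // One more event, then more than 10 virtual seconds of silence.
            events.OnNext(4);
            scheduler.AdvanceBy(TimeSpan.FromSeconds(11));
        }

        return batches;
    }
}
```

In this run the four events should come out as two batches, one closed by the count limit and one closed by the idle timer. In production code the scheduler argument could be something like Scheduler.Default instead of a virtual-time scheduler.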


Source: https://habr.com/ru/post/1605616/

