Rx that publishes a value if a specific timeout expires

I have a method that returns observable. This observable should (if everything works correctly) publish a value every second. What I would like to do is post some kind of custom warning value if a certain time has passed without output.

private IObservable<string> GetStatus() { return statusProvider .Subscribe(statusKey) //returns an IObservable<string> .Select(st => st.ToUpper()) .DistinctUntilChanged() .TakeUntil(disposed) .Replay(1) .RefCount(); } 

Is there an easy way for me to change the above, so that if the status update does not appear within 30 seconds, statusProvider publishes “bad”, and then, if an update occurs after that, it will be published as usual and the timer restarts again to 30 seconds?

+4
source share
2 answers

Here is the way. Begins a timer that will give "bad" when it expires. Each time your statusProvider creates a status, the timer gets reset.

 var statusSignal = statusProvider .Subscribe(statusKey) //returns an IObservable<string> .Select(st => st.ToUpper()) .Publish() .RefCount(); // An observable that produces "bad" after a delay and then "hangs indefinately" after that var badTimer = Observable .Return("bad") .Delay(TimeSpan.FromSeconds(30)) .Concat(Observable.Never<string>()); // A repeating badTimer that resets the timer whenever a good signal arrives. // The "indefinite hang" in badTimer prevents this from restarting the timer as soon // as it produces a "bad". Which prevents you from getting a string of "bad" messages // if the statusProvider is silent for many minutes. var badSignal = badTimer.TakeUntil(statusSignal).Repeat(); // listen to both good and bad signals. return Observable .Merge(statusSignal, badSignal) .DistinctUntilChanged() .Replay(1) .RefCount(); 
+2
source

I think the following should work. It uses a Throttle that will wait up to 30 seconds without input before sending anything. You can then combine this with existing source code to get the desired behavior.

 var bad = source .Throttle(TimeSpan.FromSeconds(30)) .Select(_ => "bad"); var merged = source.Merge(bad); 
0
source

Source: https://habr.com/ru/post/1244675/


All Articles