ReactiveX: emit null or a sentinel value after a timeout

I'm looking for a clean way to make a source Observable emit a single null (or sentinel value) after it has not emitted any item for some time.

For example, if the source observable emits 1, 2, 3, then stops emitting for 10 seconds before emitting 4, 5, 6, I would like the emitted elements to be 1, 2, 3, null, 4, 5, 6.
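To pin down the expected output before looking at operators, here is a minimal plain-Java model of the requirement. It is not RxJava and does not run on real time: it takes emission timestamps as input and inserts one sentinel wherever the gap between two consecutive items is strictly longer than the timeout. The class name, the method name, the 5-second timeout and the strict comparison are all assumptions made for illustration, and trailing silence after the last item is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: models the desired output only.
class SentinelModel {

    /**
     * Returns the values with one sentinel inserted wherever the gap between
     * two consecutive timestamps is strictly greater than timeoutMs.
     * timesMs must be ascending and have the same length as values.
     */
    public static List<Integer> withSentinel(long[] timesMs, Integer[] values,
                                             long timeoutMs, Integer sentinel) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            if (i > 0 && timesMs[i] - timesMs[i - 1] > timeoutMs) {
                out.add(sentinel); // the source was silent for longer than the timeout
            }
            out.add(values[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        // 1, 2, 3 one second apart, a 10 s pause, then 4, 5, 6 (assumed timings).
        long[] times = {0, 1_000, 2_000, 12_000, 13_000, 14_000};
        Integer[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(withSentinel(times, values, 5_000, null));
        // prints [1, 2, 3, null, 4, 5, 6]
    }
}
```

A reactive solution has to produce the same sequence, but it must emit the sentinel when the timeout elapses rather than when the next item arrives.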

The use case is displaying values in a user interface, where the displayed value should turn into a dash (-) or N/A once the last emitted value is stale.

I looked at the timeout operator, but it terminates the Observable when a timeout occurs, which is undesirable.

Using RxJava.

2 answers

Based on akarnokd's answer and an answer to a similar question, here is an alternative implementation:

Single sentinel value (as per the OP)

If you are looking for a single value indicating a lapse in time between emissions:

    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    final long duration = 100;

    // Emits -1 once after `duration` ms of silence, then stays quiet (never())
    // until the next source item restarts it via takeUntil + repeat.
    final Observable<Integer> timeout = Observable.just(-1)
            .delay(duration, TimeUnit.MILLISECONDS, scheduler)
            .concatWith(Observable.never())
            .takeUntil(subject)
            .repeat();

    subject.mergeWith(timeout).subscribe(subscriber);

    // Schedule 1, 2, 3 at 0, 100 and 200 ms from now.
    subject.onNext(1, 0);
    subject.onNext(2, 100);
    subject.onNext(3, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    // 300 ms of silence: the timeout fires once.
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

    subject.onNext(4, 0);
    subject.onNext(5, 100);
    subject.onNext(6, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    subscriber.assertNoTerminalEvent();
    subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));

Continuous sentinel values

If you want to keep receiving sentinel values for as long as the source observable has not emitted for some time:

    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    final long duration = 100;

    // Emits -1 every `duration` ms of silence; each source item restarts
    // the interval via takeUntil + repeat.
    final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
            .map(x -> -1)
            .takeUntil(subject)
            .repeat();

    subject.mergeWith(timeout).subscribe(subscriber);

    // Schedule 1, 2, 3 at 0, 100 and 200 ms from now.
    subject.onNext(1, 0);
    subject.onNext(2, 100);
    subject.onNext(3, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    // 300 ms of silence: the interval fires three times.
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

    subject.onNext(4, 0);
    subject.onNext(5, 100);
    subject.onNext(6, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    subscriber.assertNoTerminalEvent();
    subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));

The difference between the two is the timeout observable and whether it recurs or not.

You can replace -1 with null as needed.
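For the UI use case in the question, the subscriber still has to turn the sentinel into a placeholder. The following is a minimal sketch of that mapping in plain Java; the class name, the `SENTINEL` constant and the "N/A" label are assumptions for illustration. It treats both null and -1 as "stale", so it works with either variant above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: maps a value-or-sentinel to the text shown in the UI.
class StaleValueLabel {

    // Assumed marker; it must lie outside the range of real values.
    static final int SENTINEL = -1;

    public static String label(Integer value) {
        // The null check comes first so the comparison never unboxes a null.
        return (value == null || value == SENTINEL) ? "N/A" : String.valueOf(value);
    }

    public static void main(String[] args) {
        List<String> labels = Arrays.asList(1, 2, 3, null, 4, 5, 6).stream()
                .map(StaleValueLabel::label)
                .collect(Collectors.toList());
        System.out.println(labels); // prints [1, 2, 3, N/A, 4, 5, 6]
    }
}
```

If -1 can occur as a real value in your data, null (or a dedicated wrapper type) is the safer marker.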

All of the above was tested with RxJava 1.0.17 on Java 1.8.0_72.


You can achieve this with a rather complicated publish-amb-timer setup:

    PublishSubject<Integer> ps = PublishSubject.create();
    TestScheduler s = Schedulers.test();
    TestSubscriber<Integer> ts = new TestSubscriber<>();

    ps.publish(o ->
            // Race the next source item against a 10 s timer that yields null.
            o.take(1)
             .ambWith(Observable.timer(10, TimeUnit.SECONDS, s).map(v -> (Integer) null))
             .repeat()
             // Stop repeating once the upstream completes.
             .takeUntil(o.ignoreElements())
    ).subscribe(ts);

    ps.onNext(1);
    ps.onNext(2);
    ps.onNext(3);

    s.advanceTimeBy(15, TimeUnit.SECONDS);

    ps.onNext(4);
    ps.onNext(5);
    ps.onNext(6);
    ps.onCompleted();

    ts.assertValues(1, 2, 3, null, 4, 5, 6);

What happens is that the source is published, so you can take items from it one at a time without re-subscribing to the original source: take(1) waits for the next item, ambWith races it against a timer event so that whichever fires first wins, and repeat() starts the race again for the next value.

Edit: fixed the case where the upstream completes and repeat() goes into an infinite loop.


Source: https://habr.com/ru/post/1244674/

