Based on akarnokd's answer and an answer to a similar question, here is an alternative implementation:
Single reference value (according to OP)
If you are looking for a single value that signals a gap between emissions:

    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    final long duration = 100;

    // Emit -1 once after `duration` ms, then stay silent; restart whenever the subject emits.
    final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
            .concatWith(Observable.never())
            .takeUntil(subject)
            .repeat();

    subject.mergeWith(timeout).subscribe(subscriber);

    // First burst: values scheduled 0, 100 and 200 ms from now.
    subject.onNext(1, 0);
    subject.onNext(2, 100);
    subject.onNext(3, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    // 300 ms of silence: the timeout fires once.
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

    // Second burst.
    subject.onNext(4, 0);
    subject.onNext(5, 100);
    subject.onNext(6, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    subscriber.assertNoTerminalEvent();
    subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
Continuous reference values
If you want to keep receiving values for as long as the source observable has not emitted for some time:

    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    final long duration = 100;

    // Emit -1 every `duration` ms; restart whenever the subject emits.
    final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
            .map(x -> -1)
            .takeUntil(subject)
            .repeat();

    subject.mergeWith(timeout).subscribe(subscriber);

    // First burst: values scheduled 0, 100 and 200 ms from now.
    subject.onNext(1, 0);
    subject.onNext(2, 100);
    subject.onNext(3, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    // 300 ms of silence: the timeout fires three times.
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

    // Second burst.
    subject.onNext(4, 0);
    subject.onNext(5, 100);
    subject.onNext(6, 200);
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

    subscriber.assertNoTerminalEvent();
    subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
The two versions differ in the observable used for the timeout and in whether the timeout value is emitted once or repeatedly during a gap.
You can replace -1 with null as needed.
All of the above was tested with RxJava 1.0.17 on Java 1.8.0_72.
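To make the difference between the two variants easier to see without a scheduler, here is a rough plain-Java model of where the -1 markers end up. It is only a sketch and not part of the tested RxJava code: the class `TimeoutMarkers`, the method `merge` and all of its parameters are made up for illustration. It also assumes that the source wins when a marker and a source emission fall on the same instant, which is a simplification; with a real TestScheduler the outcome of such a tie may depend on the order in which the actions were scheduled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model (no RxJava) of where the -1 markers land. */
final class TimeoutMarkers {
    static final int MARKER = -1;

    /**
     * @param times      emission times in ms, ascending, relative to subscription at t = 0
     * @param values     values emitted at those times
     * @param duration   silence in ms after which a marker is emitted (must be positive)
     * @param endTime    last simulated instant, inclusive
     * @param continuous false: at most one marker per gap; true: one marker every duration ms
     */
    static List<Integer> merge(long[] times, int[] values, long duration,
                               long endTime, boolean continuous) {
        if (duration <= 0) {
            throw new IllegalArgumentException("duration must be positive");
        }
        List<Integer> out = new ArrayList<>();
        long last = 0; // the timeout starts counting at subscription time
        for (int i = 0; i <= times.length; i++) {
            // A gap ends at the next emission, or just past endTime after the last one.
            long gapEnd = i < times.length ? times[i] : endTime + 1;
            // Markers fire strictly before gapEnd: the source wins ties (assumption).
            for (long t = last + duration; t < gapEnd; t += duration) {
                out.add(MARKER);
                if (!continuous) {
                    break; // single-value variant: stay silent until the next emission
                }
            }
            if (i < times.length) {
                out.add(values[i]);
                last = times[i]; // every source emission restarts the timeout
            }
        }
        return out;
    }
}
```

With emissions 1, 2, 3, 4, 5 at 0, 100, 200, 550 and 650 ms, a duration of 100 and an end time of 700, the single-value variant yields [1, 2, 3, -1, 4, 5] and the continuous variant yields [1, 2, 3, -1, -1, -1, 4, 5].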