How to stop the interval from the observed

I am new to rxjava, I want to complete the polling task every 2 seconds for 50 times, it can also end if some condition occurs in the task, I try to use it Observable.interval, but I found that there is no way to abort it except throwing an exception Is there any other operator to achieve my goal? BTW this functionality works as an API to provide an observable object, so I can’t manage the subscriber and end the subscription.

Observable.interval(timeout, interval, TimeUnit.SECONDS)
.flatmap(task - > task)
+4
source share
2 answers

I think, Observable.takeUntil(stopPredicate)or Observable.takeWhile(predicate)can help you:

Observable.interval(timeout, interval, TimeUnit.SECONDS) 
.takeWhile(val -> val < 42)

Here the observable ends with the 42nd attempt

+10
source

Observable.interval takeUntil :

Observable.interval(0, 1, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .takeUntil(new Predicate<Long>() {
                @Override
                public boolean test(Long aLong) throws Exception {
                    return aLong == 10;
                }
            })
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG, "Tick: " + aLong);
                }
            });

10 .

0

Source: https://habr.com/ru/post/1653627/


All Articles