How to use DisposableObserver with lambda expressions in RxJava2

My use case is to stop receiving items once a certain condition is met in my onNext. For that, I am trying to use DisposableObserver. This is the code that works:

Observable.just(1, 2, 3, 4)
    .subscribe(new DisposableObserver<Integer>() {
      @Override
      public void onNext(Integer integer) {
        System.out.println("onNext() received: " + integer);
        if (integer == 2) {
          dispose();
        }
      }

      @Override
      public void onError(Throwable e) { System.out.println("onError()"); }

      @Override
      public void onComplete() { System.out.println("onComplete()"); }
    });

Now, if I try to replace this with lambdas, the call resolves to this overload instead, which gives me no DisposableObserver to call dispose() on:

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)

This is what I do now: I save the Disposable passed to onSubscribe and then call disposable.dispose() from onNext.

  private Disposable disposable;
  private void disposableObserverTest() {
    Observable.just(1, 2, 3, 4)
        .subscribe(integer -> {
              System.out.println("onNext() received: " + integer);
              if (integer == 2) {
                disposable.dispose();
              }

            }, throwable -> System.out.println("error"),
            () -> System.out.println("complete"),
            disposable1 -> {
              this.disposable = disposable1;
            });
  }

However, is there a way to call dispose() directly when using lambdas?

2 answers

You can use takeUntil to complete the observable once the condition is met.

@Test
public void takeUntil() throws Exception {
    Observable.just(1, 2, 3, 4)
            .takeUntil(integer -> integer == 2)
            .test()
            .assertValues(1, 2);
}
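The same operator also works with a plain lambda subscriber outside of a test. The following is a minimal sketch, assuming RxJava 2 (the `io.reactivex` package) is on the classpath; the class name `TakeUntilLambdaSketch` and the `run` method are made up for illustration. Note one behavioural difference from calling dispose(): with takeUntil the stream completes normally, so the onComplete lambda is invoked after the last item.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

public class TakeUntilLambdaSketch {

    // Collects the events seen by the lambda subscriber so they can be inspected.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Observable.just(1, 2, 3, 4)
            // Emit items up to and including the first one matching the predicate,
            // then complete the stream.
            .takeUntil(integer -> integer == 2)
            .subscribe(
                integer -> events.add("onNext " + integer),
                throwable -> events.add("error"),
                () -> events.add("complete"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Items 3 and 4 never reach the subscriber, and no Disposable has to be stored in a field.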

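In the first case, you are calling

subscribe(DisposableObserver observer)

whereas with lambdas you are calling

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)

In the second case there is no DisposableObserver involved, so dispose() is not available to call directly.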
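Since DisposableObserver is an abstract class rather than a functional interface, a lambda cannot stand in for it directly. If lambda syntax is still wanted, one possible workaround is a small adapter that wraps the lambda and hands it the observer as a Disposable. This is only a sketch, assuming RxJava 2 is on the classpath; `disposableObserver` is a hypothetical helper written for this example and is not part of RxJava, as are the class name and the `run` method.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableObserver;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class LambdaDisposableObserverSketch {

    // Hypothetical helper (not an RxJava API): wraps a two-argument lambda in a
    // DisposableObserver and passes the observer itself as the Disposable.
    static <T> DisposableObserver<T> disposableObserver(BiConsumer<T, Disposable> handler) {
        return new DisposableObserver<T>() {
            @Override
            public void onNext(T value) {
                handler.accept(value, this); // 'this' is the DisposableObserver
            }

            @Override
            public void onError(Throwable e) { System.out.println("onError()"); }

            @Override
            public void onComplete() { System.out.println("onComplete()"); }
        };
    }

    // Returns the items the lambda received before it disposed the subscription.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        DisposableObserver<Integer> observer = disposableObserver((value, d) -> {
            received.add(value);
            if (value == 2) {
                d.dispose(); // stop receiving further items
            }
        });
        Observable.just(1, 2, 3, 4).subscribe(observer);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The lambda receives the Disposable as its second argument, so no field is needed to hold it. As in the anonymous-class version from the question, disposing stops the stream without onComplete being called.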


Source: https://habr.com/ru/post/1675041/

