RxJava: watchOn, subscribeOn and doFinally, switch between IO and UI streams

I ran into a problem when my observable subscribes to the input / output stream and is observed in the stream of the main (UI) android, but the operator doFinallyruns in the input / output stream, and it needs to be run in the user interface stream,

Usekas is almost the same as in the middle article .

I want to show ProgressBarwhen the Observable is signed and will hide ProgressBarwhen the Observable is complete or complete.

The error I get is: java.lang.IllegalStateException: the current thread must have a looper!

Can someone help me move the action doFinallyback to the UI thread that has a looper? Or am I missing any other information?

EDIT usecase workflow:

-> Start up activities

-> initialize

-> execute observable thread

-> Start a new activity and end the current activity.

-> New activity

-> Start original work and complete

-> repeat initialization

Many thanks.

More details:

  • RxJava 2.0.7
  • RxAndroid 2.0.1
  • Android sdk min 14 and target 25

Code example

listUseCase.execute(null)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true);
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.main())
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

Stack trace:

FATAL EXCEPTION: RxCachedThreadScheduler-1                                                                                                 : com.example.android.demo.customerfirst.alpha, PID: 16685                                                                                                 java.lang.IllegalStateException: !                                                                                                      android.view.Choreographer $1.initialValue(.: 96)                                                                                                      android.view.Choreographer $1.initialValue( .java:91)                                                                                                      java.lang.ThreadLocal $Values.getAfterMiss(ThreadLocal.java:430)                                                                                                      java.lang.ThreadLocal.get(ThreadLocal.java:65)                                                                                                      android.view.Choreographer.getInstance( .java:192)                                                                                                     at android.animation.ValueAnimator $AnimationHandler. (ValueAnimator.java:600)                                                                                                      android.animation.ValueAnimator $AnimationHandler. (ValueAnimator.java:575)                                                                                                     at android.animation.ValueAnimator.getOrCreateAnimationHandler(ValueAnimator.java:1366)                                                                                                      android.animation.ValueAnimator.end(ValueAnimator.java:998)                                                                                                      android.graphics.drawable.AnimatedVectorDrawable.stop(AnimatedVectorDrawable.java:439)                                                                                                      android.widget.ProgressBar.stopAnimation(ProgressBar.java:1523)                                                                                                      android.widget.ProgressBar.onVisibilityChanged(ProgressBar.java:1583)                                                                                                     at android.view.View.dispatchVisibilityChanged(View.java:8643)                                                                                                     at android.view.View.setFlags(View.java:9686)                                                                                                     at android.view.View.setVisibility(View.java:6663)                                                                                                      android.widget.ProgressBar.setVisibility(ProgressBar.java:1563)                                                                                                      com.example.android.demo.customerfirst.featuresstore.list.ProductListActivity.showLoading(ProductListActivity.java:121)                                                                                                     at com.example.android.demo.customerfirst.featuresstore.list.ProductListPresenterMediator $3.run(ProductListPresenterMediator.java:56)                                                                                                      io.reactivex.internal.operators.observable.ObservableDoFinally $DoFinallyObserver.runFinally(ObservableDoFinally.java:144)                                                                                                      io.reactivex.internal.operators.observable.ObservableDoFinally $DoFinallyObserver.onComplete(ObservableDoFinally.java:94)                                                                                                      io.reactivex.internal.observers.DisposableLambdaObserver.onComplete(DisposableLambdaObserver.java:73)                                                                                                      io.reactivex.internal.observers.DeferredScalarDisposable.complete(DeferredScalarDisposable.Java: 84)                                                                                                      io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:52)                                                                                                      io.reactivex.Observable.subscribe(Observable.java:10700)                                                                                                      io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)                                                                                                      io.reactivex.Observable.subscribe(Observable.java:10700)                                                                                                      io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)                                                                                                      io.reactivex.Observable.subscribe(Observable.java:10700)                                                                                                      io.reactivex.internal.operators.observable.ObservableSubscribeOn $1.run(ObservableSubscribeOn.java:39)                                                                                                      io.reactivex.Scheduler $1.run(Scheduler.java:138)                                                                                                      io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)                                                                                                      io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)                                                                                                      java.util.concurrent.FutureTask.run(FutureTask.java:237)                                                                                                      java.util.concurrent.ScheduledThreadPoolExecutor $ScheduledFutureTask.access $201 (ScheduledThreadPoolExecutor.java:152)                                                                                                      java.util.concurrent.ScheduledThreadPoolExecutor $ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)                                                                                                      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)                                                                                                      java.util.concurrent.ThreadPoolExecutor $Worker.run(ThreadPoolExecutor.java:587)                                                                                                      java.lang.Thread.run(Thread.java:818)

+4
2

- , , /.

/ , , .

, , .

 @Override
public void initialize() {
    if (!isViewAttached()) {
        throw new ViewNotAttachedException();
    }
    disposable = listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
}

@Override
public void dispose() {
    if (disposable != null) {
        disposable.dispose();
    }
}
+1

, , - observeOn . observeOn , onNext, onError onCompleted, ( )

listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })

            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
+1

Source: https://habr.com/ru/post/1675919/


All Articles