Efficient RxJava Flow Control

In my project, I need to process objects on different threads. To control which thread each step runs on, I wrap each item in a new Observable so that I can apply a different observeOn() to it, as follows:

    apiService.getObjects(token) // Retrofit
        .compose(bindToLifecycle())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(o -> {
            // process in Main Thread
        })
        .map(Observable::just) // create new one, to change thread for it
        .observeOn(Schedulers.io())
        .subscribe(o -> {
            // process in the background thread
        });

I suspect RxJava has a cleaner and more efficient way to handle a single response on different threads. I looked for one but found nothing.

Thanks,
Anton

1 answer

In Rx, it is usually recommended to avoid side effects in the "do" blocks (which only run if the stream is subscribed to) and to put that code in the subscriber instead.

In your case, you can use cache() or publish()...connect(), for example:

    // query is an Observable of whatever getObjects() emits;
    // cache() replays the single response to every subscriber
    query = apiService.getObjects(token)
        .compose(bindToLifecycle())
        .subscribeOn(Schedulers.io())
        .cache();

    query.observeOn(AndroidSchedulers.mainThread())
        .subscribe(o -> {
            // process in Main Thread
        });

    query.observeOn(Schedulers.io())
        .subscribe(o -> {
            // process in the background thread
        });

With publish() instead of cache(), the code is otherwise identical, but you decide when the request runs: publish() returns a ConnectableObservable, so you call query.connect() after attaching both subscriptions.
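If it helps to see the "run the request once, consume the result on two threads" idea in isolation, here is a rough plain-JDK analogy using CompletableFuture. It is not RxJava code and not how cache() is implemented; it only mirrors the shape of the snippet above. All names in it (SharedResultDemo, the "io", "ui" and "worker" executors, the "payload" value standing in for the Retrofit response) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy (assumption: not RxJava) of one request shared by two consumers.
class SharedResultDemo {

    // What happened during one run, so a caller can inspect it.
    static final class Result {
        final int requestCount;    // how many times the fake request executed
        final String uiThread;     // thread that ran the first consumer
        final String workerThread; // thread that ran the second consumer

        Result(int requestCount, String uiThread, String workerThread) {
            this.requestCount = requestCount;
            this.uiThread = uiThread;
            this.workerThread = workerThread;
        }
    }

    // Single-thread executor whose only thread has a recognizable name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    static Result run() {
        ExecutorService io = named("io");         // stands in for Schedulers.io()
        ExecutorService ui = named("ui");         // stands in for the main thread
        ExecutorService worker = named("worker"); // stands in for the background consumer
        AtomicInteger requests = new AtomicInteger();
        try {
            // Hypothetical stand-in for apiService.getObjects(token):
            // the supplier runs once and the future keeps its result.
            CompletableFuture<String> query = CompletableFuture.supplyAsync(() -> {
                requests.incrementAndGet();
                return "payload";
            }, io);

            // Two independent consumers of the same result, each on its own executor.
            CompletableFuture<String> onUi =
                    query.thenApplyAsync(o -> Thread.currentThread().getName(), ui);
            CompletableFuture<String> onWorker =
                    query.thenApplyAsync(o -> Thread.currentThread().getName(), worker);

            // Wait for both consumers before reading the request counter.
            String uiName = onUi.join();
            String workerName = onWorker.join();
            return new Result(requests.get(), uiName, workerName);
        } finally {
            io.shutdown();
            ui.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("requests=" + r.requestCount
                + " first=" + r.uiThread + " second=" + r.workerThread);
    }
}
```

The point to take from it is the same as with cache(): the request body runs a single time, and each consumer picks its own thread without the value being wrapped in a new stream.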

If the work done in your background subscriber is CPU-bound computation, Schedulers.computation() might be preferable to Schedulers.io().

Please note that, as far as I can tell, your code will work fine without the map(Observable::just) line, since observeOn() only affects operators downstream of it (and not the preceding doOnNext()).


Source: https://habr.com/ru/post/1232465/

