RxJava: Observable and default thread

I have the following code:

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    s.onNext("1");
                    s.onComplete();
                }
            });
            thread.setName("background-thread-1");
            thread.start();
        }
    }).map(new Function<String, String>() {
        @Override
        public String apply(@NonNull String s) throws Exception {
            String threadName = Thread.currentThread().getName();
            logger.logDebug("map: thread=" + threadName);
            return "map-" + s;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {}

        @Override
        public void onNext(String s) {
            String threadName = Thread.currentThread().getName();
            logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
        }

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onComplete() {
            String threadName = Thread.currentThread().getName();
            logger.logDebug("onComplete: thread=" + threadName);
        }
    });

And here is the output:

    map: thread=background-thread-1
    onNext: thread=background-thread-1, value=map-1
    onComplete: thread=background-thread-1

Important detail: I am calling the subscribe method from another thread (the main thread in Android).

It seems that Observable is synchronous by default and performs everything (operators such as map, plus the subscriber notifications) on the same thread that emits the events (s.onNext), right? Is this intended behavior, or did I misunderstand something? I actually expected that at least onNext and onComplete would be called on the caller's thread rather than on the emitting one. Do I understand correctly that in this particular case the caller's thread does not matter, at least when events are generated asynchronously?

Another concern: what if I receive an Observable as a parameter from some external source (i.e. I do not create it myself)? There is no way for me to find out whether it is synchronous or asynchronous, so I just need to specify explicitly where I want to receive callbacks using subscribeOn and observeOn, right?

Thanks!

1 answer

RxJava is unopinionated about concurrency. It produces values on the subscribing thread unless you use some other mechanism such as observeOn / subscribeOn. Please do not use low-level constructs like Thread inside operators; you could break the contract.

Because you use a Thread, onNext is called from that thread ('background-thread-1'). The subscription itself happens on the calling thread (the UI thread). Every operator in the chain is then called from 'background-thread-1', and the subscriber's onNext is called from 'background-thread-1' as well.
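This behavior is not specific to RxJava: a chain of plain callbacks runs on whichever thread invokes the first one. The following is a minimal sketch in plain Java, without RxJava, that mimics the setup from the question. The names EmitterThreadDemo, run, mapStage and subscriber are hypothetical and exist only for illustration; they are not RxJava API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration, not RxJava: a "map" stage and a "subscriber"
// wired together as plain callbacks.
class EmitterThreadDemo {

    // Returns what the map stage and the subscriber observed, in call order.
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();

        // Downstream subscriber: records the thread it is called on.
        Consumer<String> subscriber = value ->
                seen.add("onNext:" + Thread.currentThread().getName() + ":" + value);

        // "map" stage: records its thread, transforms the value,
        // then calls the subscriber directly (no thread switch).
        Function<String, String> mapper = s -> "map-" + s;
        Consumer<String> mapStage = value -> {
            seen.add("map:" + Thread.currentThread().getName());
            subscriber.accept(mapper.apply(value));
        };

        // The source emits from its own thread, as in the question.
        Thread thread = new Thread(() -> mapStage.accept("1"));
        thread.setName("background-thread-1");
        thread.start();
        try {
            thread.join(); // wait so the caller can inspect the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both recorded entries name background-thread-1, no matter which thread calls run(). Nothing in the chain hands the value to another thread, so the emitting thread executes every stage.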

If you want values to be produced off the calling thread, use subscribeOn. If you want to switch back to the main thread, use observeOn somewhere in the chain, most likely right before subscribe.

Example:

    Observable.just(1, 2, 3) // values are emitted on a computation thread
        .subscribeOn(Schedulers.computation()) // only one subscribeOn takes effect; the one closest to the source wins
        .map(integer -> integer) // map runs on the computation thread
        .observeOn(AndroidSchedulers.mainThread()) // switches every downstream onNext to the main thread
        .subscribe(integer -> {
            // called on the main thread
        });
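To make the effect of observeOn more concrete, here is a rough plain-Java sketch of the underlying idea: each value is handed to another thread's task queue instead of being delivered on the emitting thread. This is not how RxJava implements it (RxJava uses Schedulers and is considerably more involved). ThreadHopDemo, deliverOn and the "fake-main" executor, which stands in for the Android main thread, are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of a thread hop, not RxJava API.
class ThreadHopDemo {

    // Wraps a consumer so that each value is delivered on the given executor
    // instead of on the thread that emitted it.
    static <T> Consumer<T> deliverOn(ExecutorService executor, Consumer<T> downstream) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    // Returns the "thread:value" entries recorded by the subscriber.
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();

        // Assumption: a single named worker thread plays the role of the main thread.
        ExecutorService fakeMain =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));

        Consumer<String> subscriber = deliverOn(fakeMain,
                value -> seen.add(Thread.currentThread().getName() + ":" + value));

        // The source still emits from its own background thread.
        Thread emitter = new Thread(() -> subscriber.accept("1"), "background-thread-1");
        emitter.start();
        try {
            emitter.join();      // the value has been handed over by now
            fakeMain.shutdown(); // already queued tasks still run
            fakeMain.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The subscriber records fake-main rather than background-thread-1, because the wrapper moved the delivery to the executor's thread. Everything upstream of the hop still runs on the emitting thread, which is why the position of observeOn in the chain matters.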

Here is a good explanation: http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html


Source: https://habr.com/ru/post/1266786/

