I have the following code:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception { Thread thread = new Thread(new Runnable() { @Override public void run() { s.onNext("1"); s.onComplete(); } }); thread.setName("background-thread-1"); thread.start(); } }).map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { String threadName = Thread.currentThread().getName(); logger.logDebug("map: thread=" + threadName); return "map-" + s; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) {} @Override public void onNext(String s) { String threadName = Thread.currentThread().getName(); logger.logDebug("onNext: thread=" + threadName + ", value=" + s); } @Override public void onError(Throwable e) {} @Override public void onComplete() { String threadName = Thread.currentThread().getName(); logger.logDebug("onComplete: thread=" + threadName); } });
And here is the conclusion:
map: thread=background-thread-1 onNext: thread=background-thread-1, value=map-1 onComplete: thread=background-thread-1
Important detail: I am calling the subscribe method from another thread ( main thread in Android).
It seems that the Observable class is synchronous and defaults and does everything (such as map + subscriber notifications) in the same thread that emits events ( s.onNext ), right? Interesting ... is this intentional behavior, or I just didnβt understand something? Actually, I expected that at least onNext and onComplete would call the callbacks in the caller's thread, and not on one of them. Do I understand correctly that in this particular case the actual thread of the caller does not matter? At least when events are generated asynchronously.
Another problem: what if I get some Observable as a parameter from some external source (i.e. I do not generate it myself) ... for me there is no way to find out if it is synchronous or asynchronous, and I just I need to explicitly specify where I want to receive callbacks using the subscribeOn and observeOn , right?
Thanks!
source share