Rxjava: Subscribe to a specific topic

I am new to Rxjava. I have the following code:

System.out.println("1: " + Thread.currentThread().getId()); Observable.create(new rx.Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subcriber) { System.out.println("2: " + Thread.currentThread().getId()); // query database String result = .... subcriber.onNext(result); } }).subscribeOn(Schedulers.newThread()).subscribe(countResult -> { System.out.println("3: " + Thread.currentThread().getId()); }); 

For example, the output would be:

1: 50
2: 100
3: 100

I want subscribers to start on a thread with id 50. How can I do this?

+5
source share
5 answers

I think there are two cases. Either you need it to work in the user interface thread, or because of synchronization. As I know, you cannot call a function in a specific thread, because when a method is called, it is bound to the context of the thread, so it is impossible to call a method from a thread to another thread. Your problem is that the method in the subscriber is called from Schedulers.newThread() . I also found this github question about Schedulers.currentThread() . You need to notify the caller thread when the caller is called.

You can also use akka, the easiest way is to write parallel code with it.

Sorry for my bad grammar.

+1
source

From docs :

By default, the Observable and the chain of operators that you access will do its job and will notify its observers on the same topic on which the Subscribe method is called. The SubscribeOn statement modifies this behavior by specifying a different scheduler on which the Observable should run . The ObserveOn statement indicates another scheduler that the Observable will use to send notifications to its observers.

So you can just use subscribe instead of subscribeOn to watch your collection in the same thread that was created, something like this:

 Observable.create(new rx.Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subcriber) { System.out.println("2: " + Thread.currentThread().getId()); // query database String result = .... subcriber.onNext(result); } }).subscribe(countResult -> { System.out.println("3: " + Thread.currentThread().getId()); }); 

UPDATE: If your application is an Android application, you can use the subscription on the background thread as you do, and transfer the results to the main thread using Handler messages.

If your application is a Java application, I can suggest using the wait () and notify () mechanism or consider using frameworks like EventBus or akka for more complex scenarios.

0
source

With RxJava 1.0.15, you can apply toBlocking() to subscribe , and everything will work in the thread that created the entire sequence of statements.

-1
source

So, subscribeOn indicates which Observable stream will start emitting elements, watchOn "switches" to the stream for the rest of the observed chain. Put checkOn (schedulers.CurrentThread ()) right in front of the subscription, and you will get into the thread that this observable creates, and not the thread in which it runs. Here is a resource that explains rxjava threads very well. http://www.grahamlea.com/2014/07/rxjava-threading-examples/

-1
source

I believe that @akarnokd is right. You either run without .subscribeOn(Schedulers.newThread()) so that this happens synchronously, or use toBlocking() just before the subscription. Alternatively, if you just want everything to happen on the same thread, but it should not be the current thread, you can do this:

 Observable .defer(() -> Observable.create(...)) .subscribeOn(Schedulers.newThread()) .subscribe(subscriber); 

defer ensures that a call to create occurs in the subscription stream.

-1
source

Source: https://habr.com/ru/post/1235342/


All Articles