How to run subsciber for multiple threads using RxJava on Android

I am new to RxJava and fighting for (I think) a simple problem. I want to handle part of a simuleasly subscription in 3 threads. That is why I use FixedThreadPool. Code example:

Observer.just("one", "two", "three", "four")
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))
.subscribe(new Observer<String>() {

    public void onNext(String string) {
        Log.d(TAG, "Started: " + string);
        SystemClock.sleep(1000);
        Log.d(TAG, "Ended: " + string);
    }

    (...)

}

Expected Result:

Started: one
Started: two
Started: three
Ended: one
Started: four
Ended: two
Ended: three
Ended: four

Actual result:

Started: one
Ended: one
Started: two
Ended: two
Started: three
Ended: three
Started: four
Ended: four

What am I doing wrong?

+4
source share
1 answer

RxJava Observables are sequential, and operators subscribeOnand observeOnwill not run values ​​in parallel to each other.

The closest thing you can achieve is to group values ​​with a modular key, run them through observeOnand combine the results:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 100)
.groupBy(v -> count.getAndIncrement() % 3)
.flatMap(g -> g
    .observeOn(Schedulers.computation())
    .map(v ->  Thread.currentThread() + ": " + v))
.toBlocking()
.forEach(System.out::println);
+5

Source: https://habr.com/ru/post/1612641/


All Articles