How to execute map, filter, flatMap using multiple threads in RxScala / Java?

How to run filter, mapand flatMapon Observableusing multiple threads:

  def withDelay[T](delay: Duration)(t: => T) = {
    Thread.sleep(delay.toMillis)
    t
  }

  Observable
    .interval(500 millisecond)
    .filter(x => {
      withDelay(1 second) { x % 2 == 0 }
    })
    .map(x => {
      withDelay(1 second) { x * x }
    }).subscribe(println(_))

The goal is to simultaneously perform filtering and conversion operations using multiple threads.

+4
source share
2 answers

Yo can use Async.toAsync () for each operation.

It is on the rxjava-async package

Documentation

0
source

observOn, , ,

       /**
 * Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread.
 * Shall print
 * First step main
 * Second step RxNewThreadScheduler-2
 * Third step RxNewThreadScheduler-1
 */
@Test
public void testObservableObserverOn() throws InterruptedException {
    Subscription subscription = Observable.just(1)
            .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                    .getName()))
            .observeOn(Schedulers.newThread())
            .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                    .getName()))
            .observeOn(Schedulers.newThread())
            .doOnNext(number -> System.out.println( "Third step " + Thread.currentThread()
                    .getName()))
            .subscribe();
    new TestSubscriber((Observer) subscription)
            .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

async https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

0

Source: https://habr.com/ru/post/1616150/


All Articles