Run Flow in the main thread

Again I am comparing RxJava with Java 9 Flow. I see that Flow is asynchronous by default, and I was wondering if there is a way to make it run synchronously.

Sometimes we want to use it not for NIO, but as syntactic sugar, to keep the code more homogeneous.

RxJava is synchronous by default, and you can make it run asynchronously by using observeOn and subscribeOn in your pipeline.

Is there an operator in Flow that makes it run in the main thread?

Sincerely.

+5
3 answers

You can define your own Publisher, as described in the Flow documentation, and control how it executes.

The documentation's example is a very simple publisher that issues (when requested) a single TRUE item to a single subscriber. Because the subscriber receives only one item, this class does not need buffering or ordering control. As written, it delivers signals through ForkJoinPool.commonPool(); for synchronous delivery, call the subscriber methods directly instead of going through the executor.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Flow.Publisher;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;

    class OneShotPublisher implements Publisher<Boolean> {
        private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
        private boolean subscribed; // true after first subscribe

        public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
            if (subscribed)
                subscriber.onError(new IllegalStateException()); // only one allowed
            else {
                subscribed = true;
                subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
            }
        }

        static class OneShotSubscription implements Subscription {
            private final Subscriber<? super Boolean> subscriber;
            private final ExecutorService executor;
            private Future<?> future; // to allow cancellation
            private boolean completed;

            OneShotSubscription(Subscriber<? super Boolean> subscriber,
                                ExecutorService executor) {
                this.subscriber = subscriber;
                this.executor = executor;
            }

            public synchronized void request(long n) {
                if (n != 0 && !completed) {
                    completed = true;
                    if (n < 0) {
                        IllegalArgumentException ex = new IllegalArgumentException();
                        executor.execute(() -> subscriber.onError(ex));
                    } else {
                        future = executor.submit(() -> {
                            subscriber.onNext(Boolean.TRUE);
                            subscriber.onComplete();
                        });
                    }
                }
            }

            public synchronized void cancel() {
                completed = true;
                if (future != null) future.cancel(false);
            }
        }
    }
+5

There is no operator for this, but the API lets you control how items are published. Therefore, you can simply call the subscriber methods directly from the current thread.

    import java.util.concurrent.Flow.Publisher;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;

    class SynchronousPublisher implements Publisher<Data> {
        public synchronized void subscribe(Subscriber<? super Data> subscriber) {
            subscriber.onSubscribe(new SynchronousSubscription(subscriber));
        }

        static class SynchronousSubscription implements Subscription {
            private final Subscriber<? super Data> subscriber;

            SynchronousSubscription(Subscriber<? super Data> subscriber) {
                this.subscriber = subscriber;
            }

            public synchronized void request(long n) {
                ... // prepare item
                subscriber.onNext(someItem); // delivered on the thread that called request()
            }
            ...
        }
    }
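To make the idea of calling the subscriber directly more concrete, here is a minimal runnable sketch. The names SyncListPublisher and SynchronousFlowDemo are hypothetical and not part of any library, and the sketch assumes single-threaded use (it is not thread-safe). It emits the items of a list on whichever thread calls request(), so the whole flow has finished by the time subscribe() returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical publisher: emits a list's items on the thread that calls request(). */
final class SyncListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    SyncListPublisher(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int index;        // next item to emit
            private long demand;      // outstanding requested items
            private boolean emitting; // guards against re-entrant request() calls
            private boolean done;     // set after cancel, error, or completion

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // called from inside onNext: the outer loop continues
                emitting = true;
                try {
                    while (!done && demand > 0 && index < items.size()) {
                        demand--;
                        subscriber.onNext(items.get(index++)); // direct call, no executor
                    }
                } finally {
                    emitting = false;
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete(); // in this sketch, completion is signaled from request()
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

class SynchronousFlowDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<String> publisher = new SyncListPublisher<>(List.of("a", "b", "c"));
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) {
                log.add(item + "@" + Thread.currentThread().getName());
            }
            @Override public void onError(Throwable t) { log.add("error"); }
            @Override public void onComplete() { log.add("done"); }
        });
        return log; // already complete: every signal ran on the calling thread
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because nothing is handed to an executor, the log is fully populated when subscribe() returns, and every entry carries the caller's thread name.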
+3

It depends on what you mean by running in the main thread.

If you want to force an arbitrary Flow to run on a specific thread, there is no standard way to do it unless the Flow is implemented by a library that lets you override the components that provide asynchrony. In RxJava terms, that component is a Scheduler provided by the Schedulers utility class.

If you want to observe a Flow on the main thread, you need to write a blocking-queue consumer on top of Flow.Subscriber that blocks the thread until the queue has items. This can get complicated, so I will point you to the blockingSubscribe implementation in Reactive4JavaFlow .

If you want to use the Java main thread as an Executor / Scheduler , this is even more complicated and requires a similar blocking mechanism, as well as some ideas from a thread pool executor. Reactive4JavaFlow has such a Scheduler, which you can use as an Executor via: new SubmissionPublisher<>(blockingScheduler::schedule, 128) .
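As a rough illustration of the blocking-queue consumer idea (not the Reactive4JavaFlow implementation), the sketch below uses a hypothetical BlockingSubscriber class. It assumes unbounded demand and an unbounded queue, which keeps the code short but gives up backpressure. Signals arrive on the publisher's threads and are queued; the consume() method then drains them on whichever thread calls it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

/** Hypothetical subscriber: queues signals so another thread can consume them. */
final class BlockingSubscriber<T> implements Flow.Subscriber<T> {
    private static final Object COMPLETE = new Object(); // marker for onComplete

    /** Wrapper so that an error is never confused with a regular item. */
    private static final class Failure {
        final Throwable error;
        Failure(Throwable error) { this.error = error; }
    }

    private final BlockingQueue<Object> signals = new LinkedBlockingQueue<>();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { signals.add(item); }
    @Override public void onError(Throwable t) { signals.add(new Failure(t)); }
    @Override public void onComplete() { signals.add(COMPLETE); }

    /** Blocks the calling thread and hands each item to onNext there, until the flow ends. */
    @SuppressWarnings("unchecked")
    void consume(Consumer<? super T> onNext) throws InterruptedException {
        while (true) {
            Object signal = signals.take(); // waits until a signal is available
            if (signal == COMPLETE) return;
            if (signal instanceof Failure) {
                throw new IllegalStateException(((Failure) signal).error);
            }
            onNext.accept((T) signal);
        }
    }
}

class BlockingSubscribeDemo {
    static List<String> run() throws InterruptedException {
        BlockingSubscriber<Integer> subscriber = new BlockingSubscriber<>();
        List<String> log = new ArrayList<>();
        // The default SubmissionPublisher delivers asynchronously via its own executor.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete after the submitted items
        subscriber.consume(i -> log.add(i + "@" + Thread.currentThread().getName()));
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The callback passed to consume() runs on the calling thread even though the publisher delivers its signals elsewhere. A production version would request items in batches and bound the queue.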
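The following is a simplified sketch of using the calling thread as an Executor, assuming a hypothetical CallerThreadExecutor class (this is not the Reactive4JavaFlow scheduler). Its execute() method only queues tasks, and a run loop on the main thread executes them. It is passed to the SubmissionPublisher(Executor, int maxBufferCapacity) constructor from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical executor: execute() only queues; tasks run on the thread calling runUntil(). */
final class CallerThreadExecutor implements Executor {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
    }

    /** Runs queued tasks on the calling thread until the given condition becomes true. */
    void runUntil(BooleanSupplier finished) throws InterruptedException {
        while (!finished.getAsBoolean()) {
            // Poll with a timeout so the condition is re-checked periodically.
            Runnable task = tasks.poll(50, TimeUnit.MILLISECONDS);
            if (task != null) {
                task.run();
            }
        }
    }
}

class MainThreadExecutorDemo {
    static List<String> run() throws InterruptedException {
        CallerThreadExecutor executor = new CallerThreadExecutor();
        AtomicBoolean finished = new AtomicBoolean();
        List<String> log = new ArrayList<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 128)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) {
                    log.add(item + "@" + Thread.currentThread().getName());
                }
                @Override public void onError(Throwable t) { finished.set(true); }
                @Override public void onComplete() { finished.set(true); }
            });
            publisher.submit(1);
            publisher.submit(2);
        } // close() signals onComplete; nothing has been delivered yet, only queued
        executor.runUntil(finished::get); // delivery happens here, on the calling thread
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The trade-off is that the main thread has to park in the run loop, so it cannot do other work until the flow terminates.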

+2

Source: https://habr.com/ru/post/1272248/

