Asynchronous, compound return value for vector / stream data in Java 9

Let's say I have an API that, based on some query criteria, will find or build a widget:

Widget getMatchingWidget(WidgetCriteria c) throws Throwable 

The (synchronous) client code looks like this:

 try {
     Widget w = getMatchingWidget(criteria);
     processWidget(w);
 } catch (Throwable t) {
     handleError(t);
 }

Now let's say that finding or building a widget is unpredictably expensive, and I don't want clients to block while waiting for it. So I change it to:

 CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c) 

Clients can then write:

 CompletableFuture<Widget> f = getMatchingWidget(criteria);
 f.thenAccept(this::processWidget);
 f.exceptionally(t -> { handleError(t); return null; });

or

 getMatchingWidget(criteria).whenComplete((w, t) -> {
     if (t != null) {
         handleError(t);
     } else {
         processWidget(w);
     }
 });
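
A minimal runnable sketch of the whenComplete variant, useful for checking the argument order: the callback receives the result first and the Throwable second. The Widget class, the String criteria, and the rule that empty criteria fail are stand-ins invented for illustration. An exception thrown inside supplyAsync may reach the callback wrapped in a CompletionException, so the sketch unwraps it defensively.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the question's Widget type.
final class Widget {
    final String name;
    Widget(String name) { this.name = name; }
}

final class WidgetFutureDemo {
    // Assumption for illustration: empty criteria make the lookup fail.
    static CompletableFuture<Widget> getMatchingWidget(String criteria) {
        return CompletableFuture.supplyAsync(() -> {
            if (criteria.isEmpty()) {
                throw new IllegalArgumentException("no criteria");
            }
            return new Widget("widget-for-" + criteria);
        });
    }

    // Runs the lookup and reports which branch of the callback was taken.
    static String describe(String criteria) {
        StringBuilder out = new StringBuilder();
        getMatchingWidget(criteria)
            .whenComplete((w, t) -> {          // result first, Throwable second
                if (t != null) {
                    // Unwrap CompletionException if the failure arrived wrapped.
                    Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                            ? t.getCause() : t;
                    out.append("error: ").append(cause.getMessage());
                } else {
                    out.append("processed ").append(w.name);
                }
            })
            .exceptionally(t -> null)          // swallow the failure so join() does not rethrow
            .join();                           // block only so the demo can return a value
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("blue"));
        System.out.println(describe(""));
    }
}
```

The join() call is there only so the demo can return a value; real client code would leave the chain asynchronous.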

Now suppose that the synchronous API can instead return 0 to n widgets:

 Stream<Widget> getMatchingWidgets(WidgetCriteria c) 

Naively, I could write:

 CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c) 

However, this does not actually make the code non-blocking, it simply moves the blocking around: either the Future blocks until all Widgets are available, or the code that iterates over the Stream blocks waiting for each Widget. What I want is something that lets me process each widget as it arrives, for example:

 void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor) 

But this does not provide error handling, and even if I add an additional Consumer<Throwable> errorHandler, it does not let me, for example, compose my widget search with other queries or transform the results.

So I'm looking for some kind of composable thing that combines the characteristics of a Stream (iterability, transformability) with the characteristics of a CompletableFuture (asynchronous result and error handling). (And while we're at it, backpressure would be nice.)

Is this java.util.concurrent.Flow.Publisher? io.reactivex.Observable? Something more complicated? Something simpler?

1 answer

Your use case falls very naturally into the world that RxJava addresses. If we have an observable:

 Observable<Widget> getMatchingWidgets(WidgetCriteria wc);

which generates zero or more widgets based on the criteria, you can process each widget as it appears using:

 getMatchingWidgets(wc)
     .subscribeOn( backgroundScheduler )
     .subscribe( w -> processWidget(w),
                 error -> handleError(error) );

The observer chain will run on backgroundScheduler, which is often a wrapper around a thread pool executor service. If you need to do the final processing of each widget on the UI thread, you can use the observeOn() operator to switch to the UI scheduler before processing:

 getMatchingWidgets(wc)
     .subscribeOn( backgroundScheduler )
     .observeOn( uiScheduler )
     .subscribe( w -> processWidget(w),
                 error -> handleError(error) );

For me, the elegance of the RxJava approach is that it handles many of the nuts and bolts of managing the pipeline in a fluent way. Looking at that observer chain, you know exactly what is happening and where.
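
For comparison, the JDK-only option mentioned in the question can be sketched with java.util.concurrent.Flow and SubmissionPublisher. This is an illustration under assumptions, not a drop-in replacement: widgets are represented by plain String names, and the OneAtATime subscriber and run method are hypothetical names. The subscriber requests one item at a time, which is the simplest form of backpressure. The Flow interfaces only define the protocol; operators such as map or filter still come from a library like RxJava.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class WidgetFlowDemo {
    // Hypothetical subscriber: handles each widget as it arrives, one at a time.
    static final class OneAtATime implements Flow.Subscriber<String> {
        final List<String> seen = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                     // ask for the first widget only
        }
        @Override public void onNext(String widget) {
            seen.add("processed " + widget);  // stands in for processWidget(w)
            subscription.request(1);          // signal readiness for the next one
        }
        @Override public void onError(Throwable t) {
            seen.add("error: " + t.getMessage()); // stands in for handleError(t)
            done.countDown();
        }
        @Override public void onComplete() {
            seen.add("done");
            done.countDown();
        }
    }

    // Publishes the given widget names asynchronously and returns what the subscriber saw.
    static List<String> run(List<String> widgets) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String w : widgets) {
                publisher.submit(w);          // blocks only if the subscriber's buffer is full
            }
        }                                     // close() signals onComplete to subscribers
        try {
            subscriber.done.await(5, TimeUnit.SECONDS); // wait only so the demo can return
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.seen;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

Compared with the RxJava chain, the demand signalling and completion handling are written out by hand here, which is much of what the fluent operators save you.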


Source: https://habr.com/ru/post/1272757/

