Let's say I have an API that, based on some query criteria, will find or build a widget:
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
The code (synchronous) client looks like this:
try { Widget w = getMatchingWidget(criteria); processWidget(w); } catch (Throwable t) { handleError(t); }
Now let's say that finding or building a widget is unpredictably expensive, and I donβt want clients to lock while waiting for it. Therefore, I change it to:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
Customers can then write:
CompletableFuture<Widget> f = getMatchingWidget(criteria); f.thenAccept(this::processWidget) f.exceptionally(t -> { handleError(t); return null; })
or
getMatchingWidget(criteria).whenComplete((t, w) -> { if (t != null) { handleError(t); } else { processWidget(t); } });
Now, say, instead, the synchronous API can return 0 to n widgets:
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
Naively, I could write:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
However, this does not actually make the code non-blocking, it simply locks the lock - either Future locks until all Widgets are available, or code that iterates over Stream blocks waiting for each Widget . What I want is what will allow me to process each widget as they arrive, for example:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
But this does not provide error handling, and even if I add an additional Consumer<Throwable> errorHandler , it does not allow me, for example, to compose my search as a widget with other queries or to convert the results.
So, I'm looking for some kind of composite thing that combines the characteristics of Stream (iterability, transformability) with the characteristics of CompletableFuture (asynchronous result and error handling). (And while we're on it, back pressure can be enjoyable.)
Is this java.util.concurrent.Flow.Publisher ? io.reactivex.Observable ? Something more complicated? Is something easier?