Here is one weird trick you can use to improve exception handling.
Let's say your mapping function looks like this:
String doMap(Object obj) {
    if (isInvalid(obj)) {
        throw new IllegalArgumentException("info about obj");
    } else {
        return obj.toString();
    }
}
It returns the result if the object is valid and throws an exception if the object is invalid. Unfortunately, if you put this directly into the pipeline, any error will stop the pipeline's execution. What you want is something like an "either" type, which can hold either a value or an error indicator (which in Java would be an exception).
It turns out that CompletableFuture can hold either a value or an exception. Although it is intended for asynchronous processing, which is not happening here, we only need to bend it slightly to use it for our purposes.
First, given a stream of objects to process, we call the mapping function wrapped in a call to supplyAsync:
CompletableFuture<String>[] cfArray =
    stream.map(obj -> CompletableFuture.supplyAsync(() -> doMap(obj), Runnable::run))
          .toArray(n -> (CompletableFuture<String>[])new CompletableFuture<?>[n]);
(Unfortunately, creating a generic array produces an unchecked warning, which has to be suppressed.)
The odd construction
CompletableFuture.supplyAsync(supplier, Runnable::run)
runs the supplier "asynchronously" on the provided Executor Runnable::run, which simply runs the task immediately in the current thread. In other words, it runs the supplier synchronously.
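To make the synchronous behavior concrete, here is a minimal sketch. The class name SyncSupplyDemo and the helper capture are my own hypothetical names, not part of any API; only supplyAsync, join, isDone and isCompletedExceptionally come from CompletableFuture itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class SyncSupplyDemo {
    // Hypothetical helper: runs the supplier on the calling thread
    // and captures its outcome (value or exception) in a CompletableFuture.
    static <T> CompletableFuture<T> capture(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, Runnable::run);
    }

    public static void main(String[] args) {
        Thread caller = Thread.currentThread();

        CompletableFuture<String> ok = capture(() -> {
            // The executor is Runnable::run, so we are still on the caller's thread.
            if (Thread.currentThread() != caller) {
                throw new IllegalStateException("ran on another thread");
            }
            return "value";
        });

        CompletableFuture<String> failed = capture(() -> {
            throw new IllegalArgumentException("info about obj");
        });

        // Both futures are already complete by the time capture() returns.
        System.out.println(ok.isDone() + " " + ok.join());
        System.out.println(failed.isDone() + " " + failed.isCompletedExceptionally());

        try {
            failed.join();
        } catch (CompletionException ce) {
            // join() wraps the original exception; getCause() recovers it.
            System.out.println(ce.getCause());
        }
    }
}
```

Nothing is scheduled on another thread here, so there is no waiting involved. Each future is simply a container for whatever the supplier did.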
The trick is that the CompletableFuture instance returned from this call contains either the value from the supplier, if it returned normally, or an exception, if the supplier threw one. (I'm ignoring cancellation here.) We then gather the CompletableFuture instances into an array. Why an array? It sets up the next part:
CompletableFuture.allOf(cfArray).join();
This would normally wait for all the CFs in the array to complete. Since they were run synchronously, they should all be complete already. What matters here is that join() throws a CompletionException if any of the CFs in the array completed exceptionally. If the join completes normally, we can simply collect the return values. If the join throws an exception, we can either let it propagate, or we can catch it and process the exceptions stored in the CFs in the array. For instance,
try {
    CompletableFuture.allOf(cfArray).join();
    // no errors
    return Arrays.stream(cfArray)
                 .map(CompletableFuture::join)
                 .collect(toList());
} catch (CompletionException ce) {
    long errcount = Arrays.stream(cfArray)
                          .filter(CompletableFuture::isCompletedExceptionally)
                          .count();
    System.out.println("errcount = " + errcount);
    return Collections.emptyList();
}
If everything is successful, this returns a list of values. If there are any exceptions, it counts them and returns an empty list. Of course, you could easily do something else instead, such as logging details about the exceptions, or filtering out the exceptions and returning a list of the valid values.
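As one possible variation on the last option, the sketch below keeps the valid values and collects the exceptions separately. It departs from the code above in two ways: it gathers the futures into a List rather than an array (which avoids the unchecked cast), and it joins each future individually instead of calling allOf. The names PartialResults, Outcome and mapAll are hypothetical, and isInvalid is a stand-in that simply treats null as invalid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartialResults {
    // Hypothetical validity check standing in for the real isInvalid.
    static boolean isInvalid(Object obj) {
        return obj == null;
    }

    static String doMap(Object obj) {
        if (isInvalid(obj)) {
            throw new IllegalArgumentException("invalid object: " + obj);
        }
        return obj.toString();
    }

    // Hypothetical holder for both kinds of outcome from one pass.
    static final class Outcome {
        final List<String> values = new ArrayList<>();
        final List<Throwable> errors = new ArrayList<>();
    }

    static Outcome mapAll(Stream<?> stream) {
        // Run doMap synchronously for each element, capturing value or exception.
        List<CompletableFuture<String>> futures = stream
            .map(obj -> CompletableFuture.supplyAsync(() -> doMap(obj), Runnable::run))
            .collect(Collectors.toList());

        Outcome outcome = new Outcome();
        for (CompletableFuture<String> cf : futures) {
            try {
                outcome.values.add(cf.join());
            } catch (CompletionException ce) {
                // join() wraps the original exception; unwrap it for reporting.
                outcome.errors.add(ce.getCause());
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Outcome outcome = mapAll(Stream.<Object>of("a", null, 42));
        System.out.println("values = " + outcome.values);       // values = [a, 42]
        System.out.println("errors = " + outcome.errors.size()); // errors = 1
    }
}
```

Whether you want partial results at all depends on the application. If a single bad element should invalidate the whole batch, the all-or-nothing version above is probably the better fit.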