You can use the auxiliary thread initialization class, which processes the process of preparing the thread and catches any exception. Consider the following example:
import java.util.List; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; public class SafeInitializationStreamExample { public static void main(String[] args) { int sum = SafeInitializationStream.from(() -> Stream.of(1,2,3,4)) .onInitializationError(t -> System.out.println(t.getMessage())) .mapToInt(it -> it) .sum(); System.out.println(sum); List<Object> list = SafeInitializationStream.from(() -> parse("/tmp/test.log")) .onInitializationError(t -> System.out.println(t.getMessage())) .map(it -> it.toString()) .collect(Collectors.toList()); System.out.println(list); } private static <T> Stream<T> parse(String filename) { throw new RuntimeException("File does not exist!"); } static class SafeInitializationStream<T> { private final Supplier<Stream<T>> streamSupplier; private SafeInitializationStream(Supplier<Stream<T>> streamSupplier) { this.streamSupplier = streamSupplier; } public static <T> SafeInitializationStream<T> from(Supplier<Stream<T>> streamSupplier) { return new SafeInitializationStream<>(streamSupplier); } public Stream<T> onInitializationError(Consumer<Throwable> onError) { try { return streamSupplier.get(); } catch (Exception e) { onError.accept(e); } return Stream.empty(); } } }
This example introduces the SafeInitializationStream class, which expects a Supplier<Stream<T>> :
SafeInitializationStream.from(() -> Stream.of(1,2,3,4))
Using Supplier<Stream<T>> in this case makes initializing the stream lazy - until we call Supplier.get() , the body of this provider will not be executed. Now when we call:
.onInitializationError(t -> System.out.println(t.getMessage()))
we run a team of providers, catch any exception that can be thrown, and handle this exception by passing Throwable to Consumer<Throwable> , which was passed as a parameter to the onInitializationError method. In the case of an exception, Stream.empty() returned (so that you can safely apply all other transformations in the chain. If there is no exception, the Stream<T> provided by the provider is returned.
If you run the following example, you will get a console:
10 File does not exist! []
The first thread was used up without errors and sum was returned correctly.
During initialization, the second thread threw an exception, we caught it and printed it to the console, and finally, the empty list was returned after the stream was consumed.
Of course, you can use Function<Throwable, Stream<T>> in the onInitializationError method if you want to indicate that Stream<T> returned if an exception onInitializationError . In this case, we assume that Stream.empty() always returned in this case. Hope this helps.