In Java 8 thread, error handling thread for later use

Say I have the following method that I want to reorganize

protected Stream<T> parseFile(File file, Consumer<File> cleanup) { try { return parser.parse(file); // returns a Stream<T> } catch (XmlParseException e) { // child of RuntimeException throw new CustomRuntimeException(e); } finally { if (file != null) { cleanup.accept(file); } } throw new IllegalStateException("Should not happen"); } 

This goal of the method is to perform error handling when connecting a proxy server when reversing a stream in a CustomRuntimeException exception-wrapping. Therefore, when we consume it later in the stream, I do not need to handle these exceptions everywhere, but only a CustomRuntimeException .

Upstream, I used this method following

 try { Stream<T> stream = parseFile(someFile); stream.map(t -> ...); catch (CustomRuntimeException e) { // do some stuff } 

And this is what the parser.parse method looks like

 public Stream<T> parse() { // ValueIterator<T> implements Iterator<T>, AutoCloseable XmlRootParser.ValueIterator<T> valueIterator = new XmlRootParser.ValueIterator(this.nodeConverter, this.reader, this.nodeLocalName, this.nodeName); Stream<T> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(valueIterator, 1040), false); stream.onClose(valueIterator::close); return stream; } 

The exceptions I want to handle will be thrown using the ValueIterator.hasNext method. This means that they will not be thrown away when creating a Stream, but only during stream consumption (by calling foreach / map / count / collect / ... in the stream).

How can I easily attach error handling to my stream in the parseFile method without consuming the stream? Is it possible?

Obviously, this code will only work if the parser.parse method consumes its stream before returning it. This is contrary to the use of threads.

+5
source share
2 answers

The Stream s backend, which provides iterator logic, is Spliterator .

So, you can wrap the element processing using the Spliterator wrapper as follows:

 class Wrapper<T> implements Spliterator<T> { final Spliterator<T> source; public Wrapper(Spliterator<T> source) { this.source = source; } @Override public boolean tryAdvance(Consumer<? super T> action) { try { return source.tryAdvance(action); } catch(XmlParseException ex) { throw new CustomRuntimeException(ex); } } @Override public void forEachRemaining(Consumer<? super T> action) { try { source.forEachRemaining(action); } catch(XmlParseException ex) { throw new CustomRuntimeException(ex); } } @Override public Spliterator<T> trySplit() { Spliterator<T> srcPrefix = source.trySplit(); return srcPrefix == null? null: new Wrapper<>(srcPrefix); } @Override public long estimateSize() { return source.estimateSize(); } @Override public int characteristics() { return source.characteristics(); } @Override public Comparator<? super T> getComparator(){return source.getComparator();} } 

It saves all the properties of the original Spliterator and translates only the exceptions that occur during iteration.

Then you can use it as

 protected Stream<T> parseFile(File file) { Stream<T> s = parser.parse(); return StreamSupport.stream(new Wrapper<>(s.spliterator()), s.isParallel()) .onClose(s::close); } 

And the caller must not forget to close the stream correctly:

  ResultType result; try(Stream<T> s = parseFile(file)) { result = s. // other intermediate ops // terminal operation } 

or

  ResultType result; try(Stream<T> s = parseFile(file)) { result = s. // other intermediate ops // terminal operation } finally { // other cleanup actions } 
+6
source

You can use the auxiliary thread initialization class, which processes the process of preparing the thread and catches any exception. Consider the following example:

 import java.util.List; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; public class SafeInitializationStreamExample { public static void main(String[] args) { int sum = SafeInitializationStream.from(() -> Stream.of(1,2,3,4)) .onInitializationError(t -> System.out.println(t.getMessage())) .mapToInt(it -> it) .sum(); System.out.println(sum); List<Object> list = SafeInitializationStream.from(() -> parse("/tmp/test.log")) .onInitializationError(t -> System.out.println(t.getMessage())) .map(it -> it.toString()) .collect(Collectors.toList()); System.out.println(list); } private static <T> Stream<T> parse(String filename) { throw new RuntimeException("File does not exist!"); } static class SafeInitializationStream<T> { private final Supplier<Stream<T>> streamSupplier; private SafeInitializationStream(Supplier<Stream<T>> streamSupplier) { this.streamSupplier = streamSupplier; } public static <T> SafeInitializationStream<T> from(Supplier<Stream<T>> streamSupplier) { return new SafeInitializationStream<>(streamSupplier); } public Stream<T> onInitializationError(Consumer<Throwable> onError) { try { return streamSupplier.get(); } catch (Exception e) { onError.accept(e); } return Stream.empty(); } } } 

This example introduces the SafeInitializationStream class, which expects a Supplier<Stream<T>> :

 SafeInitializationStream.from(() -> Stream.of(1,2,3,4)) 

Using Supplier<Stream<T>> in this case makes initializing the stream lazy - until we call Supplier.get() , the body of this provider will not be executed. Now when we call:

 .onInitializationError(t -> System.out.println(t.getMessage())) 

we run a team of providers, catch any exception that can be thrown, and handle this exception by passing Throwable to Consumer<Throwable> , which was passed as a parameter to the onInitializationError method. In the case of an exception, Stream.empty() returned (so that you can safely apply all other transformations in the chain. If there is no exception, the Stream<T> provided by the provider is returned.

If you run the following example, you will get a console:

 10 File does not exist! [] 

The first thread was used up without errors and sum was returned correctly.

During initialization, the second thread threw an exception, we caught it and printed it to the console, and finally, the empty list was returned after the stream was consumed.

Of course, you can use Function<Throwable, Stream<T>> in the onInitializationError method if you want to indicate that Stream<T> returned if an exception onInitializationError . In this case, we assume that Stream.empty() always returned in this case. Hope this helps.

+3
source

Source: https://habr.com/ru/post/1275229/


All Articles