Why is the Stream API not designed to handle exceptions?

Fixtures

BiConsumer<Exception, Consumer<? super Integer>> NOTHING = (ex, unused) ->{/**/};

When I tried to fix the bug reported by @Holger in this code:

Stream<Integer> stream = Stream.of(1, 2, 3);

// v--- the bug I have already fixed; it now throws a RuntimeException
exceptionally(stream, NOTHING).collect(ArrayList::new, (l, x) -> {
    l.add(x);
    if (x < 4) throw new RuntimeException();
}, List::addAll);

Everything works, but when the source is created with Stream.of(T), the map(...) operation is called endlessly, for example:

List<Integer> result = exceptionally(
        //               v--- called infinitely
        Stream.of("bad").map(Integer::parseInt),
        NOTHING
).collect(toList());

But when I replace Stream.of(T) with Stream.of(T[]), it works fine again, for example:

//            v--- returns an empty list
List<Integer> result = exceptionally(
        Stream.of(new String[]{"bad"}).map(Integer::parseInt),
        NOTHING
).collect(toList());
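The difference can be probed at the spliterator level, without the `exceptionally` wrapper. The following is only a sketch: `RetryProbe` and `countInvocations` are hypothetical names introduced here, and the attempt limit exists purely so the probe cannot loop forever. It calls `tryAdvance` repeatedly with an action that always throws and counts how often the action is actually invoked. The count for `Stream.of(T)` depends on the JDK's internal implementation, so no output is stated for it.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class RetryProbe {

    // Calls tryAdvance up to `attempts` times with an action that always throws
    // and returns how many times that action was actually invoked.
    static int countInvocations(Spliterator<String> spliterator, int attempts) {
        AtomicInteger calls = new AtomicInteger();
        for (int i = 0; i < attempts; i++) {
            try {
                boolean advanced = spliterator.tryAdvance(s -> {
                    calls.incrementAndGet();
                    throw new IllegalStateException("failed on: " + s);
                });
                if (!advanced) {
                    break; // the spliterator reports that it is exhausted
                }
            } catch (IllegalStateException expected) {
                // Ignore the failure and try to advance again, as the wrapper does.
            }
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("Stream.of(T):   "
                + countInvocations(Stream.of("bad").spliterator(), 5));
        System.out.println("Stream.of(T[]): "
                + countInvocations(Stream.of(new String[]{"bad"}).spliterator(), 5));
    }
}
```

If the single-element spliterator hands out the same element again after a failure, the first number will be greater than one, which would match the endless `map(...)` calls described above.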

java.util.stream.Streams.StreamBuilderImpl#tryAdvance should reset count first, for example:

public boolean tryAdvance(Consumer<? super T> action) {
    Objects.requireNonNull(action);

    if (count == -2) {
        action.accept(first);
        count = -1; // <--- this should happen before `action.accept(first)`
        return true;
    }
    else {
        return false;
    }
}
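To illustrate the proposed ordering, here is a minimal sketch of a single-element spliterator that records the element as consumed before it invokes the action. `SingleElementSpliterator` and its `consumed` flag are hypothetical and written for this example only; this is not the JDK's code, and it uses a boolean flag instead of the `count` field shown above.

```java
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;

// Hypothetical single-element spliterator; not the JDK implementation.
final class SingleElementSpliterator<T> implements Spliterator<T> {

    private final T first;
    private boolean consumed;

    SingleElementSpliterator(T first) {
        this.first = first;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        if (consumed) {
            return false;
        }
        // Update the state first, so a throwing action cannot cause
        // the same element to be handed out again on the next call.
        consumed = true;
        action.accept(first);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null; // a single element cannot be split
    }

    @Override
    public long estimateSize() {
        return consumed ? 0 : 1;
    }

    @Override
    public int characteristics() {
        return SIZED | ORDERED | IMMUTABLE;
    }
}
```

With this ordering, a `tryAdvance` call whose action throws still moves the spliterator past the element, so the next call returns `false` instead of retrying.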

The question: this looks like a bug in the JDK, since the Stream.of overloads should have consistent semantics. Am I right?

<T> Stream<T> exceptionally(Stream<T> source,
        BiConsumer<Exception, Consumer<? super T>> exceptionally) {

    class ExceptionallySpliterator extends AbstractSpliterator<T>
            implements Consumer<T> {

        private Spliterator<T> source;
        private T value;

        public ExceptionallySpliterator(Spliterator<T> source) {
            super(source.estimateSize(), source.characteristics());
            this.source = source;
        }

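        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Boolean state = attempt(action);
            // null: the source threw; report progress so traversal continues
            if (state == null) return true;
            // true: the source stored an element in `value`; pass it downstream
            if (state) action.accept(value);
            return state;
        }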

        private Boolean attempt(Consumer<? super T> action) {
            try {
                return source.tryAdvance(this);
            } catch (Exception ex) {
                exceptionally.accept(ex, action);
                return null;
            }
        }

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    return stream(
            new ExceptionallySpliterator(source.spliterator()), 
            source.isParallel()
    ).onClose(source::close);
}
1 answer

- , , :

, , , , , , , .

, Stream.of(singleElement) , action.accept(first); count = -1;, , , , , .

, , BufferedReader.lines(), . Files.lines() , IOException.

, , , , , . , , , . , , , , - , , , , , .


Source: https://habr.com/ru/post/1679567/

