How to implement parallel takeWhile support for Stream API in Java 8?

I saw some implementations takeWhilefor the Java 8 thread API, but they all seem to turn the thread into a non-parallel thread. For example this :

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}

Here it StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);turns the stream passed in takeWhileto a serial stream. Does anyone know of an implementation that supports parallel threads or how can I change this code to support / support parallel threads?

+4
source share
1 answer

If your source is known to be disordered, then the following work should work:

static final class UnorderedTakeWhileSpliterator<T> implements Spliterator<T>, Consumer<T>, Cloneable {
    private final Predicate<? super T> predicate;
    private final AtomicBoolean checked = new AtomicBoolean();
    private Spliterator<T> source;
    private T cur;

    UnorderedTakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) {
        this.predicate = predicate;
        this.source = source;
    }

    @Override
    public void accept(T t) {
        this.cur = t;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!checked.get() && source.tryAdvance(this)) {
            if (predicate.test(cur)) {
                action.accept(cur);
                return true;
            } else {
                checked.set(true);
            }
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        Spliterator<T> prefix = source.trySplit();
        if(prefix == null) {
            return null;
        }
        if(checked.get()) {
            return Spliterators.emptySpliterator();
        }
        UnorderedTakeWhileSpliterator<T> clone;
        try {
            clone = (UnorderedTakeWhileSpliterator<T>) clone();
        } catch (CloneNotSupportedException e) {
            throw new InternalError(e);
        }
        clone.source = prefix;
        return clone;
    }

    @Override
    public long estimateSize() {
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source.characteristics() & (DISTINCT | SORTED | NONNULL);
    }

    @Override
    public Comparator<? super T> getComparator() {
        return source.getComparator();
    }
}

Create a stream in the following way:

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(UnorderedTakeWhileSpliterator<>(stream.spliterator(), predicate), stream.isParallel());
}

, . - JDK-9 ( spliterator, ), , .

+2

Source: https://habr.com/ru/post/1666783/


All Articles