Here is a solution you can use to convert a Stream<String>, where each element represents a line, into a Stream<List<String>>, where each element is the chunk of lines found between the specified start and end delimiters:
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliterator(Spliterator<String> lineSpliterator,
                     Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source = lineSpliterator;
        start = chunkStart;
        end = chunkEnd;
        // Accumulates lines into the current chunk; a line matching the start
        // predicate opens a new (empty) chunk, so the start line itself is not stored.
        getChunk = s -> {
            if(current != null)
                current.add(s);
            else if(start.test(s))
                current = new ArrayList<>();
        };
    }

    public boolean tryAdvance(Consumer<? super List<String>> action) {
        // Pull lines from the source until the current chunk ends with a line
        // matching the end predicate, or the source is exhausted.
        while(current == null || current.isEmpty()
              || !end.test(current.get(current.size()-1)))
            if(!source.tryAdvance(getChunk))
                return false;
        current.remove(current.size()-1); // drop the end delimiter line
        action.accept(current);
        current = null;
        return true;
    }

    public Spliterator<List<String>> trySplit() {
        return null; // chunk boundaries are unknown in advance, so no splitting
    }

    public long estimateSize() {
        return Long.MAX_VALUE; // number of chunks is unknown
    }

    public int characteristics() {
        return ORDERED | NONNULL;
    }

    public static Stream<List<String>> toChunks(Stream<String> lines,
                                                Predicate<String> chunkStart,
                                                Predicate<String> chunkEnd,
                                                boolean parallel) {
        return StreamSupport.stream(
            new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}
Lines matching the delimiter predicates are not included in the chunk; it would be easy to change this behavior if necessary.
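For instance, a minimal sketch of the two changes needed to keep the delimiter lines inside each chunk (everything else stays as above) could look like this:

// In the constructor: also store the line that opened the chunk.
getChunk = s -> {
    if(current != null)
        current.add(s);
    else if(start.test(s)) {
        current = new ArrayList<>();
        current.add(s); // keep the start delimiter line
    }
};

// In tryAdvance: simply omit the line
//     current.remove(current.size()-1);
// so the end delimiter line stays in the emitted chunk.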
It can be used as follows:
ChunkSpliterator.toChunks(
        Files.lines(Paths.get(myFile)),
        Pattern.compile("^<start>$").asPredicate(),
        Pattern.compile("^<stop>$").asPredicate(),
        true)
    .collect(new MyProcessOneBucketCollector<>())
The patterns are specified as ^word$ to require the entire line to consist of only that word; without these anchors, lines that merely contain the pattern can start and end a chunk, since Pattern.asPredicate tests with find() rather than a full match. The nature of the source stream does not allow parallelism when creating the chunks, so when chaining with an immediate collection operation, the parallelism for the entire operation is rather limited. It depends on MyProcessOneBucketCollector whether there can be any parallelism at all.
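To make the chunking behavior concrete, here is a small self-contained demo (the input lines and the ChunkDemo class name are made up for illustration); with correctly anchored patterns it prints [a, b] and then [c]:

import java.util.regex.Pattern;
import java.util.stream.Stream;

public class ChunkDemo {
    public static void main(String[] args) {
        Stream<String> lines = Stream.of(
            "header", "<start>", "a", "b", "<stop>",
            "noise", "<start>", "c", "<stop>");
        ChunkSpliterator.toChunks(
                lines,
                Pattern.compile("^<start>$").asPredicate(),
                Pattern.compile("^<stop>$").asPredicate(),
                false)
            .forEach(System.out::println); // prints [a, b] then [c]
    }
}

Note that "header" and "noise" are silently discarded, because lines outside a start/end pair never reach a chunk.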
If your end result does not depend on the order in which the buckets occur in the source file, it is highly recommended that your collector report the UNORDERED characteristic, or that you insert unordered() into the stream's method chain before the collect.
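For example, a minimal sketch of the unordered() variant, reusing the hypothetical myFile and MyProcessOneBucketCollector from above:

ChunkSpliterator.toChunks(
        Files.lines(Paths.get(myFile)),
        Pattern.compile("^<start>$").asPredicate(),
        Pattern.compile("^<stop>$").asPredicate(),
        true)
    .unordered() // release the ordering constraint before the terminal operation
    .collect(new MyProcessOneBucketCollector<>());

Alternatively, a hand-written collector can report Collector.Characteristics.UNORDERED from its characteristics() method to achieve the same effect.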