Is there an elegant way to process a stream in chunks?

My exact scenario is that we insert data into the database in batches, so I want to accumulate DOM objects and then, every 1000 of them, flush them.

I implemented it by putting code in the accumulator to detect fullness and then flush, but it seems wrong - the flow control should come from the caller.
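
For illustration, a rough sketch of that kind of approach (hypothetical; `stream`, `DomObject` and `database.flushChunk` are placeholder names):

    List<DomObject> buffer = new ArrayList<>();
    stream.forEach(o -> {
        buffer.add(o);
        if (buffer.size() == 1000) {          // fullness check buried in the consumer
            database.flushChunk(buffer);
            buffer.clear();
        }
    });
    if (!buffer.isEmpty())
        database.flushChunk(buffer);          // trailing partial batch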

I could convert the stream to a list and then use subList in a loop, but that also seems awkward.

Is there a neat way to take an action every n elements and then continue with the stream, while only processing the stream once?

+36
6 answers

Elegance is in the eye of the beholder. If you don't mind using a stateful function in groupingBy , you can do this:

    AtomicInteger counter = new AtomicInteger();

    stream.collect(groupingBy(x -> counter.getAndIncrement() / chunkSize))
          .values()
          .forEach(database::flushChunk);

This won't give you any performance or memory benefit over your original solution, because it will still materialize the entire stream before doing anything with it.

If you want to avoid materializing the list, the stream API will not help you. You will have to get the stream's iterator or spliterator and do something like this:

    Spliterator<Integer> split = stream.spliterator();
    int chunkSize = 1000;

    while (true) {
        List<Integer> chunk = new ArrayList<>(chunkSize);
        for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
        if (chunk.isEmpty()) break;
        database.flushChunk(chunk);
    }
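
The same loop generalizes into a reusable helper; a minimal sketch (the `forEachChunk` helper and the usage line are my own, not part of the answer):

    static <T> void forEachChunk(Stream<T> stream, int chunkSize, Consumer<List<T>> action) {
        Spliterator<T> split = stream.spliterator();
        while (true) {
            List<T> chunk = new ArrayList<>(chunkSize);
            for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
            if (chunk.isEmpty()) break;
            action.accept(chunk);
        }
    }

    // e.g. forEachChunk(domObjectStream, 1000, database::flushChunk);
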
+12

Using the StreamEx library, the solution will look like this:

    Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15);
    AtomicInteger counter = new AtomicInteger(0);
    int chunkSize = 4;

    StreamEx.of(stream)
            .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0)
            .forEach(chunk -> System.out.println(chunk));

Output:

    [0, 1, 2, 3]
    [4, 5, 6, 7]
    [8, 9, 10, 11]
    [12, 13, 14]

groupRuns accepts a predicate that decides whether 2 elements should be in the same group.

It produces a group as soon as it finds the first element that does not belong to it.
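
To make the adjacency semantics concrete, here is a separate toy sketch (mine, not from the answer) that groups runs of equal neighbours:

    // prints [1, 1], [2, 2, 2], [3], [1] - one run per line
    StreamEx.of(1, 1, 2, 2, 2, 3, 1)
            .groupRuns(Objects::equals)
            .forEach(System.out::println);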

+5

If you have a Guava dependency in your project, you can do this:

    StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false)
                 .forEach(...);

See https://google.imtqy.com/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-
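
If the elements are already in a List, a plain loop over Lists.partition also works; a minimal sketch (the loop is mine, and `database.flushChunk` is the same placeholder used elsewhere on this page):

    // Lists.partition returns consecutive sublists of the given size
    for (List<DomObject> chunk : Lists.partition(simpleList, 1000)) {
        database.flushChunk(chunk);
    }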

+5

You can create a stream of chunks ( List<T> ) from a stream of elements and a given chunk size by:

  • grouping the elements by chunk index (element index / chunk size)
  • ordering the chunks by their index
  • reducing the map to the ordered chunks only

Code:

    public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
        AtomicInteger index = new AtomicInteger(0);

        return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
                .entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(Map.Entry::getValue);
    }

Usage example:

    Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
    Stream<List<Integer>> chunked = chunked(stream, 8);
    chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));

Output:

    Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
    Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
    Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
    Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
    Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
    Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
    Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
    Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
    Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
    Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
    Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
    Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
    Chunk: [96, 97, 98, 99]
+4

As Misha rightfully said, elegance is in the eye of the beholder. I personally think an elegant solution would be to let the class that inserts into the database do the accumulating, similar to a BufferedWriter . That way it does not depend on your original data structure and can even be used with several streams, one after another. I'm not sure whether this is exactly what you meant by the code in the accumulator that you thought was wrong. I don't think it is wrong, since existing classes such as BufferedWriter work this way. You keep some flush control on the caller's side, since you can call flush() on the writer at any point.

Something like the following code.

    class BufferedDatabaseWriter implements Flushable {
        List<DomObject> buffer = new LinkedList<DomObject>();

        public void write(DomObject o) {
            buffer.add(o);
            if (buffer.size() > 1000)
                flush();
        }

        public void flush() {
            // write the buffer to the database and clear it
        }
    }

Now your stream is processed like this:

    BufferedDatabaseWriter writer = new BufferedDatabaseWriter();
    stream.forEach(o -> writer.write(o));

    // if you have more streams
    stream2.forEach(o -> writer.write(o));

    writer.flush();

If you want to work multithreaded, you could run the flush asynchronously. Consuming from the stream can't go in parallel, but I don't think there is a way to count off 1000 elements from a stream in parallel anyway.

You can also extend this to allow setting the buffer size in the constructor, or make it implement AutoCloseable and use it in a try-with-resources block, and more; in short, the nice things you get from a BufferedWriter .
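
A sketch of those extensions (configurable buffer size, AutoCloseable so a try-with-resources block flushes the last partial batch); the flush body is still a placeholder, as above:

    class BufferedDatabaseWriter implements Flushable, AutoCloseable {
        private final int limit;
        private final List<DomObject> buffer = new LinkedList<>();

        BufferedDatabaseWriter(int limit) {
            this.limit = limit;
        }

        public void write(DomObject o) {
            buffer.add(o);
            if (buffer.size() >= limit)
                flush();
        }

        @Override
        public void flush() {
            // write the buffer to the database, then buffer.clear()
        }

        @Override
        public void close() {
            flush();    // flushes the trailing partial batch
        }
    }

    // usage:
    try (BufferedDatabaseWriter writer = new BufferedDatabaseWriter(1000)) {
        stream.forEach(writer::write);
    }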

+1

It seems not, because creating chunks means reducing the stream, and reduce means termination. If you need to keep the stream nature and process chunks without collecting all the data first, here is my code (does not work for parallel streams):

    private static <T> BinaryOperator<List<T>> processChunks(Consumer<List<T>> consumer, int chunkSize) {
        return (data, element) -> {
            if (data.size() < chunkSize) {
                data.addAll(element);
                return data;
            } else {
                consumer.accept(data);
                return element; // in fact this is the new data list
            }
        };
    }

    private static <T> Function<T, List<T>> createList(int chunkSize) {
        AtomicInteger limiter = new AtomicInteger(0);
        return element -> {
            limiter.incrementAndGet();
            if (limiter.get() == 1) {
                ArrayList<T> list = new ArrayList<>(chunkSize);
                list.add(element);
                return list;
            } else if (limiter.get() == chunkSize) {
                limiter.set(0);
            }
            return Collections.singletonList(element);
        };
    }

and how to use it:

    Consumer<List<Integer>> chunkProcessor = (list) -> list.forEach(System.out::println);
    int chunkSize = 3;

    Stream.generate(StrTokenizer::getInt).limit(13)
            .map(createList(chunkSize))
            .reduce(processChunks(chunkProcessor, chunkSize))
            .ifPresent(chunkProcessor);

    static Integer i = 0;

    static Integer getInt() {
        System.out.println("next");
        return i++;
    }

it will print:

    next
    next
    next
    next
    0
    1
    2
    next
    next
    next
    3
    4
    5
    next
    next
    next
    6
    7
    8
    next
    next
    next
    9
    10
    11
    12

The idea is to create the lists in the map operation with the "pattern"

[1], [2], [3], [4], ...

and merge (+ process) them in the reduce:

[1,2,3], [4,5,6], ...

and do not forget to process the last, "trimmed" chunk with

 .ifPresent(chunkProcessor); 
0
