StreamEx grouping into lists returns the wrong number of records

The following code splits a stream of objects into chunks of 1000, processes each chunk as it is materialized, and returns the total number of objects at the end.

In all cases the returned count is correct as long as the stream size is not 1. When the stream size is 1, the returned count is 0.

Any help would be greatly appreciated. I also had to hack the return statement so that it returns 0 when the stream has no entries, and I would like to fix that as well.

```java
AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
    stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
          .forEach((chunk) -> {
              //... process each chunk
          });
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}
return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();
```
4 answers

In the end, I went with Guava's Iterators.partition() to split my stream of objects into chunks:

```java
MutableInt recordCounter = new MutableInt();
try {
    Iterators.partition(myObjects.iterator(), 1000)
             .forEachRemaining((chunk) -> {
                 // process each chunk ...
                 recordCounter.add(chunk.size());
             });
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}
return recordCounter.getValue();
```
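For readers without Guava, here is a JDK-only sketch of the same idea. `Partition`, `partition` and `processInChunks` are hypothetical names, not Guava's API: the helper only mimics Iterators.partition by yielding consecutive lists of at most `size` elements. Because the total is summed from `chunk.size()`, as in this answer, an empty or single-element source needs no special case.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

// Hypothetical JDK-only helper (not Guava's implementation) that yields
// consecutive chunks of at most `size` elements from any iterator.
final class Partition {
    private Partition() {}

    static <T> Iterator<List<T>> partition(Iterator<T> source, int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) throw new NoSuchElementException();
                List<T> chunk = new ArrayList<>();
                // Fill until the chunk is full or the source runs out;
                // only the last chunk can be shorter than `size`.
                while (chunk.size() < size && source.hasNext()) {
                    chunk.add(source.next());
                }
                return chunk;
            }
        };
    }

    // Processes every chunk and returns the total number of elements,
    // computed from the chunk sizes rather than from a side counter.
    static <T> int processInChunks(Iterator<T> source, int size, Consumer<List<T>> processor) {
        int total = 0;
        Iterator<List<T>> chunks = partition(source, size);
        while (chunks.hasNext()) {
            List<T> chunk = chunks.next();
            processor.accept(chunk);
            total += chunk.size();
        }
        return total;
    }
}
```

With 2500 elements and a chunk size of 1000, the processor sees chunks of 1000, 1000 and 500 elements and the method returns 2500.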

As the JavaDoc says:

sameGroup - a stateless non-interfering predicate for application to a pair of adjacent elements, which returns true for elements belonging to the same group.

The predicate must be stateless, and yours is not. Because you are using the method incorrectly, you cannot expect a correct result. That it comes very close to what you want is an accident: you cannot rely on this behavior, and it may change in future versions of StreamEx.
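If the data is available as a random-access List (an assumption; the question's myObjects looks like a closeable source rather than a list), one stateless alternative that needs only the JDK is to derive each chunk's bounds from its index, so no lambda carries state from one call to the next. `IndexChunks.chunks` is a hypothetical helper, not part of StreamEx.

```java
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical helper: fixed-size chunks of a List computed purely from
// the chunk index, so the lambda below keeps no mutable state.
final class IndexChunks {
    private IndexChunks() {}

    static <T> Stream<List<T>> chunks(List<T> list, int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        int n = list.size();
        // Ceiling division written to avoid int overflow.
        int chunkCount = n / size + (n % size == 0 ? 0 : 1);
        return IntStream.range(0, chunkCount)
                .mapToObj(i -> {
                    int from = i * size;                      // start of chunk i
                    int to = from + Math.min(size, n - from); // last chunk may be shorter
                    return list.subList(from, to);            // a view backed by `list`
                });
    }
}
```

The chunks are subList views, so the underlying list should not be structurally modified while they are in use.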


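The counter was only meant to decide where to split the chunks; it is not a reliable way to count the total number of objects. The groupRuns predicate is tested on pairs of adjacent elements, so when the stream has 0 or 1 elements it is never called at all, and for n ≥ 1 elements the counter ends up at n - 1. That is why the question's code has to add 1 at the end, and why a single-element stream returns 0.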
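To see where the off-by-one comes from, here is a hypothetical, simplified stand-in for a groupRuns-style operation (not StreamEx's actual implementation): it tests each adjacent pair once, in order, and starts a new group whenever the predicate returns false. Counting the predicate calls gives n - 1 for n ≥ 1 elements and 0 for an empty or single-element input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;

// Hypothetical sequential stand-in for a groupRuns-style operation,
// written only to illustrate how often a pairwise predicate is called.
final class PairwiseGrouping {
    private PairwiseGrouping() {}

    static <T> List<List<T>> groupRuns(List<T> items, BiPredicate<T, T> sameGroup) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = null;
        for (T item : items) {
            if (current == null) {
                current = new ArrayList<>(); // first element: no pair to test yet
            } else if (!sameGroup.test(current.get(current.size() - 1), item)) {
                groups.add(current);         // predicate said "new group"
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (current != null) groups.add(current);
        return groups;
    }

    // Returns how many times the predicate is invoked for n elements.
    static int predicateCalls(int n) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < n; i++) items.add(i);
        AtomicInteger calls = new AtomicInteger();
        groupRuns(items, (prev, next) -> {
            calls.incrementAndGet();
            return true;
        });
        return calls.get();
    }
}
```

With a counter-based predicate such as `counter.incrementAndGet() % 3 != 0`, this stand-in groups 0..6 as [0, 1, 2], [3, 4, 5], [6] and leaves the counter at 6, one less than the number of elements.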

So you need a different way of counting objects. Instead of just consuming the chunks in forEach, you can map each processed chunk to chunk.size() and sum those sizes at the end:

```java
AtomicInteger counter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
    return stream
            .groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
            .mapToLong((chunk) -> {
                //... process each chunk
                return chunk.size();
            })
            .sum();
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}
```
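The "return chunk.size() and sum" idea does not depend on StreamEx. Below is a JDK-only sketch in which the chunk stream is simply taken as a parameter (an assumption; `ChunkTotals.processAndCount` is a hypothetical name). The total is the sum of the chunk sizes, so it equals the number of elements however the chunks were formed, and an empty stream sums to 0. It does not, by itself, make a stateful groupRuns predicate safe.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper: process each chunk and count elements via chunk sizes.
final class ChunkTotals {
    private ChunkTotals() {}

    static <T> long processAndCount(Stream<List<T>> chunks, Consumer<List<T>> processor) {
        return chunks
                .mapToLong(chunk -> {
                    processor.accept(chunk); // ... process each chunk
                    return chunk.size();     // this chunk's contribution to the total
                })
                .sum();                      // an empty stream sums to 0
    }
}
```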

@Nazariy Bardyuk explained why this does not work. I ran into a similar need to split streams earlier, so I forked StreamEx and made a few changes: StreamEx-0.8.7. Here is a simple example:

```java
int count = IntStreamEx.range(0, 10)
        .boxed()
        .splitToList(3)
        .mapToInt(chunk -> {
            System.out.println(chunk);
            return chunk.size();
        })
        .sum();
System.out.println(count);
```

If your project is still at an early stage, you could give it a try; the code would be:

```java
try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
    return stream.splitToList(1000)
            .mapToInt((chunk) -> {
                //... process each chunk
                return chunk.size();
            })
            .sum();
}
```
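The onClose(...) call takes over the job of the finally block: the handler registered on the stream runs when try-with-resources closes the stream, even if processing throws. A JDK-only sketch of that mechanism with java.util.stream.Stream.onClose, where `FakeSource` is a hypothetical stand-in for myObjects and `OnCloseDemo.countAndClose` is a hypothetical name:

```java
import java.util.List;
import java.util.stream.Stream;

// Hypothetical closeable source standing in for the question's myObjects.
final class FakeSource implements AutoCloseable {
    private final List<String> items;
    boolean closed;

    FakeSource(List<String> items) {
        this.items = items;
    }

    Stream<String> stream() {
        return items.stream();
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class OnCloseDemo {
    private OnCloseDemo() {}

    // The onClose handler closes the source when try-with-resources closes
    // the stream, so no separate finally block is needed.
    static long countAndClose(FakeSource source) {
        try (Stream<String> stream = source.stream().onClose(source::close)) {
            return stream.count();
        }
    }
}
```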

Source: https://habr.com/ru/post/1276184/
