Java 8 lambda: iterate over stream objects and use the previous / next object(s) in the stream

I am working through some entry-level Java 8 lambda features.

Given a list of messages, each of which carries a message offset, where all offsets should form a consecutive sequence of integers, I am trying to find gaps so that I can warn about them. I feel this should be doable with a nice lambda, but I can't get my head around it.

So, there is this working snippet:

    private void warnAboutMessageGaps(final List<Message> messages) {
        // assumes static imports of Comparator.comparingLong and Collectors.toList
        final List<Long> offsets = messages.stream()
                .sorted(comparingLong(Message::getOffset))
                .map(Message::getOffset)
                .collect(toList());

        for (int i = 0; i < offsets.size() - 1; i++) {
            final long currentOffset = offsets.get(i);
            final long expectedNextOffset = offsets.get(i) + 1;
            final long actualNextOffset = offsets.get(i + 1);
            // a gap exists when the next offset is not the direct successor
            if (actualNextOffset != expectedNextOffset) {
                LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                          currentOffset + 1, actualNextOffset - 1);
            }
        }
    }

I cannot figure out how to "compare with the previous / next object" in a lambda. Any pointers would be appreciated.

Edit: Suggestions of StreamEx and other third-party solutions, while appreciated, are not what I was looking for.

+5
5 answers

Sometimes, trying to do everything with lambda expressions only complicates the solution. You can use:

    messages.stream()
        .mapToLong(Message::getOffset)
        .sorted()
        .forEachOrdered(new LongConsumer() {
            boolean first = true;
            long expected;

            public void accept(long value) {
                if (first) first = false;
                else if (value != expected)
                    LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                              expected, value);
                expected = value + 1;
            }
        });

but note that no matter how fluent the stream chain may look, sorted() is a stateful intermediate operation that creates and uses a backing array behind the scenes. You lose nothing by using that array explicitly:

    long[] l = messages.stream().mapToLong(Message::getOffset).toArray();
    Arrays.sort(l);
    for (int ix = 1; ix < l.length; ix++) {
        long value = l[ix], expected = l[ix - 1] + 1;
        if (value != expected)
            LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                      expected, value);
    }

It is difficult to find a simpler solution. But if you want to reduce the amount of memory needed, you can use BitSet instead of an array:

    OptionalLong optMin = messages.stream().mapToLong(Message::getOffset).min();
    if (!optMin.isPresent()) return;
    long min = optMin.getAsLong();
    BitSet bset = messages.stream()
        .mapToLong(Message::getOffset)
        .collect(BitSet::new, (bs, l) -> bs.set((int) (l - min)), BitSet::or);
    for (int set = 0, clear; set >= 0; ) {
        clear = bset.nextClearBit(set);
        set = bset.nextSetBit(clear);
        if (set >= 0)
            LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                      min + clear, min + set);
    }

This will significantly reduce the memory used when there are no gaps or reasonably small gaps compared to the range of offset values. It fails when the distance between the smallest offset and the largest offset is greater than Integer.MAX_VALUE .

You can check this up front, which also opens up a shortcut when there are no gaps at all:

    LongSummaryStatistics stat = messages.stream()
        .mapToLong(Message::getOffset).summaryStatistics();
    if (stat.getCount() == 0 ||
        // all solutions assume that there are no duplicates; in that case,
        // the following test proves that there are no gaps:
        stat.getMax() - stat.getMin() == messages.size() - 1) {
        return;
    }
    if (stat.getMax() - stat.getMin() > Integer.MAX_VALUE) {
        // proceed with array based test
        …
    }
    else {
        long min = stat.getMin();
        // proceed with BitSet based test
        …
    }
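
To make the dispatch above more concrete, here is a minimal, self-contained sketch of how the two branches could be filled in. The class name `OffsetGapFinder`, its method names, and the choice to return each gap as a `{firstMissing, lastMissing}` array instead of logging are assumptions made for illustration, not part of the answer. Like the snippets above, it assumes there are no duplicate offsets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.LongSummaryStatistics;

// Hypothetical helper: picks the BitSet or the sorted-array strategy by offset range.
class OffsetGapFinder {

    /** Returns each gap as {firstMissing, lastMissing}. Assumes no duplicate offsets. */
    static List<long[]> findGaps(long[] offsets) {
        LongSummaryStatistics stat = Arrays.stream(offsets).summaryStatistics();
        if (stat.getCount() == 0) return new ArrayList<>();
        long span = stat.getMax() - stat.getMin();
        // Without duplicates, a span of (count - 1) proves there is no gap.
        if (span == offsets.length - 1) return new ArrayList<>();
        // span < 0 means the subtraction overflowed; either way int bit indices will not do.
        if (span < 0 || span >= Integer.MAX_VALUE) return viaSortedArray(offsets);
        return viaBitSet(offsets, stat.getMin());
    }

    static List<long[]> viaSortedArray(long[] offsets) {
        long[] sorted = offsets.clone();
        Arrays.sort(sorted);
        List<long[]> gaps = new ArrayList<>();
        for (int i = 1; i < sorted.length; i++) {
            if (sorted[i] > sorted[i - 1] + 1) {
                gaps.add(new long[] {sorted[i - 1] + 1, sorted[i] - 1});
            }
        }
        return gaps;
    }

    static List<long[]> viaBitSet(long[] offsets, long min) {
        BitSet seen = new BitSet();
        for (long offset : offsets) seen.set((int) (offset - min));
        List<long[]> gaps = new ArrayList<>();
        for (int set = 0, clear; set >= 0; ) {
            clear = seen.nextClearBit(set);   // first missing offset (relative to min)
            set = seen.nextSetBit(clear);     // next present offset, or -1 at the end
            if (set >= 0) gaps.add(new long[] {min + clear, min + set - 1});
        }
        return gaps;
    }

    public static void main(String[] args) {
        for (long[] gap : findGaps(new long[] {1, 2, 3, 5, 7})) {
            System.out.println("missing from " + gap[0] + " to " + gap[1]);
        }
    }
}
```

Note that this sketch reports the last missing offset as the upper bound of a gap, whereas the snippets above log the next present offset.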
+2

You can do this with StreamEx using the pairMap method:

    StreamEx.of(messages)
        .sorted(Comparator.comparingLong(Message::getOffset))
        .pairMap((prev, next) -> new Message[] {prev, next})
        .forEach(prevNext -> {
            long currentOffset = prevNext[0].getOffset();
            long expectedNextOffset = prevNext[0].getOffset() + 1;
            long actualNextOffset = prevNext[1].getOffset();
            // a gap exists when the next offset is not the direct successor
            if (actualNextOffset != expectedNextOffset) {
                LOG.error(
                    "Missing offset(s) found in messages: missing from {} to {}",
                    currentOffset + 1, actualNextOffset - 1);
            }
        });
+3

What about:

    List<Long> offsets = messages.stream()
            .sorted(comparingLong(Message::getOffset))
            .map(Message::getOffset)
            .collect(toList());

    IntStream.range(1, offsets.size())
            .mapToObj(i -> new Pair<>(offsets.get(i - 1), offsets.get(i)))
            .forEach(pair -> {
                final long currentOffset = pair.getKey();
                final long expectedNextOffset = pair.getKey() + 1;
                final long actualNextOffset = pair.getValue();
                if (actualNextOffset != expectedNextOffset) {
                    LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                              currentOffset + 1, actualNextOffset - 1);
                }
            });
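
The answer does not say where `Pair` comes from, and `java.util` has no class of that name. If no such class is available, the same idea might be sketched with the JDK's `AbstractMap.SimpleImmutableEntry`, which offers the same `getKey()` / `getValue()` accessors. The names `AdjacentPairsSketch`, `adjacentPairs` and `describeGaps` below are hypothetical, and the sketch returns gap descriptions as strings rather than logging them.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical JDK-only variant of the index-based pairing shown above.
class AdjacentPairsSketch {

    /** Pairs every element of an already sorted list with its successor. */
    static List<Map.Entry<Long, Long>> adjacentPairs(List<Long> sorted) {
        return IntStream.range(1, sorted.size())
                .<Map.Entry<Long, Long>>mapToObj(
                        i -> new SimpleImmutableEntry<>(sorted.get(i - 1), sorted.get(i)))
                .collect(Collectors.toList());
    }

    /** Describes each gap as "firstMissing..lastMissing". */
    static List<String> describeGaps(List<Long> sorted) {
        return adjacentPairs(sorted).stream()
                .filter(p -> p.getValue() - p.getKey() > 1)
                .map(p -> (p.getKey() + 1) + ".." + (p.getValue() - 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [4..4, 6..6]
        System.out.println(describeGaps(Arrays.asList(1L, 2L, 3L, 5L, 7L)));
    }
}
```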
+2

For the sake of learning the Java 8 API, you can use a Collector, where you essentially look at each element of the stream in turn and use the BadPairs accumulator class to keep track of any gaps in the offset sequence.

I wrote this to be more verbose than it needs to be, to help you understand the relationship between the supplier, accumulator, and combiner lambdas.

    public class PairedStreamTest {

        private BiConsumer<BadPairs, BadPairs> combiner = (bad1, bad2) -> bad1.add(bad2);
        private Supplier<BadPairs> supplier = BadPairs::new;
        private BiConsumer<BadPairs, Message> accumulator = (bad, msg) -> bad.add(msg);

        @Test
        public void returnsTwoBadPairs_givenInputStreamIsMissingOffsets_forFourAndSix() throws Exception {

            BadPairs badPairs = Stream.of(new Message(1), new Message(2), new Message(3),
                                          new Message(5), new Message(7))
                    .sorted(comparingLong(Message::getOffset))
                    .collect(supplier, accumulator, combiner);

            badPairs.pairs.forEach(pair ->
                    LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                              pair.first.offset, pair.second.offset));

            assertTrue(badPairs.pairs.size() == 2);
        }

        // supporting classes for the above test code

        private final Logger LOG = LoggerFactory.getLogger(PairedStreamTest.class);

        class Message {
            public int offset;

            public Message(int i) {
                this.offset = i;
            }

            public Integer getOffset() {
                return this.offset;
            }
        }

        class Pair {
            private Message first;
            private Message second;

            public Pair(Message smaller, Message larger) {
                this.first = smaller;
                this.second = larger;
            }
        }

        class BadPairs {
            public Message previous;
            public Set<Pair> pairs = new HashSet<>();

            public void add(BadPairs other) {
                this.pairs.addAll(other.pairs);
            }

            public void add(Message msg) {
                if (previous != null && previous.offset != msg.offset - 1) {
                    this.pairs.add(new Pair(previous, msg));
                }
                this.previous = msg;
            }
        }
    }

Apologies for the abuse of public member variables and the layout of this test class. My intent is to focus the reader's attention on the @Test case first, rather than on the supporting classes.
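
One caveat about the combiner above: it merges the two sets of pairs but never compares the last message of the left part with the first message of the right part. A sequential stream never calls the combiner, so the test passes, but on a parallel stream a gap that falls on a chunk boundary would go unnoticed. Below is a minimal sketch of an accumulator that also checks that boundary. The names `GapAccumulatorSketch` and `Gaps` are hypothetical, it works on plain `long` offsets instead of `Message` objects to stay self-contained, and it assumes the stream is sorted and ordered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

// Hypothetical sketch: a mutable-reduction container whose combiner checks chunk boundaries.
class GapAccumulatorSketch {

    static final class Gaps {
        boolean empty = true;
        long first;   // first offset seen by this container
        long last;    // last offset seen by this container
        final List<long[]> ranges = new ArrayList<>();  // {firstMissing, lastMissing}

        // accumulator: called for each offset in encounter order
        void accept(long offset) {
            if (empty) {
                first = offset;
                empty = false;
            } else if (offset > last + 1) {
                ranges.add(new long[] {last + 1, offset - 1});
            }
            last = offset;
        }

        // combiner: 'right' holds the offsets that directly follow this container's
        void combine(Gaps right) {
            if (right.empty) return;
            if (empty) {
                first = right.first;
                empty = false;
            } else if (right.first > last + 1) {
                ranges.add(new long[] {last + 1, right.first - 1});  // gap on the boundary
            }
            ranges.addAll(right.ranges);
            last = right.last;
        }
    }

    /** Expects offsets in ascending encounter order; works sequentially and in parallel. */
    static List<long[]> findGaps(LongStream sortedOffsets) {
        return sortedOffsets.collect(Gaps::new, Gaps::accept, Gaps::combine).ranges;
    }

    public static void main(String[] args) {
        findGaps(LongStream.of(7, 1, 5, 3, 2).parallel().sorted())
                .forEach(r -> System.out.println("missing from " + r[0] + " to " + r[1]));
    }
}
```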

+1

What about:

    final List<Long> offsets = messages.stream()
            .map(Message::getOffset)
            .sorted()
            .collect(toList());

    IntStream.range(0, offsets.size() - 1).forEach(i -> {
        long currentOffset = offsets.get(i);
        if (offsets.get(i + 1) != currentOffset + 1) {
            LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                      currentOffset + 1, offsets.get(i + 1) - 1);
        }
    });

Or all in one statement with StreamEx:

    StreamEx.of(messages).mapToLong(Message::getOffset).sorted().boxed()
        .pairMap((i, j) -> new long[] { i, j })
        .filter(a -> a[1] - a[0] > 1)
        .forEach(a -> LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                                a[0] + 1, a[1] - 1));

Or all in one AbacusUtil statement:

    Stream.of(messages).mapToLong(Message::getOffset).sorted()
        .sliding0(2)
        .filter(e -> e.size() == 2 && e.get(1) - e.get(0) > 1)
        .forEach(e -> LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                                e.get(0) + 1, e.get(1) - 1));
+1

Source: https://habr.com/ru/post/1265487/
