Lazy sorting of objects in the Java 8 Stream API on a daily basis?

I have a large Java 8 Stream<MyObject> with objects that look like this:

    import java.util.Date;

    class MyObject {
        private String string;
        private Date timestamp;
        // Getters and setters removed for brevity
    }

I know that all timestamps of day 1 arrive before those of day 2, but within each day the timestamps may be out of order. I would like to sort the MyObject instances by timestamp within each day using the Stream API. Since the stream is large, I have to do this as lazily as possible, i.e. it is fine to keep one day's worth of MyObject instances in memory, but not much more than that.

How can I achieve this?

Update 2017-04-29:

The requirement is that I want to continue working on the same stream after sorting! I would like something like this (pseudocode):

 Stream<MyObject> sortedStream = myStreamUnsorted().sort(onADailyBasis()); 
+3
3 answers

I suggest the following solution:

Store each element of the stream in a TreeMap so that it is sorted on insertion, using the object's timestamp as the key. Note that two objects with identical timestamps would overwrite each other in such a map.

  Map<Date, MyObject> objectsOfTheDaySorted = new TreeMap<>(); 

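We need to know which object to remove from the map at the end. It will be only a single object, but a variable must be (effectively) final to be written from inside a lambda, so I chose a simple list as a holder. The list is pre-filled with one slot, because set(0, ...) fails on an empty list.

    List<MyObject> lastObject = new ArrayList<>(Collections.singletonList((MyObject) null));

Set the current day as an integer.

    // just an example
    int currentDay = 23;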

Use a predicate that tests whether the day of an object passing through differs from currentDay.

    Predicate<MyObject> predicate = myObject -> myObject.getTimestamp()
            .toInstant()
            .atZone(ZoneId.systemDefault())
            .toLocalDate()
            .getDayOfMonth() != currentDay;

Run the stream. Use peek() twice: first to put the object into the map, second to overwrite the object held in the list. Use anyMatch() as the terminal operation and pass in the predicate created above. As soon as the first object of the next day appears and matches the predicate, anyMatch() terminates the stream and returns true.

    stream.peek(myObject -> objectsOfTheDaySorted.put(myObject.getTimestamp(), myObject))
            .peek(myObject -> lastObject.set(0, myObject))
            .anyMatch(predicate);

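Now you only need to remove the last object that passed through, because it already belongs to the next day and therefore not in your map. If the stream ended before a next-day object appeared, there is nothing to remove.

    // Remove the last object only if it really belongs to another day
    if (!lastObject.isEmpty() && lastObject.get(0) != null && predicate.test(lastObject.get(0))) {
        objectsOfTheDaySorted.remove(lastObject.get(0).getTimestamp());
    }

Done. You now have a sorted map of the objects that belong to a single day. I hope this meets your expectations. Below is all the code in one block so that it can be copied at once.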

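    import java.time.ZoneId;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Date;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.function.Predicate;

    Map<Date, MyObject> objectsOfTheDaySorted = new TreeMap<>();
    // One-element holder, pre-filled so that set(0, ...) has a slot to overwrite
    List<MyObject> lastObject = new ArrayList<>(Collections.singletonList((MyObject) null));
    // just an example
    int currentDay = 23;
    Predicate<MyObject> predicate = myObject -> myObject.getTimestamp()
            .toInstant()
            .atZone(ZoneId.systemDefault())
            .toLocalDate()
            .getDayOfMonth() != currentDay;
    // anyMatch returns true only if an object of another day was reached
    boolean reachedNextDay = stream
            .peek(myObject -> objectsOfTheDaySorted.put(myObject.getTimestamp(), myObject))
            .peek(myObject -> lastObject.set(0, myObject))
            .anyMatch(predicate);
    // Remove the last object only if it really belongs to the next day
    if (reachedNextDay) {
        objectsOfTheDaySorted.remove(lastObject.get(0).getTimestamp());
    }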
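One caveat of keying the TreeMap by timestamp is that two objects with the same timestamp collide. The following is only a minimal sketch of that effect and of one possible workaround (buffering in a list and sorting it once the day is complete). The Event class and the TimestampKeyDemo helper are hypothetical stand-ins, not part of the original answer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for MyObject; only the fields needed here.
final class Event {
    final String name;
    final Date timestamp;

    Event(String name, Date timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }

    Date getTimestamp() {
        return timestamp;
    }
}

final class TimestampKeyDemo {
    // Keyed by timestamp: a later object with an equal key replaces the earlier one.
    static int sizeWhenKeyedByTimestamp(List<Event> events) {
        Map<Date, Event> byTimestamp = new TreeMap<>();
        for (Event e : events) {
            byTimestamp.put(e.getTimestamp(), e);
        }
        return byTimestamp.size();
    }

    // Alternative: copy into a list and sort it; List.sort is stable, so no object is lost.
    static List<Event> sortedCopy(List<Event> events) {
        List<Event> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparing(Event::getTimestamp));
        return copy;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("b", new Date(2000L)));
        events.add(new Event("a1", new Date(1000L)));
        events.add(new Event("a2", new Date(1000L)));
        System.out.println(sizeWhenKeyedByTimestamp(events)); // prints 2
        System.out.println(sortedCopy(events).size());        // prints 3
    }
}
```

Whether this matters depends on whether identical timestamps can occur in your data.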
+2

It depends on whether you want to process objects of all days or one specific day.

Based on DiabolicWords' answer, this is an example of processing all days:

    TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
    LocalDate[] currentDay = new LocalDate[1];
    incoming.peek(o -> {
        LocalDate date = o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
        if (!date.equals(currentDay[0])) {
            // A new day starts: hand over the finished day, if there is one
            if (currentDay[0] != null) {
                processOneDaysObjects(currentDaysObjects);
                currentDaysObjects.clear();
            }
            currentDay[0] = date;
        }
    }).forEach(currentDaysObjects::add);
    // The last day is never followed by a day change, so process it here
    if (!currentDaysObjects.isEmpty()) {
        processOneDaysObjects(currentDaysObjects);
    }

This collects the objects of one day, processes them, resets the collection and continues with the next day.

If you need only one specific day:

    TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
    LocalDate specificDay = LocalDate.now();
    incoming.filter(o -> !o.getTimestamp()
                    .toInstant()
                    .atZone(ZoneId.systemDefault())
                    .toLocalDate()
                    .isBefore(specificDay))
            .peek(o -> currentDaysObjects.add(o))
            .anyMatch(o -> {
                if (o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate().isAfter(specificDay)) {
                    // First object of a later day: undo the add and stop the stream
                    currentDaysObjects.remove(o);
                    return true;
                }
                return false;
            });

The filter skips all objects from before specificDay, and anyMatch terminates the stream at the first object after specificDay.

I read that Java 9 adds methods such as dropWhile and takeWhile to streams. They will simplify this.
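For illustration, here is a rough sketch of how the single-day case might look with Java 9's Stream.dropWhile and Stream.takeWhile. It requires Java 9 or later, uses plain LocalDateTime values instead of MyObject to stay self-contained, and the class and method names are made up for this example. It relies on the assumption from the question that the days themselves arrive in order.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OneDayWithTakeWhile {
    // Returns the timestamps of the given day in sorted order.
    static List<LocalDateTime> sortedObjectsOf(LocalDate day, Stream<LocalDateTime> incoming) {
        return incoming
                // Skip everything that belongs to earlier days
                .dropWhile(t -> t.toLocalDate().isBefore(day))
                // Keep elements until the first one of a later day shows up
                .takeWhile(t -> t.toLocalDate().isEqual(day))
                // Only the elements of this one day are buffered and sorted
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2017, 4, 30);
        Stream<LocalDateTime> incoming = Stream.of(
                day.minusDays(1).atTime(23, 0),
                day.atTime(18, 0),
                day.atTime(9, 0),
                day.plusDays(1).atTime(1, 0));
        sortedObjectsOf(day, incoming).forEach(System.out::println);
    }
}
```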

Edit for the goal specified by the OP

Wow, this is a great exercise and a pretty tough nut to crack. The problem is that the obvious solution (collecting the stream) always goes through the whole stream. You cannot take the next x elements, sort them, stream them and then repeat without doing this for the entire stream (i.e. all days) at once. For the same reason, calling sorted() on the stream will consume it completely (especially since the stream does not know that the elements are already ordered by day). For reference, read this comment here: fooobar.com/questions/1276164 / ....

As recommended there, here is an Iterator implementation wrapped in a stream. It looks ahead in the original stream, takes the elements of one day, sorts them and hands everything back as a new stream (without keeping all days in memory!). The implementation is more complicated because there is no fixed chunk size; you always have to find the first element of the next day to know when to stop.
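A small experiment makes the point about sorted() visible. This is just a sketch with made-up names: it counts, via peek(), how many source elements are pulled before findFirst() gets its answer, once with sorted() in the pipeline and once without.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SortedIsABarrier {
    // With sorted(): every upstream element must be seen before the first result exists.
    static int consumedWithSorted(List<Integer> input) {
        List<Integer> seen = new ArrayList<>();
        input.stream().peek(seen::add).sorted().findFirst();
        return seen.size();
    }

    // Without sorted(): findFirst() stops after the first element.
    static int consumedWithoutSorted(List<Integer> input) {
        List<Integer> seen = new ArrayList<>();
        input.stream().peek(seen::add).findFirst();
        return seen.size();
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, 1, 2);
        System.out.println(consumedWithSorted(input));    // prints 3
        System.out.println(consumedWithoutSorted(input)); // prints 1
    }
}
```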

    import java.time.LocalDate;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    // Note: this answer assumes that MyObject.getTimestamp() returns a LocalDateTime.
    // The TreeSet treats objects with equal timestamps as duplicates.
    public class DayByDayIterator implements Iterator<MyObject> {

        private final Iterator<MyObject> incoming;
        private MyObject next;
        private Iterator<MyObject> currentDay;
        private MyObject firstOfNextDay;
        private Set<MyObject> nextDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));

        public static Stream<MyObject> streamOf(Stream<MyObject> incoming) {
            Iterable<MyObject> iterable = () -> new DayByDayIterator(incoming);
            return StreamSupport.stream(iterable.spliterator(), false);
        }

        private DayByDayIterator(Stream<MyObject> stream) {
            this.incoming = stream.iterator();
            firstOfNextDay = incoming.next();
            nextDaysObjects.add(firstOfNextDay);
            next(); // primes the first element
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public MyObject next() {
            // Load the following day once the current one is used up and more objects are buffered
            if (currentDay == null || (!currentDay.hasNext() && !nextDaysObjects.isEmpty())) {
                nextDay();
            }
            MyObject result = next;
            this.next = currentDay.hasNext() ? currentDay.next() : null;
            return result;
        }

        private void nextDay() {
            LocalDate day = firstOfNextDay.getTimestamp().toLocalDate();
            MyObject following = null;
            // Read ahead until the first object of another day appears
            while (incoming.hasNext()) {
                MyObject candidate = incoming.next();
                if (!candidate.getTimestamp().toLocalDate().isEqual(day)) {
                    following = candidate;
                    break;
                }
                nextDaysObjects.add(candidate);
            }
            currentDay = nextDaysObjects.iterator();
            // Start a fresh buffer; it stays empty if the incoming stream is exhausted
            nextDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
            firstOfNextDay = following;
            if (following != null) {
                nextDaysObjects.add(following);
            }
        }
    }

Use it as follows:

    public static void main(String[] args) {
        Stream<MyObject> stream = Stream.of(
                new MyObject(LocalDateTime.now().plusHours(1)),
                new MyObject(LocalDateTime.now()),
                new MyObject(LocalDateTime.now().plusDays(1).plusHours(2)),
                new MyObject(LocalDateTime.now().plusDays(1)),
                new MyObject(LocalDateTime.now().plusDays(1).plusHours(1)),
                new MyObject(LocalDateTime.now().plusDays(2)),
                new MyObject(LocalDateTime.now().plusDays(2).plusHours(1)));
        DayByDayIterator.streamOf(stream).forEach(System.out::println);
    }

    ------------------- Output -----------------
    2017-04-30T17:39:46.353
    2017-04-30T18:39:46.333
    2017-05-01T17:39:46.353
    2017-05-01T18:39:46.353
    2017-05-01T19:39:46.353
    2017-05-02T17:39:46.353
    2017-05-02T18:39:46.353

Explanation: currentDay and next are the basis for the iterator, while firstOfNextDay and nextDaysObjects already look ahead at the first element of the next day. When currentDay is exhausted, nextDay() is called; it keeps adding incoming elements to nextDaysObjects until the next day begins, and then turns nextDaysObjects into currentDay.

One thing: if the incoming stream is null or empty, it will fail. You can check for null, but for the empty case an exception has to be handled in the factory method. I left this out for readability.

I hope this is what you need. Let me know how it goes.

+2

If you are considering an iterative approach, I think it will be much simpler:

    TreeSet<MyObject> currentDayObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
    LocalDate currentDay = null;
    // A method reference needs an explicit target type to be used in a for-each loop
    for (MyObject m : (Iterable<MyObject>) stream::iterator) {
        LocalDate objectDay = m.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
        if (currentDay == null) {
            currentDay = objectDay;
        } else if (!currentDay.equals(objectDay)) {
            // process a whole day of objects at once
            process(currentDayObjects);
            currentDay = objectDay;
            currentDayObjects.clear();
        }
        currentDayObjects.add(m);
    }
    // process the data of the last day
    process(currentDayObjects);
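To try the idea out in isolation, here is a self-contained variant of the same loop. It is only a sketch under a few assumptions: it works on plain LocalDateTime values instead of MyObject, the DailyBatches class and its method names are invented for this example, and it buffers each day in a list (sorted once per day) so that equal timestamps are not dropped.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class DailyBatches {
    // Hands each day's timestamps, sorted, to the consumer; holds only one day in memory.
    static void forEachSortedDay(Stream<LocalDateTime> incoming, Consumer<List<LocalDateTime>> process) {
        List<LocalDateTime> currentDayObjects = new ArrayList<>();
        LocalDate currentDay = null;
        Iterator<LocalDateTime> it = incoming.iterator();
        while (it.hasNext()) {
            LocalDateTime t = it.next();
            LocalDate objectDay = t.toLocalDate();
            if (currentDay != null && !currentDay.equals(objectDay)) {
                // The day changed: emit the finished day and start a new buffer
                flush(currentDayObjects, process);
            }
            currentDay = objectDay;
            currentDayObjects.add(t);
        }
        // Emit the last day, unless the stream was empty
        if (!currentDayObjects.isEmpty()) {
            flush(currentDayObjects, process);
        }
    }

    private static void flush(List<LocalDateTime> buffer, Consumer<List<LocalDateTime>> process) {
        List<LocalDateTime> day = new ArrayList<>(buffer);
        Collections.sort(day);
        process.accept(day);
        buffer.clear();
    }

    public static void main(String[] args) {
        LocalDate d = LocalDate.of(2017, 4, 30);
        Stream<LocalDateTime> incoming = Stream.of(
                d.atTime(18, 0), d.atTime(9, 0), d.plusDays(1).atTime(7, 0));
        forEachSortedDay(incoming, System.out::println);
    }
}
```

The consumer receives a fresh list per day, so it may keep or modify it without affecting the next day.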
0

Source: https://habr.com/ru/post/1276129/

