Java: splitting a stream by a predicate into a stream of streams

I have hundreds of large (6 GB) gzipped log files that I read using GZIPInputStream and that I want to parse. Suppose each of them has the format:

    Start of log entry 1
     ...some log details
     ...some log details
     ...some log details
    Start of log entry 2
     ...some log details
     ...some log details
     ...some log details
    Start of log entry 3
     ...some log details
     ...some log details
     ...some log details

I stream the contents of each gzipped file line by line through BufferedReader.lines() . The stream looks like this:

    [
        "Start of log entry 1",
        " ...some log details",
        " ...some log details",
        " ...some log details",
        "Start of log entry 2",
        " ...some log details",
        " ...some log details",
        " ...some log details",
        "Start of log entry 3",
        " ...some log details",
        " ...some log details",
        " ...some log details",
    ]

The beginning of each log entry can be determined by the predicate: line -> line.startsWith("Start of log entry") . I would like to convert this Stream<String> to Stream<Stream<String>> according to this predicate. Each "substream" should begin when the predicate is true, and collect lines while the predicate is false, until the next time the predicate is true, which means the end of this substream and the beginning of the next. The result will look like this:

    [
        [
            "Start of log entry 1",
            " ...some log details",
            " ...some log details",
            " ...some log details",
        ],
        [
            "Start of log entry 2",
            " ...some log details",
            " ...some log details",
            " ...some log details",
        ],
        [
            "Start of log entry 3",
            " ...some log details",
            " ...some log details",
            " ...some log details",
        ],
    ]

From there, I can take each substream and map it through new LogEntry(Stream<String> logLines) to aggregate the related log lines into LogEntry objects.

Here's a rough idea of how it would look:

    import java.io.*;
    import java.nio.charset.*;
    import java.util.*;
    import java.util.function.*;
    import java.util.stream.*;

    import static java.lang.System.out;

    class Untitled {
        static final String input =
            "Start of log entry 1\n" +
            " ...some log details\n" +
            " ...some log details\n" +
            " ...some log details\n" +
            "Start of log entry 2\n" +
            " ...some log details\n" +
            " ...some log details\n" +
            " ...some log details\n" +
            "Start of log entry 3\n" +
            " ...some log details\n" +
            " ...some log details\n" +
            " ...some log details";

        static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry");

        public static void main(String[] args) throws Exception {
            try (ByteArrayInputStream gzipInputStream
                     = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
                 InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream );
                 BufferedReader reader = new BufferedReader( inputStreamReader )) {

                reader.lines()
                      .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                      .map(LogEntry::new)
                      .forEach(out::println);
            }
        }
    }

Constraint: I have hundreds of these large files to process in parallel (but only a single sequential stream per file), so loading them completely into memory (for example, by storing them as a List<String> lines ) is not feasible.

Any help appreciated!

2 answers

I think the main problem is that you are reading line by line and trying to create a LogEntry instance out of the lines, instead of reading block by block (where a block may span many lines).

For this you can use Scanner.findAll (available since Java 9) with the correct regular expression:

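    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.UncheckedIOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Scanner;
    import java.util.regex.MatchResult;
    import java.util.regex.Pattern;

    String input =
        "Start of log entry 1\n" +
        " ...some log details 1.1\n" +
        " ...some log details 1.2\n" +
        " ...some log details 1.3\n" +
        "Start of log entry 2\n" +
        " ...some log details 2.1\n" +
        " ...some log details 2.2\n" +
        " ...some log details 2.3\n" +
        "Start of log entry 3\n" +
        " ...some log details 3.1\n" +
        " ...some log details 3.2\n" +
        " ...some log details 3.3";

    // The byte array stands in for the real GZIPInputStream.
    try (ByteArrayInputStream gzip =
             new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
         // Decode with the same charset that was used to encode the bytes.
         InputStreamReader reader = new InputStreamReader(gzip, StandardCharsets.UTF_8);
         Scanner scanner = new Scanner(reader)) {

        String START = "Start of log entry \\d+";
        Pattern pattern = Pattern.compile(
            START + "(?<=" + START + ").*?(?=" + START + "|$)",
            Pattern.DOTALL);

        scanner.findAll(pattern)               // lazy Stream<MatchResult>, Java 9+
               .map(MatchResult::group)        // one String per log entry
               .map(s -> s.split("\\R"))       // split the entry into its lines
               .map(LogEntry::new)             // assumes a LogEntry(String[]) constructor
               .forEach(System.out::println);

    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }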

This works by lazily finding matches in the Scanner instance. Scanner.findAll returns a Stream<MatchResult> and MatchResult.group() returns the matched String . We then split this string at line breaks ( \\R ), which yields a String[] with one element per line. Then, assuming LogEntry has a constructor that accepts a String[] argument, we convert each of these arrays to a LogEntry instance. Finally, assuming LogEntry overrides toString() , we print each instance to the output.
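The LogEntry class itself is not shown in the question. As a minimal sketch of the two assumptions made above (a constructor that takes a String[] and an overridden toString()), it could look like the following. The field names and the output format are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical LogEntry: the original post does not show this class.
final class LogEntry {
    private final String header;        // the "Start of log entry N" line
    private final List<String> details; // the lines that follow the header

    LogEntry(String[] lines) {
        if (lines.length == 0) {
            throw new IllegalArgumentException("a log entry needs at least a header line");
        }
        this.header = lines[0];
        // Copy the remaining lines so the entry does not depend on the caller's array.
        this.details = Collections.unmodifiableList(
                new ArrayList<>(Arrays.asList(lines).subList(1, lines.length)));
    }

    String header() {
        return header;
    }

    List<String> details() {
        return details;
    }

    @Override
    public String toString() {
        return "LogEntry{header='" + header + "', detailLines=" + details.size() + "}";
    }
}
```

A real implementation would presumably parse timestamps, levels and so on out of the lines instead of just storing them.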

It is worth noting that the Scanner only starts doing its work when forEach is invoked on the stream.

A note on the regular expression used to match the log entries in the input. I am not a regex expert, so I'm pretty sure there is room for improvement. First of all, we use Pattern.DOTALL so that . matches not only ordinary characters but also line breaks. Then there is the actual regular expression. The idea is that it matches and consumes Start of log entry \\d+ , then it uses a look-behind against Start of log entry \\d+ , then it consumes input characters in a non-greedy way (this is the .*? part) and, finally, it looks ahead to check whether there is another occurrence of Start of log entry \\d+ or whether the end of the input has been reached. Please refer to this amazing regular expression article if you want to delve into this topic.
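To see the pieces of the pattern in isolation, here is a small sketch that applies it to an in-memory String with a plain Matcher instead of a Scanner. It uses a simplified variant of the expression that omits the look-behind, since the header has already been consumed at that point. The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexEntryDemo {
    static final String START = "Start of log entry \\d+";

    // Header, then as few characters as possible (.*? is non-greedy),
    // up to but not including the next header or the end of the input.
    // DOTALL lets '.' match line breaks as well.
    static final Pattern ENTRY =
            Pattern.compile(START + ".*?(?=" + START + "|$)", Pattern.DOTALL);

    // Returns one String[] per log entry, one array element per line.
    static List<String[]> entries(String input) {
        List<String[]> result = new ArrayList<>();
        Matcher matcher = ENTRY.matcher(input);
        while (matcher.find()) {
            result.add(matcher.group().split("\\R"));
        }
        return result;
    }

    public static void main(String[] args) {
        String input =
                "Start of log entry 1\n" +
                " ...some log details 1.1\n" +
                " ...some log details 1.2\n" +
                "Start of log entry 2\n" +
                " ...some log details 2.1";
        for (String[] entry : entries(input)) {
            System.out.println(entry.length + " lines, first: " + entry[0]);
        }
        // prints:
        // 3 lines, first: Start of log entry 1
        // 2 lines, first: Start of log entry 2
    }
}
```

Unlike Scanner.findAll , this variant needs the whole input as a String, so it only illustrates the expression and is not suitable for the 6 GB files themselves.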


I don't know of an equivalent alternative if you are not on Java 9+. However, you could create a custom Spliterator that wraps the Spliterator of the stream returned by BufferedReader.lines() and adds the desired grouping behavior to it. You would then need to create a new Stream from this Spliterator . Not quite a trivial task ...
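For readers who want to try the Spliterator route, here is a rough sketch of what such a wrapper might look like on Java 8. It is not taken from the original answer: the class name EntrySpliterator and the helper split are invented here, and each entry is emitted as a List rather than a nested Stream to keep the code short. Only one entry is held in memory at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical wrapper: groups the elements of a source spliterator into entries.
class EntrySpliterator<T> extends Spliterators.AbstractSpliterator<List<T>> {
    private final Spliterator<T> source;
    private final Predicate<? super T> isStart;
    private List<T> current = new ArrayList<>(); // entry currently being filled
    private List<T> completed;                   // finished entry waiting to be emitted

    EntrySpliterator(Spliterator<T> source, Predicate<? super T> isStart) {
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
        this.source = source;
        this.isStart = isStart;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        // Pull lines until one of them closes the current entry or the source ends.
        while (completed == null && source.tryAdvance(this::onLine)) {
            // all work happens in onLine
        }
        if (completed == null) {
            if (current.isEmpty()) {
                return false; // source exhausted and nothing left to emit
            }
            completed = current; // flush the last entry
            current = new ArrayList<>();
        }
        List<T> entry = completed;
        completed = null;
        action.accept(entry);
        return true;
    }

    private void onLine(T line) {
        // A start line closes the entry collected so far (if there is one).
        if (isStart.test(line) && !current.isEmpty()) {
            completed = current;
            current = new ArrayList<>();
        }
        current.add(line);
    }

    // Convenience factory; closing the result also closes the source stream.
    static <T> Stream<List<T>> split(Stream<T> lines, Predicate<? super T> isStart) {
        return StreamSupport
                .stream(new EntrySpliterator<T>(lines.spliterator(), isStart), false)
                .onClose(lines::close);
    }
}
```

With this in place, the pipeline from the question could be written as EntrySpliterator.split(reader.lines(), isLogEntryStart) followed by a map to LogEntry, assuming LogEntry accepts a List<String>. Note that any lines appearing before the first start line are grouped into an entry of their own in this sketch.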


Frederico's answer is probably the nicest way to solve this particular problem. Following his last thought about a custom Spliterator , I will add an adapted version of my answer to a similar question, where I suggested using a custom iterator to create a chunked stream. This approach also works with other streams that are not created from input readers.

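    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.function.Predicate;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class StreamSplitter<T> implements Iterator<Stream<T>> {

        private final Iterator<T> incoming;
        private final Predicate<T> startOfNewEntry;
        private T nextLine;          // look-ahead: first line of the entry returned by next()
        private boolean hasNextLine; // explicit flag, so a trailing header-only entry terminates

        public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry) {
            Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
            return StreamSupport.stream(iterable.spliterator(), false);
        }

        private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry) {
            this.incoming = stream.iterator();
            this.startOfNewEntry = startOfNewEntry;
            this.hasNextLine = incoming.hasNext();
            if (hasNextLine)
                nextLine = incoming.next();
        }

        @Override
        public boolean hasNext() {
            return hasNextLine;
        }

        @Override
        public Stream<T> next() {
            if (!hasNextLine)
                throw new NoSuchElementException();
            List<T> nextEntrysLines = new ArrayList<>();
            nextEntrysLines.add(nextLine);
            hasNextLine = false;
            // Collect lines until the next entry starts or the input ends.
            while (incoming.hasNext()) {
                T line = incoming.next();
                if (startOfNewEntry.test(line)) {
                    // Remember the first line of the following entry for the next call.
                    nextLine = line;
                    hasNextLine = true;
                    break;
                }
                nextEntrysLines.add(line);
            }
            return nextEntrysLines.stream();
        }
    }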

Example

    public static void main(String[] args) {
        Stream<String> flat = Stream.of(
            "Start of log entry 1",
            " ...some log details",
            " ...some log details",
            "Start of log entry 2",
            " ...some log details",
            " ...some log details",
            "Start of log entry 3",
            " ...some log details",
            " ...some log details");

        StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                      .forEach(logEntry -> {
                          System.out.println("------------------");
                          logEntry.forEach(System.out::println);
                      });
    }

    // Output
    // ------------------
    // Start of log entry 1
    //  ...some log details
    //  ...some log details
    // ------------------
    // Start of log entry 2
    //  ...some log details
    //  ...some log details
    // ------------------
    // Start of log entry 3
    //  ...some log details
    //  ...some log details

The iterator always looks one line ahead. As soon as that line turns out to be the beginning of a new entry, it wraps the previous entry in a stream and returns it from next() . The streamOf factory method turns this iterator into a stream that can be used as in the example above.

I changed the split condition from a regular expression to a Predicate , so you can specify more complex conditions using several regular expressions, if-conditions, etc.
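As an illustration of what such a combined condition might look like, the sketch below builds one Predicate out of a regular expression and two plain conditions. The "BEGIN " prefix and all names in the class are hypothetical and only serve as an example of a second header format.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

class EntryStartPredicates {
    // Numbered header, matched with a regular expression.
    // Pattern.asPredicate() tests with find(), hence the leading ^ anchor.
    static final Predicate<String> NUMBERED =
            Pattern.compile("^Start of log entry \\d+").asPredicate();

    // Hypothetical second header format, checked with a plain condition.
    static final Predicate<String> LEGACY = line -> line.startsWith("BEGIN ");

    // Guard against blank lines before applying the other rules.
    static final Predicate<String> NOT_BLANK = line -> !line.trim().isEmpty();

    // Combined condition: a non-blank line in either header format.
    static final Predicate<String> IS_ENTRY_START = NOT_BLANK.and(NUMBERED.or(LEGACY));

    public static void main(String[] args) {
        System.out.println(IS_ENTRY_START.test("Start of log entry 12")); // true
        System.out.println(IS_ENTRY_START.test(" ...some log details"));  // false
        System.out.println(IS_ENTRY_START.test("BEGIN legacy entry"));    // true
    }
}
```

The resulting IS_ENTRY_START can be passed directly as the second argument of StreamSplitter.streamOf .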

Note that I only tested it with the examples above, so I don’t know how it will behave with more complex, erroneous or empty input.


Source: https://habr.com/ru/post/1276120/

