Exception handling in parallel streams

I have a set of columns from a CSV file, each held as a string array, and now I want to parse them. Since this involves date parsing and other not-so-fast parsing methods, I thought of parallelism (I timed it, and it does take some time). My simple approach:

Stream.of(columns).parallel().forEach(column -> result[column.index] = parseColumn(valueCache[column.index], column.type)); 

columns contains ColumnDescriptor elements, which have just two attributes: the index of the column to parse and the type that determines how to parse it. Nothing more. result is an Object array that receives the parsed arrays.

The problem is that the parse function throws a ParseException, which I handle further up the call stack. Since we are in parallel here, we cannot just throw it. What is the best way to handle this?

I have this solution, but I am not happy with how it reads. What would be the best way to do this?

    final CompletableFuture<ParseException> thrownException = new CompletableFuture<>();
    Stream.of(columns).parallel().forEach(column -> {
        try {
            result[column.index] = parseColumn(valueCache[column.index], column.type);
        } catch (ParseException e) {
            thrownException.complete(e);
        }
    });
    if (thrownException.isDone()) // can only be done if a value was set
        throw thrownException.getNow(null);

Note: I do not need all the exceptions. If I parse the columns sequentially, I also get only one, so that is fine.

+5
2 answers

The problem is your incorrect premise: "Since we are in parallel here, we cannot just throw it." There is no specification that forbids throwing exceptions during parallel processing. You can simply throw the exception in a parallel stream as you would in a sequential stream, wrapping it in an unchecked exception if it is a checked exception.

If at least one exception is thrown in any thread, the forEach call will propagate it (or one of them) to the caller.

The only problem you may encounter is that the current implementation does not wait for all threads to complete once an exception has been encountered. This can be worked around by using

    try {
        Arrays.stream(columns).parallel()
            .forEach(column -> result[column.index] = parseColumn(valueCache[column.index], column.type));
    } catch (Throwable t) {
        ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);
        throw t;
    }

But usually you do not need this, because you will not access the concurrently written result in the exceptional case.
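
The wrapping described in this answer could be sketched as follows. This is only an illustration under assumptions: UncheckedParseException, ParallelParseSketch, and this simplified parseColumn (which parses every cell as a yyyy-MM-dd date) are hypothetical names that do not come from the question. The wrapper deliberately has no public constructor taking a Throwable, so the fork/join machinery has no obvious way to re-create it in the calling thread and should rethrow the original instance.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.stream.IntStream;

final class ParallelParseSketch {

    /** Hypothetical unchecked carrier for the checked ParseException. */
    static final class UncheckedParseException extends RuntimeException {
        final ParseException checked;

        UncheckedParseException(ParseException checked) {
            super(checked);
            this.checked = checked;
        }
    }

    /** Stand-in for the question's parseColumn: parses every cell as a date. */
    static Date[] parseColumn(String[] raw) throws ParseException {
        // SimpleDateFormat is not thread-safe, so each call creates its own instance.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        Date[] parsed = new Date[raw.length];
        for (int i = 0; i < raw.length; i++) {
            parsed[i] = format.parse(raw[i]);
        }
        return parsed;
    }

    /** Parses all columns in parallel and rethrows the first reported ParseException. */
    static Object[] parseAll(String[][] valueCache) throws ParseException {
        Object[] result = new Object[valueCache.length];
        try {
            IntStream.range(0, valueCache.length).parallel().forEach(i -> {
                try {
                    result[i] = parseColumn(valueCache[i]);
                } catch (ParseException e) {
                    // A lambda passed to forEach cannot throw a checked exception, so wrap it.
                    throw new UncheckedParseException(e);
                }
            });
        } catch (UncheckedParseException e) {
            // Back in the calling thread: unwrap and rethrow the checked exception.
            throw e.checked;
        }
        return result;
    }
}
```

Callers of parseAll see an ordinary ParseException, exactly as they would with a sequential loop, so the handling further up the call stack does not need to change.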

+3

I think the real question is: what do you usually do when parsing sequentially?

Do you stop at the first exception and abort the whole process? In that case, wrap the exception in a runtime exception and let the stream abort and throw it. Catch the wrapper exception, unwrap it, and handle it.

Do you skip bad records? Then either 1. track the errors in a list somewhere, or 2. create a wrapper object that can hold either the parsed result or the error (do not keep the exceptions themselves, only the minimum needed to describe the error).

Afterwards, check the list for errors in the first case, or handle the records that carry an error differently in the second case.
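
The second option, a wrapper that holds either the parsed result or an error, might look roughly like this. It is a sketch only: Outcome, ColumnOutcomeSketch, and the simplified date-only parseColumn are hypothetical names introduced for illustration and are not part of the question's code.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ColumnOutcomeSketch {

    /** Hypothetical holder: either the parsed values or a short error text, never both. */
    static final class Outcome {
        final int index;
        final Object values; // null when parsing failed
        final String error;  // null when parsing succeeded

        private Outcome(int index, Object values, String error) {
            this.index = index;
            this.values = values;
            this.error = error;
        }

        static Outcome success(int index, Object values) {
            return new Outcome(index, values, null);
        }

        static Outcome failure(int index, String error) {
            return new Outcome(index, null, error);
        }

        boolean isFailure() {
            return error != null;
        }
    }

    /** Stand-in for the question's parseColumn: parses every cell as a date. */
    static Date[] parseColumn(String[] raw) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); // one instance per call
        Date[] parsed = new Date[raw.length];
        for (int i = 0; i < raw.length; i++) {
            parsed[i] = format.parse(raw[i]);
        }
        return parsed;
    }

    /** Parses every column in parallel; a bad column yields a failure instead of aborting. */
    static List<Outcome> parseAll(String[][] valueCache) {
        return IntStream.range(0, valueCache.length).parallel()
            .mapToObj(i -> {
                try {
                    return Outcome.success(i, parseColumn(valueCache[i]));
                } catch (ParseException e) {
                    // Keep only a description of the error, not the exception itself.
                    return Outcome.failure(i, e.getMessage() + " (offset " + e.getErrorOffset() + ")");
                }
            })
            .collect(Collectors.toList()); // encounter order is preserved for this ordered stream
    }
}
```

Because nothing is thrown out of the stream, every column is attempted, and the caller decides afterwards how to treat the outcomes for which isFailure() returns true.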

+1

Source: https://habr.com/ru/post/1265928/