Calling sequential() on a parallel stream makes all previous operations sequential

I have a large data set, and I want to call a slow but pure method on each element and then quickly call a method with side effects on the result of the first. I am not interested in the intermediate results, so I would rather not collect them.

The obvious solution is to create a parallel stream, make the slow call, make the stream sequential again, and make the fast call. The problem is that ALL the code is executed in a single thread; there is no actual parallelism.

Code example:

    @Test
    public void testParallelStream() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
        Set<String> threads = forkJoinPool.submit(() -> new Random().ints(100).boxed()
                .parallel()
                .map(this::slowOperation)
                .sequential()
                .map(Function.identity()) // some fast operation, but must be in single thread
                .collect(Collectors.toSet())
        ).get();
        System.out.println(threads);
        Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
    }

    private String slowOperation(int value) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Thread.currentThread().getName();
    }

If I remove sequential(), the code executes as expected, but then, obviously, the non-parallel operation is called from multiple threads.

Could you point me to documentation of this behavior, or suggest a way to avoid temporary collections?

2 answers

Switching a stream from parallel() to sequential() worked in the original Stream API design, but it caused many problems, and the implementation was eventually changed so that these calls just turn the parallel flag on and off for the whole pipeline. The current documentation is indeed vague, but it has been improved in Java 9:

The stream pipeline is executed sequentially or in parallel depending on the mode of the stream on which the terminal operation is invoked. The sequential or parallel mode of a stream can be determined with the BaseStream.isParallel() method, and the stream's mode can be modified with the BaseStream.sequential() and BaseStream.parallel() operations. The most recent sequential or parallel mode setting applies to the execution of the entire stream pipeline.
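To see the "most recent mode wins" rule in isolation, here is a minimal sketch that only inspects the flag with isParallel(). The class and method names (StreamModeDemo and so on) are made up for illustration and are not from the question.

```java
import java.util.stream.IntStream;

public class StreamModeDemo {

    // parallel() followed by sequential(): the later call decides the mode
    // of the WHOLE pipeline, including the map() placed before it.
    static boolean modeAfterParallelThenSequential() {
        return IntStream.range(0, 10)
                .parallel()
                .map(i -> i * 2)
                .sequential()
                .isParallel();
    }

    // The same rule in the other direction.
    static boolean modeAfterSequentialThenParallel() {
        return IntStream.range(0, 10)
                .sequential()
                .map(i -> i * 2)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(modeAfterParallelThenSequential()); // false
        System.out.println(modeAfterSequentialThenParallel()); // true
    }
}
```

This is why the test in the question sees a single thread name: by the time collect() runs, the pipeline as a whole is flagged sequential.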

As for your problem, you can collect everything into an intermediate List and start a new sequential pipeline:

    new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .collect(Collectors.toList())
            // Start new stream here
            .stream()
            .map(Function.identity()) // some fast operation, but must be in single thread
            .collect(Collectors.toSet());

In the current implementation, a stream is either entirely parallel or entirely sequential. The Javadoc is not explicit about this, and it could change in the future, but it does say this is possible.

S parallel()

Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel.

If you need a function to run in only one thread at a time, I suggest you use a Lock or a synchronized block/method.
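A rough sketch of the synchronized variant, under some assumptions: slowOperation here is a hypothetical stand-in for the slow pure method, and the side effect is simply adding to a non-thread-safe list. The class name SynchronizedStepDemo and the LOCK field are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class SynchronizedStepDemo {

    private static final Object LOCK = new Object();

    // Hypothetical stand-in for the slow, side-effect-free operation.
    static int slowOperation(int value) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value * 2;
    }

    static List<Integer> process(int count) {
        List<Integer> sink = new ArrayList<>(); // deliberately not thread-safe
        IntStream.range(0, count)
                .parallel()
                .map(SynchronizedStepDemo::slowOperation) // runs on several threads
                .forEach(result -> {
                    // Only one thread at a time executes the side effect.
                    synchronized (LOCK) {
                        sink.add(result);
                    }
                });
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(process(50).size());
    }
}
```

Note that this only gives mutual exclusion: the fast step runs in one thread at a time, but not always in the same thread, and the order of the results is not defined. If the step really must stay on a single thread, the intermediate List from the other answer is the safer route.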


Source: https://habr.com/ru/post/1244279/

