Java 8 streams: thread interference inside forEach on a field

Consider the following silly program using Java 8 streams:

    private int biggestInt;

    private void run() {
        ExecutorService executor = Executors.newWorkStealingPool();
        List<Callable<Integer>> callables = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            callables.add(randomInt());
        }
        try {
            executor.invokeAll(callables)
                    .stream()
                    .map(future -> {
                        try {
                            return future.get();
                        } catch (Exception e) {
                            throw new IllegalStateException(e);
                        }
                    })
                    .forEach(this::compareBiggestInt);
        } catch (InterruptedException e) { /* do nothing */ }
    }

    private Callable<Integer> randomInt() {
        return () -> {
            Random random = new Random(System.currentTimeMillis());
            return random.nextInt();
        };
    }

    private void compareBiggestInt(Integer in) {
        if (in > biggestInt) biggestInt = in;
    }

My question is whether forEach(this::compareBiggestInt) is executed in parallel and will therefore introduce a race condition on biggestInt?

If so, how can I avoid this race condition? Can I, for example, change the method as follows?

 private synchronized void compareBiggestInt(Integer in) {[...]} 

Any help is appreciated!

+5
source share
5 answers

No, forEach is not running in parallel. That would violate the general contract of how forEach behaves when used with stream() as opposed to parallelStream(), and it is not affected by the fact that you introduced an ExecutorService.

invokeAll() actually returns a List of Future instances that are either completed or timed out. Thus, the parallel part is already finished by the time you interact with your stream.
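To check this yourself, here is a minimal sketch (the class name InvokeAllDemo and its helper method are made up for illustration, and the tasks are trivial placeholders). It only relies on the documented behaviour that invokeAll() blocks until the tasks finish and that isDone() is true for every returned Future:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    // Returns true if every Future handed back by invokeAll() is already done.
    static boolean allDoneAfterInvokeAll(int taskCount) {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            List<Callable<Integer>> callables = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int value = i;
                callables.add(() -> value * 2); // placeholder task
            }
            // Blocks the calling thread until all tasks have completed.
            List<Future<Integer>> futures = executor.invokeAll(callables);
            return futures.stream().allMatch(Future::isDone);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allDoneAfterInvokeAll(50)); // prints "true"
    }
}
```

So the later future.get() calls inside map() do not really wait for anything; the results are already there.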

+2
source

forEach is not running in a parallel stream. What actually runs the tasks asynchronously is the executor. The Stream#map operation will wait for each Future to complete.

If you want the operation to be executed in a parallel stream, you should use a reduction operation such as Stream#reduce, e.g.:

    biggestInt = executor.invokeAll(callables)
            .parallelStream()
            .map(...) // same as yours
            .reduce(BinaryOperator.maxBy(Comparator.naturalOrder()))
            .orElse(null);
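For completeness, here is a self-contained sketch of the same reduction idea. The class ReduceMaxDemo and its helper methods are hypothetical names, and the tasks return fixed values instead of random ones so the result is predictable. It returns the Optional as-is rather than unwrapping it with orElse(null):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BinaryOperator;

class ReduceMaxDemo {

    // Builds one task per value, runs them all, and reduces the results to their maximum.
    static Optional<Integer> maxOfConstants(int... values) {
        List<Callable<Integer>> callables = new ArrayList<>();
        for (int value : values) {
            callables.add(() -> value); // placeholder task returning a fixed value
        }
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            return executor.invokeAll(callables)
                    .parallelStream()
                    .map(future -> {
                        try {
                            return future.get();
                        } catch (Exception e) {
                            throw new IllegalStateException(e);
                        }
                    })
                    // No shared mutable field: the stream combines partial maxima itself.
                    .reduce(BinaryOperator.maxBy(Comparator.naturalOrder()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(maxOfConstants(3, 11, -5).get()); // prints "11"
    }
}
```

Because the reduction never touches a shared field, there is nothing to synchronize even though the stream is parallel.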
+2
source

There are a few issues here. Firstly:

    return () -> {
        Random random = new Random(System.currentTimeMillis());
        return random.nextInt();
    };

The execution can be so fast (I can easily reproduce it) that every task seeds its Random with the same millisecond value and therefore returns the same number every time.
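To see why an identical seed is a problem, here is a tiny sketch (SameSeedDemo is a made-up name). Two Random instances created with the same seed produce the same sequence, so tasks that start within the same millisecond all get the same "random" number:

```java
import java.util.Random;

class SameSeedDemo {

    // Two generators created with the same seed yield the same first value.
    static boolean sameSeedGivesSameFirstValue(long seed) {
        int first = new Random(seed).nextInt();
        int second = new Random(seed).nextInt();
        return first == second;
    }

    public static void main(String[] args) {
        long seed = System.currentTimeMillis();
        System.out.println(sameSeedGivesSameFirstValue(seed)); // prints "true"
    }
}
```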

At a minimum, I would suggest removing the System.currentTimeMillis() seed:

    private static Callable<Integer> randomInt() {
        return () -> {
            Random random = new Random();
            int x = random.nextInt(100);
            System.out.println(x);
            return x;
        };
    }

Or, even better, use ThreadLocalRandom.current().nextInt(100).

I also changed nextInt to return a value within [0, 100), because nextInt() can return a negative value. Suppose all 50 values come back negative: your maximum would then be zero (the default value of biggestInt), which is obviously wrong.
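If you do want to keep the full int range, one possible way around the zero default is to avoid a sentinel altogether. The sketch below is only an illustration (MaxOfNegativesDemo and both method names are made up); it contrasts a running maximum that starts at 0 with IntStream#max, which returns an OptionalInt:

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

class MaxOfNegativesDemo {

    // Mirrors the question's approach: the running maximum starts at the int default, 0.
    static int maxStartingFromZero(List<Integer> values) {
        int biggest = 0;
        for (int value : values) {
            if (value > biggest) {
                biggest = value;
            }
        }
        return biggest;
    }

    // No sentinel: the result is empty for an empty list, otherwise the true maximum.
    static OptionalInt maxWithoutSentinel(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).max();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(-7, -3, -42);
        System.out.println(maxStartingFromZero(values));           // prints "0"
        System.out.println(maxWithoutSentinel(values).getAsInt()); // prints "-3"
    }
}
```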

Then, your stream is sequential, and inside each map operation you block until Future.get completes. So your forEach is executed in a single thread.
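You can observe the single-thread behaviour with a small experiment. This is a sketch with invented names (SequentialForEachDemo, threadsUsedByForEach); it records the name of every thread that runs the forEach action on a plain stream():

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SequentialForEachDemo {

    // Collects the names of all threads that execute the forEach action.
    static Set<String> threadsUsedByForEach(List<Integer> values) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        values.stream().forEach(v -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = threadsUsedByForEach(Arrays.asList(1, 2, 3, 4, 5));
        // With a sequential stream, only the calling thread shows up here.
        System.out.println(names);
    }
}
```

Swapping stream() for parallelStream() in this experiment is an easy way to compare the two, although how many threads then show up depends on the machine and the input size.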

+2
source

You are not using a parallel stream, so your stream is sequential. If you want to be sure that your stream executes sequentially, add a .sequential() call to your stream.

From the docs:

    default Stream<E> stream()
    Returns a sequential Stream with this collection as its source.
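A quick way to confirm this is BaseStream#isParallel. The following is just a sketch (IsParallelDemo and its methods are illustrative names):

```java
import java.util.Arrays;
import java.util.List;

class IsParallelDemo {

    // stream() on a collection is documented to be sequential.
    static boolean plainStreamIsParallel(List<Integer> values) {
        return values.stream().isParallel();
    }

    // sequential() switches a (possibly) parallel stream back to sequential mode.
    static boolean forcedSequentialIsParallel(List<Integer> values) {
        return values.parallelStream().sequential().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        System.out.println(plainStreamIsParallel(values));      // prints "false"
        System.out.println(forcedSequentialIsParallel(values)); // prints "false"
        // parallelStream() is only documented as "possibly parallel", so no output is promised here.
        System.out.println(values.parallelStream().isParallel());
    }
}
```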
0
source

Assuming you are using parallel streams (I changed the code to use "parallelStream"), you should protect all changes to shared mutable variables.

For example, in the code below I use synchronized on the compareBiggestInt method to protect all access to the biggestInt variable. (If you remove the synchronized keyword and run the code below, you will see that there really is a race condition in the compareBiggestInt method.)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParallelStreamExample {

        private volatile int biggestInt;

        public static void main(String[] args) {
            ParallelStreamExample parallelStreamExample = new ParallelStreamExample();
            parallelStreamExample.doTheWork();
        }

        private void doTheWork() {
            ExecutorService executor = Executors.newWorkStealingPool();
            List<Callable<Integer>> callables = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                callables.add(randomInt());
            }
            try {
                executor.invokeAll(callables)
                        .parallelStream()
                        .map(future -> {
                            try {
                                return future.get();
                            } catch (Exception e) {
                                throw new IllegalStateException(e);
                            }
                        })
                        .forEach(this::compareBiggestInt);
            } catch (InterruptedException e) { /* do nothing */ }
        }

        private Callable<Integer> randomInt() {
            return () -> {
                Random random = new Random();
                return random.nextInt(10);
            };
        }

        private synchronized void compareBiggestInt(Integer in) {
            System.out.println("in:" + in + " - current biggestint = " + biggestInt);
            if (in > biggestInt) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                biggestInt = in;
            }
            System.out.println("in:" + in + " - current biggestint = " + biggestInt);
        }
    }
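As a possible alternative to synchronized (this is a different technique from the one in the answer above, shown only as a sketch), the shared maximum could live in an AtomicInteger and be updated with accumulateAndGet. The class name AtomicMaxDemo and the method parallelMax are made up for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicMaxDemo {

    // Computes the maximum with a parallel forEach that updates one shared AtomicInteger.
    static int parallelMax(List<Integer> values) {
        // Start from the smallest int so negative inputs are handled correctly.
        AtomicInteger biggest = new AtomicInteger(Integer.MIN_VALUE);
        values.parallelStream()
              // accumulateAndGet applies Math::max atomically, retrying on contention.
              .forEach(v -> biggest.accumulateAndGet(v, Math::max));
        return biggest.get();
    }

    public static void main(String[] args) {
        System.out.println(parallelMax(Arrays.asList(3, 9, -1, 7))); // prints "9"
    }
}
```

The compare-and-update step happens atomically inside accumulateAndGet, so no lock is held while other elements are being processed.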
0
source

Source: https://habr.com/ru/post/1268904/

