Stream and parallel stream

I have some test code:

  List<Integer> list = new ArrayList<>(1000000);
  for (int i = 0; i < 1000000; i++) {
      list.add(i);
  }
  List<String> values = new ArrayList<>(1000000);
  list.stream().forEach(
      i -> values.add(new Date().toString())
  );
  System.out.println(values.size());

Running this prints the expected result: 1000000.

However, if I change stream() to parallelStream(), like this:

  list.parallelStream().forEach( i->values.add(new Date().toString()) ); 

I get a seemingly random size instead, for example: 920821.

What's wrong?

3 answers

ArrayList is not synchronized, so the behavior of adding items to it from several threads at the same time is undefined. From the Javadoc of forEach:

For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses.

In your second example, multiple threads call add on the same ArrayList at the same time, and the ArrayList documentation says:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally.
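The ArrayList Javadoc goes on to suggest wrapping the list with Collections.synchronizedList when there is no other object to synchronize on. A minimal sketch of that external synchronization, assuming the same "add a date string per element" workload as the question (the class and method names are only illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.IntStream;

// Illustrative names; sketch of externally synchronizing an ArrayList.
class SynchronizedListDemo {
    static List<String> fill(int n) {
        // Every method of the wrapper locks on the wrapper itself,
        // so concurrent add() calls can no longer lose updates.
        List<String> values = Collections.synchronizedList(new ArrayList<>(n));
        IntStream.range(0, n).parallel()
                 .forEach(i -> values.add(new Date().toString()));
        return values;
    }

    public static void main(String[] args) {
        System.out.println(fill(1_000_000).size()); // prints 1000000
    }
}
```

This gives the correct count, but it shares Vector's drawback: all threads contend for a single lock.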

The wrong solution

If you replace ArrayList with Vector, you will get the correct result, because that list implementation is synchronized. Its Javadoc says:

Unlike the new collection implementations, Vector is synchronized.

Still, don't use it! It may also be slower because of the explicit synchronization.

The right approach

The Stream API is designed to avoid exactly this situation: it provides mutable reduction through the collect operation. The following

 List<String> values = list.stream().map(i -> "foo").collect(Collectors.toList()); 

will always produce the correct result, whether it runs in parallel or not. The stream pipeline handles concurrency internally and makes it safe to use a non-concurrent collector in a parallel collect operation. Collectors.toList() is a built-in collector that accumulates the elements of a stream into a list.
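A sketch of the question's code rewritten to use collect instead of forEach (the class and method names are illustrative, and String::valueOf stands in for the date string so the result can be checked). Because the source list is ordered and Collectors.toList() is not an unordered collector, the parallel result also keeps the original encounter order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative names; sketch of mutable reduction with collect().
class CollectDemo {
    static List<String> collectParallel(List<Integer> input) {
        // No shared mutable state: in parallel, each worker fills its own
        // partial list and the stream framework merges the partial results.
        return input.parallelStream()
                    .map(String::valueOf)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(1_000_000);
        for (int i = 0; i < 1_000_000; i++) {
            list.add(i);
        }
        List<String> values = collectParallel(list);
        System.out.println(values.size());  // prints 1000000
        System.out.println(values.get(42)); // prints 42
    }
}
```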


When you use a Consumer (as forEach does), you have to worry about thread safety yourself. A simpler solution is to let the Stream API accumulate the results:

  List<String> values = IntStream.range(0, 1_000_000).parallel()
      .mapToObj(i -> new Date().toString())
      .collect(Collectors.toList());

The main reason to avoid a thread-safe collection such as Vector is that every thread has to acquire the same shared lock, which becomes a bottleneck: you spend time acquiring and releasing the lock, and only one thread at a time can access the list. You can easily end up with a solution that is slower than using a single thread.
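To check the lock-contention argument on your own machine, here is a rough timing sketch (illustrative names; Integer.toString replaces the date string). It compares one shared lock, collect, and a single-threaded baseline. These are single-shot timings without JIT warm-up, so treat the numbers only as a hint; a harness such as JMH is the proper tool for real measurements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative names; rough, unscientific timing sketch.
class LockContentionSketch {
    static final int N = 1_000_000;

    // All worker threads funnel through one lock on the wrapper.
    static List<String> viaSharedLock() {
        List<String> values = Collections.synchronizedList(new ArrayList<>(N));
        IntStream.range(0, N).parallel().forEach(i -> values.add(Integer.toString(i)));
        return values;
    }

    // Each worker fills its own partial list; partial lists are merged at the end.
    static List<String> viaCollect() {
        return IntStream.range(0, N).parallel()
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
    }

    // Baseline: one thread, plain ArrayList, no locking at all.
    static List<String> sequential() {
        List<String> values = new ArrayList<>(N);
        IntStream.range(0, N).forEach(i -> values.add(Integer.toString(i)));
        return values;
    }

    static void time(String label, Supplier<List<String>> task) {
        long start = System.nanoTime();
        int size = task.get().size();
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println(label + ": " + size + " elements in " + ms + " ms");
    }

    public static void main(String[] args) {
        time("shared lock", LockContentionSketch::viaSharedLock);
        time("collect", LockContentionSketch::viaCollect);
        time("sequential", LockContentionSketch::sequential);
    }
}
```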


values.add(String) is not thread-safe. If you call it from different threads without synchronization, there is no guarantee that it will work as expected.

To fix this, you can:

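  • Use a thread-safe collection such as Vector or CopyOnWriteArrayList (although CopyOnWriteArrayList copies its entire backing array on every add, which makes it a poor fit for a million insertions).
  • Synchronize your code explicitly, for example by wrapping the call in synchronized (values) { values.add(new Date().toString()); } so that every thread locks on the same object. Note that the i-> part stays outside the synchronized block.
  • Or, in this case, map the elements to a new stream and collect them, as in @PeterLawrey's answer: IntStream.range(0, 1_000_000).parallel().mapToObj(i -> new Date().toString()).collect(Collectors.toList());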
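A minimal sketch of the explicit-synchronization option (illustrative names). It locks on a dedicated object rather than `this`, because a lambda inside a static method such as main has no `this`, and it builds the string before taking the lock so that only the add() call is serialized:

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.IntStream;

// Illustrative names; sketch of guarding add() with a synchronized block.
class SynchronizedBlockDemo {
    static List<String> fill(int n) {
        List<String> values = new ArrayList<>(n);
        // One lock object shared by all worker threads.
        Object lock = new Object();
        IntStream.range(0, n).parallel().forEach(i -> {
            // Work that needs no shared state happens outside the lock.
            String s = new Date().toString();
            synchronized (lock) {
                values.add(s);
            }
        });
        return values;
    }

    public static void main(String[] args) {
        System.out.println(fill(1_000_000).size()); // prints 1000000
    }
}
```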

Source: https://habr.com/ru/post/1011686/

