How to use CompletableFuture.thenComposeAsync()?

Given:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) {
        int nThreads = 1;
        Executor e = Executors.newFixedThreadPool(nThreads);

        CompletableFuture.runAsync(() -> {
            System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
        }, e).thenComposeAsync((Void unused) -> {
            return CompletableFuture.runAsync(() -> {
                System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
            }, e);
        }, e).join();

        System.out.println("finished");
    }
}

I expect a single executor thread to complete Task 1 and then complete Task 2. Instead, the code hangs if nThreads is less than 2.

  • Please explain why the code hangs. I can see that it is blocked at CompletableFuture:616, waiting for a future to complete, but it is not clear why.
  • If I allow 2 threads, what is each thread used for?

In short, please help me understand how thenComposeAsync() works. The Javadoc reads as if it were written for robots, not for people :)

+6
2 answers
  • The thenComposeAsync method submits a new task to your executor. That task occupies the only thread and waits for your Task 2 to complete, but there are no threads left to run Task 2. Instead, you can use the thenCompose method, which runs on the same thread as Task 1, to avoid the deadlock.

  • One thread executes Task 1 and Task 2, and the second is responsible for composing the results of the two.

Note: CompletableFutures work best with a ForkJoinPool, which is more efficient at processing tasks that spawn new tasks. A default ForkJoinPool (the common pool) was added in Java 8 for this purpose and is used unless you specify an executor to run your tasks.
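To illustrate the default executor mentioned in the note, here is a minimal sketch that submits a task without passing an executor and reports which thread ran it. The class and method names (DefaultExecutorSketch, workerThreadName) are made up for this example. On a typical multi-core machine the worker name usually starts with "ForkJoinPool.commonPool-worker", but if the common pool does not support a parallelism level of at least two, a new thread is created for each task instead, so the exact name may differ.

```java
import java.util.concurrent.CompletableFuture;

class DefaultExecutorSketch {

    // Hypothetical helper: returns the name of the thread that ran a task
    // submitted WITHOUT an explicit executor.
    static String workerThreadName() {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println("Caller: " + Thread.currentThread().getName());
        System.out.println("Worker: " + workerThreadName());
    }
}
```

Passing your own executor as a second argument, as the question does, overrides this default.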

Here's a good presentation on where these new features shine and how they work: Reactive Programming Patterns with Java 8 Futures.

+8

It blocks on the runAsync inside thenComposeAsync. thenComposeAsync runs the supplied function on a thread inside the executor e. But the function you gave it tries to run the body of runAsync inside that same executor.

You can better understand what happens by adding another trace output:

CompletableFuture.runAsync(() -> {
    System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
}, e).thenComposeAsync((Void unused) -> {
    System.out.println("Task 1 1/2. Thread: " + Thread.currentThread().getId());
    return CompletableFuture.runAsync(() -> {
        System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
    }, e);
}, e).join();

Now, if you run it with a 2-thread executor, you will see that Task 1 1/2 and Task 2 run on different threads.

To fix this, replace thenComposeAsync with the regular thenCompose. I'm not sure why you would ever use thenComposeAsync. If you have a method that returns a CompletableFuture, that method presumably does not block and does not need to be run asynchronously.
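A minimal sketch of that fix, assuming the same single-thread executor as in the question. The class name ThenComposeSketch, the runChain helper, and the trace list are made up for this example, and the executor is shut down at the end so the JVM can exit. The function passed to thenCompose only submits Task 2 and returns at once, so no pool thread sits waiting for work that is queued behind it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThenComposeSketch {

    // Hypothetical helper: runs Task 1, then Task 2, on a one-thread pool
    // and returns the trace lines in the order they were recorded.
    static List<String> runChain() {
        ExecutorService e = Executors.newFixedThreadPool(1);
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                .runAsync(() -> trace.add("Task 1. Thread: " + Thread.currentThread().getId()), e)
                // thenCompose (no Async suffix) does not submit the function
                // to the executor as a separate task; the function just
                // queues Task 2 on the pool and returns its future.
                .thenCompose(unused -> CompletableFuture.runAsync(
                    () -> trace.add("Task 2. Thread: " + Thread.currentThread().getId()), e))
                .join();
        } finally {
            e.shutdown(); // let the pool thread terminate so the JVM can exit
        }
        return trace;
    }

    public static void main(String[] args) {
        runChain().forEach(System.out::println);
        System.out.println("finished");
    }
}
```

With one pool thread, both trace lines should report the same thread id, which is the behavior the question expected.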

+4

Source: https://habr.com/ru/post/977231/

