Why thenComposeAsync expects a buyback

I wrote an example of contrived code, and it may not be the code that someone should use, but I believe that it should work. However, it is blocked instead. I read the answers described here , but found them insufficiently.

Here is a sample code:

import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class Test { public static void main(String argv[]) throws Exception { int nThreads = 1; Executor executor = Executors.newFixedThreadPool( nThreads ); CompletableFuture.completedFuture(true) .thenComposeAsync((unused)->{ System.err.println("About to enqueue task"); CompletableFuture<Boolean> innerFuture = new CompletableFuture<>(); executor.execute(() -> { // pretend this is some really expensive computation done asynchronously System.err.println("Inner task"); innerFuture.complete(true); }); System.err.println("Task enqueued"); return innerFuture; }, executor).get(); System.err.println("All done"); System.exit(0); } } 

Fingerprints:

About starting a task

Queue task

And then it hangs. It has come to a standstill because the artist has only one thread, and he is waiting for innerFuture to become redeemable. Why does the thenComposeAsync block for the return value become redeemable, instead of returning an incomplete future and freeing its stream in the executor?

This seems completely unintuitive, and javadocs really don't help. I basically don’t understand how CompletionStages work? Or is it a mistake in implementation?

+5
source share
2 answers

So, after a lot of interesting conversation, I decided to send one of the authors of the JDK. It turned out that this behavior was not intended and really is a mistake in 1.8u25. There is a fix to be released with a later version of the Java 8 patch. I don’t know which one. For those who want to test the new behavior, you can download the latest jsr166 jar here:

http://gee.cs.oswego.edu/dl/concurrency-interest/index.html

+2
source

First, let me rewrite your code with two static functions so that it is easier to see what happens:

 // Make an executor equivalent to Executors.newFixedThreadPool(nThreads) // that will trace to standard error when a task begins or ends static ExecutorService loggingExecutor(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.err.println("Executor beginning task on thread: " + t.getName()); } @Override protected void afterExecute(Runnable r, Throwable t) { System.err.println("Executor finishing task on thread: " + Thread.currentThread().getName()); } }; } 

and

 // same as what you pass to thenComposeAsync static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) { return b -> { System.err.println(Thread.currentThread().getName() + ": About to enqueue task"); CompletableFuture<Boolean> innerFuture = new CompletableFuture<>(); executor.execute(() -> { System.err.println(Thread.currentThread().getName() + ": Inner task"); innerFuture.complete(true); }); System.err.println(Thread.currentThread().getName() + ": Task enqueued"); return innerFuture; }; } 

Now we can write a test case as follows:

 ExecutorService e = loggingExecutor(1); CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e) .join(); e.shutdown(); /* Output before deadlock: Executor beginning task on thread: pool-1-thread-1 pool-1-thread-1: About to enqueue task pool-1-thread-1: Task enqueued */ 

Check your conclusion that the first thread is not freed until the result of the second future is calculated:

 ExecutorService e = loggingExecutor(2); // use 2 threads this time CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e) .join(); e.shutdown(); /* Executor beginning task on thread: pool-1-thread-1 pool-1-thread-1: About to enqueue task pool-1-thread-1: Task enqueued Executor beginning task on thread: pool-1-thread-2 pool-1-thread-2: Inner task Executor finishing task on thread: pool-1-thread-2 Executor finishing task on thread: pool-1-thread-1 */ 

Indeed, it seems that thread 1 is held until thread 2 is executed

See if you yourself thenComposeAsync :

 ExecutorService e = loggingExecutor(1); CompletableFuture<Boolean> future = CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e); System.err.println("thenComposeAsync returned"); future.join(); e.shutdown(); /* thenComposeAsync returned Executor beginning task on thread: pool-1-thread-1 pool-1-thread-1: About to enqueue task pool-1-thread-1: Task enqueued */ 

thenComposeAsync not blocked. He immediately returned CompletableFuture , and a deadlock occurred only when we tried to complete it. So, what is needed to complete the future returned by .thenComposeAsync(inner(e), e) ?

  • The API must wait for innner (e) to return CompletableFuture<Boolean>
  • he needs to wait until the returned CompletableFuture<Boolean> . Only then will the future be completed. So, as you can see, he cannot do what you offer and return the incomplete Future.

This is mistake? Why is the CompletionStage held in stream 1 during the calculation of the internal task? This is not a mistake, because, as you have noticed, the documentation is rather vague and does not promise to release streams in any particular order. Also note that Thread1 will be used for any subsequent then*() CompletableFuture methods. Consider the following:

 ExecutorService e = loggingExecutor(2); CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e) .thenRun(() -> System.err.println(Thread.currentThread().getName() + ": All done")) .join(); e.shutdown(); /* Executor beginning task on thread: pool-1-thread-1 pool-1-thread-1: About to enqueue task pool-1-thread-1: Task enqueued Executor beginning task on thread: pool-1-thread-2 pool-1-thread-2: Inner task Executor finishing task on thread: pool-1-thread-2 pool-1-thread-1: All done Executor finishing task on thread: pool-1-thread-1 */ 

As you can see, .thenRun (...) was executed in thread 1. I believe this is consistent with other * Async (..., Executor) methods for CompletableFuture.

But what if you want to split the functionality of thenComposeAsync into 2 separately controlled steps, rather than leaving them to the API for juggling threads? You can simply do this:

 ExecutorService e = loggingExecutor(1); completedFuture(true) .thenApplyAsync(inner(e), e) // do the async part first .thenCompose(x -> x) // compose separately .thenRun(() -> System.err.println(Thread.currentThread().getName() + ": All done")) .join(); e.shutdown(); 

Everything will work well on 1 thread without deadlocks.

In conclusion, this behavior is not intuitive, as you say? I dont know. I can not imagine why thenComposeAsync exists. If the method returns CompletableFuture , it should not be blocked and there should be no reason for calling it asynchronously.

+1
source

Source: https://habr.com/ru/post/1209417/


All Articles