First, let me rewrite your code with two static functions so that it is easier to see what happens:
// Make an executor equivalent to Executors.newFixedThreadPool(nThreads) // that will trace to standard error when a task begins or ends static ExecutorService loggingExecutor(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.err.println("Executor beginning task on thread: " + t.getName()); } @Override protected void afterExecute(Runnable r, Throwable t) { System.err.println("Executor finishing task on thread: " + Thread.currentThread().getName()); } }; }
and
// same as what you pass to thenComposeAsync static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) { return b -> { System.err.println(Thread.currentThread().getName() + ": About to enqueue task"); CompletableFuture<Boolean> innerFuture = new CompletableFuture<>(); executor.execute(() -> { System.err.println(Thread.currentThread().getName() + ": Inner task"); innerFuture.complete(true); }); System.err.println(Thread.currentThread().getName() + ": Task enqueued"); return innerFuture; }; }
Now we can write a test case as follows:
ExecutorService e = loggingExecutor(1); CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e) .join(); e.shutdown();
Check your conclusion that the first thread is not freed until the result of the second future is calculated:
ExecutorService e = loggingExecutor(2); // use 2 threads this time CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e) .join(); e.shutdown();
Indeed, it seems that thread 1 is held until thread 2 is executed
See if you yourself thenComposeAsync :
ExecutorService e = loggingExecutor(1); CompletableFuture<Boolean> future = CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e); System.err.println("thenComposeAsync returned"); future.join(); e.shutdown();
thenComposeAsync not blocked. He immediately returned CompletableFuture , and a deadlock occurred only when we tried to complete it. So, what is needed to complete the future returned by .thenComposeAsync(inner(e), e) ?
- The API must wait for innner (e) to return
CompletableFuture<Boolean> - he needs to wait until the returned
CompletableFuture<Boolean> . Only then will the future be completed. So, as you can see, he cannot do what you offer and return the incomplete Future.
This is mistake? Why is the CompletionStage held in stream 1 during the calculation of the internal task? This is not a mistake, because, as you have noticed, the documentation is rather vague and does not promise to release streams in any particular order. Also note that Thread1 will be used for any subsequent then*() CompletableFuture methods. Consider the following:
ExecutorService e = loggingExecutor(2); CompletableFuture.completedFuture(true) .thenComposeAsync(inner(e), e) .thenRun(() -> System.err.println(Thread.currentThread().getName() + ": All done")) .join(); e.shutdown(); /* Executor beginning task on thread: pool-1-thread-1 pool-1-thread-1: About to enqueue task pool-1-thread-1: Task enqueued Executor beginning task on thread: pool-1-thread-2 pool-1-thread-2: Inner task Executor finishing task on thread: pool-1-thread-2 pool-1-thread-1: All done Executor finishing task on thread: pool-1-thread-1 */
As you can see, .thenRun (...) was executed in thread 1. I believe this is consistent with other * Async (..., Executor) methods for CompletableFuture.
But what if you want to split the functionality of thenComposeAsync into 2 separately controlled steps, rather than leaving them to the API for juggling threads? You can simply do this:
ExecutorService e = loggingExecutor(1); completedFuture(true) .thenApplyAsync(inner(e), e) // do the async part first .thenCompose(x -> x) // compose separately .thenRun(() -> System.err.println(Thread.currentThread().getName() + ": All done")) .join(); e.shutdown();
Everything will work well on 1 thread without deadlocks.
In conclusion, this behavior is not intuitive, as you say? I dont know. I can not imagine why thenComposeAsync exists. If the method returns CompletableFuture , it should not be blocked and there should be no reason for calling it asynchronously.