How can I catch a RejectedExecutionException in a call to CompletableFuture whenCompleteAsync?

In the following code example, I attach a BiConsumer that sleeps for 100 milliseconds as the completion action of a set of completable futures. I used the whenCompleteAsync method and gave it a separate executorService to use. The executorService is a ThreadPoolExecutor with a core pool size of 5, a maximum size of 5, and a queue length of 1.

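import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompleteTest {
    public static void main(String[] args) {
        // 5 core threads, 5 max threads, and a bounded queue holding 1 task
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        ArrayList<CompletableFuture<String>> list = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
            // The future returned by whenCompleteAsync is discarded here
            stringCompletableFuture.whenCompleteAsync((e, a) -> {
                System.out.println("Complete " + e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {e1.printStackTrace();}
            }, executorService);

            list.add(stringCompletableFuture);
        }

        // Completing each future triggers submission of its action to executorService
        for (int i = 0; i < list.size(); i++) {
            list.get(i).complete(i + "");
        }
    }
}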

When I run the code, only 6 outputs are printed even though I complete 100 futures. That is the 5 core threads plus 1 queued task. What happens to the rest? If the other runnables could not be submitted to the executor service because the queue is already full, shouldn't there be an exception?

Output

Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5

An exception is thrown, and it is used to complete a CompletableFuture exceptionally, but not the one you are checking.

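Your ThreadPoolExecutor is created with the default RejectedExecutionHandler, which throws a RejectedExecutionException when a task cannot be accepted because the ExecutorService is full. So where do those exceptions end up?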
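To see the rejection on its own, without CompletableFuture in the picture, here is a minimal sketch that submits tasks directly to a pool configured like the one in the question. The class name RejectionDemo and the helper countRejections are made up for this illustration, and a CountDownLatch replaces Thread.sleep so that no task finishes while submissions are still in progress.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    // Hypothetical helper: submits `attempts` blocking tasks and
    // returns how many submissions were rejected.
    static int countRejections(int poolSize, int queueCapacity, int attempts) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    // Each accepted task blocks until the latch is released
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Thrown by the default handler once threads and queue are full
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 5 threads + 1 queue slot accept 6 tasks, so 94 of 100 are rejected
        System.out.println(countRejections(5, 1, 100));
    }
}
```

Here the exception reaches the caller of execute directly. With whenCompleteAsync the call to execute happens inside CompletableFuture, which is why the exception has to be looked for elsewhere.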

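Look at the call to whenCompleteAsync. It returns a new CompletableFuture, separate from stringCompletableFuture. When stringCompletableFuture is completed (successfully, in this case), the returned CompletableFuture tries (internally) to submit the BiConsumer to the ExecutorService.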

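Because the ExecutorService has no room left, it invokes the RejectedExecutionHandler, which throws a RejectedExecutionException. That exception is caught and used to completeExceptionally the returned CompletableFuture, so it never reaches your code.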
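The difference between the two futures can be checked with a single pair. The sketch below is an illustration, not the code from the question: it uses an executor that has already been shut down as a stand-in for a full one, since both reject new tasks through the same handler. The class name TwoFuturesDemo and the helper states are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TwoFuturesDemo {
    // Hypothetical helper: returns {source completed exceptionally?,
    // dependent completed exceptionally?}
    static boolean[] states() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // Assumption for the demo: a shut-down executor rejects every task,
        // which stands in for the saturated executor of the question
        executor.shutdown();

        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent =
                source.whenCompleteAsync((value, error) -> { }, executor);

        // complete() does not throw; the rejection is recorded on `dependent`
        source.complete("done");

        return new boolean[] {
            source.isCompletedExceptionally(),
            dependent.isCompletedExceptionally()
        };
    }

    public static void main(String[] args) {
        boolean[] s = states();
        System.out.println("source exceptional: " + s[0]);
        System.out.println("dependent exceptional: " + s[1]);
    }
}
```

The source future holds its normal value, while the future returned by whenCompleteAsync carries the failure.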

In other words, in your for loop, capture the CompletableFuture returned by whenCompleteAsync, store it, and print its state afterwards.

// Fragment: replaces the body of main after executorService is created.
// Thread.sleep requires main to declare `throws InterruptedException`.
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
    // Keep the future returned by whenCompleteAsync; it receives the rejection
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
        System.out.println("Complete " + e);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {e1.printStackTrace();}
    }, executorService);
    dependents.add(thisWillHaveException);
    list.add(stringCompletableFuture);
}

for (int i = 0; i < list.size(); i++) {
    list.get(i).complete(i + "");
}
// Give the accepted actions time to finish before inspecting the results
Thread.sleep(2000);
dependents.forEach(cf -> {
    cf.whenComplete((r, e) -> {
        if (e != null)
            System.out.println(cf + " " + e.getMessage());
    });
});

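You will see that all of them (except the 6 that were printed successfully before) completed exceptionally with a RejectedExecutionException.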

...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
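If you want to react to the rejection rather than just print it, one possible approach is to inspect the cause of the failure on each returned future. The sketch below assumes, as the messages above suggest, that the RejectedExecutionException arrives wrapped in a CompletionException, and unwraps it defensively. The names DependentRejectionDemo, unwrap and countRejectedDependents are hypothetical, and a CountDownLatch replaces Thread.sleep so the count does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DependentRejectionDemo {
    // Strip a CompletionException wrapper if one is present
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Hypothetical helper: completes `count` futures against a 5-thread,
    // 1-slot executor and returns how many dependents were rejected.
    static int countRejectedDependents(int count) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        List<CompletableFuture<String>> sources = new ArrayList<>();
        List<CompletableFuture<String>> dependents = new ArrayList<>();

        for (int i = 0; i < count; i++) {
            CompletableFuture<String> source = new CompletableFuture<>();
            dependents.add(source.whenCompleteAsync((value, error) -> {
                try {
                    release.await(); // hold the worker until all futures are completed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, executor));
            sources.add(source);
        }

        for (int i = 0; i < sources.size(); i++) {
            sources.get(i).complete(String.valueOf(i));
        }
        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);

        int rejected = 0;
        for (CompletableFuture<String> dependent : dependents) {
            try {
                dependent.join();
            } catch (CompletionException e) {
                if (unwrap(e) instanceof RejectedExecutionException) {
                    rejected++; // retry, log, or fall back here
                }
            }
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Rejected: " + countRejectedDependents(100));
    }
}
```

With 100 futures this should report 94 rejected dependents, matching the 6 actions that the executor accepted.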

Source: https://habr.com/ru/post/1670442/

