How can I implement or find the thread-safe CompletionService equivalent?

I have a simple web service running inside a Tomcat container, which by its nature is multi-threaded. In every request that comes to the service, I want to make simultaneous calls to the external service. The ExecutorCompletionService in java.util.concurrent gets me kind of there. I can provide him with a thread pool, and he will take care of making my simultaneous calls, and I will be notified when any of the results is ready.

The code for processing a specific incoming request may look like this:

void handleRequest(Integer[] input) { // Submit tasks CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(Executors.newCachedThreadPool()); for (final Integer i : input) { completionService.submit(new Callable<Integer>() { public Integer call() { return -1 * i; } }); } // Do other stuff... // Get task results try { for (int i = 0; i < input.size; i++) { Future<Integer> future = completionService.take(); Integer result = future.get(); // Do something with the result... } } catch (Exception e) { // Handle exception } } 

This should work fine and dandy, but pretty inefficient as a new thread pool is allocated for each incoming request. If I translate the CompletionService as a generic instance, I will encounter thread safety issues with multiple requests using the same CompletionService set and thread pool. Because requests submit tasks and receive results, the results they receive do not match the ones they submitted.

Thus, I need a thread-safe CompletionService that allows me to share a common thread pool across all incoming requests. As each thread completes the task, the corresponding thread for the incoming request must be notified so that it can collect the results.

What is the easiest way to implement this functionality? I am sure that this scheme has been applied many times; I'm just not sure if this is something provided by the Java concurrency library, or can be easily built using some Java concurrency building blocks.

UPDATE: The one caveat. I forgot to mention that I would like to be notified as soon as all my assigned tasks are completed. This is the main advantage of using CompletionService, because it separates the production and consumption of tasks and results. I really don't like the order in which I return the results, and I would like to avoid unnecessarily blocking, waiting for the results to be returned in order.

+2
source share
4 answers

You share Executor , but not CompletionService .

We have AsyncCompleter that does just that and processes the entire account, allowing you to:

 Iterable<Callable<A>> jobs = jobs(); Iterable<A> results async.invokeAll(jobs); 

results iterates in return order and blocks until a result is received

+2
source

You can simply use the usual general ExecutorService. Whenever you submit a task, you will return the future for the task that you just sent. You can save them all in a list and request them later.

Example:

 private final ExecutorService service = ...//a single, shared instance void handleRequest(Integer[] input) { // Submit tasks List<Future<Integer>> futures = new ArrayList<Future<Integer>>(input.length); for (final Integer i : input) { Future<Integer> future = service.submit(new Callable<Integer>() { public Integer call() { return -1 * i; } }); futures.add(future); } // Do other stuff... // Get task results for(Future<Integer> f : futures){ try { Integer result = f.get(); } catch (Exception e) { e.printStackTrace(); } } } 
+1
source

java.util.concurrent provides everything you need. If I understand your question correctly, you have the following requirements:

You want to send requests and immediately (within reasonable limits) process the result of the request (response). Well, I believe that you have already seen a solution to your problem: java.util.concurrent.CompletionService.

This service, which rather combines Executor and BlockingQueue to handle Runnable and / or Callable tasks. BlockingQueue is used to complete completed tasks, which you can expect to wait for the next thread until the task completes its task (this is done by calling the take () method) of the CompletionService object.

As previous posters noted, share with the Contractor and create a CompletionService for each request. This may seem like an expensive affair, but think again that CS is simply collaborating with Contractor and BlockingQueue. Since you are using the most expensive object to instantiate, i.e. Contractor, I think you will find that it is a very reasonable cost.

However ... all of this suggests that you still have a problem, and that problem seems to be separating request processing from response processing. This can be approached by creating a separate service that will exclusively process requests for all requests or for a specific type of request.

Here is an example: (Note: this implies that the Request object implements the Callable interface, which should return a Response type ... the details that I skipped for this simple example).

 class RequestHandler { RequestHandler(ExecutorService responseExecutor, ResponseHandler responseHandler) { this.responseQueue = ... this.executor = ... } public void acceptRequest(List<Request> requestList) { for(Request req : requestList) { Response response = executor.submit(req); responseHandler.handleResponse(response); } } } class ResponseHandler { ReentrantLock lock; ResponseHandler(ExecutorService responseExecutor) { ... } public void handleResponse(Response res) { lock.lock() { try { responseExecutor.submit( new ResponseWorker(res) ); } finally { lock.unlock(); } } private static class ResponseWorker implements Runnable { ResponseWorker(Response response) { response = ... } void processResponse() { // process this response } public void run() { processResponse(); } } } 

A few things to remember: one, the ExecutorService executes Callables or Runnables from the lock queue; your RequestHandler receives the task, and those that are in the queue in the Executor are processed as soon as possible. The same thing happens in your ResponseHandler; a response will be received, and as soon as this SEPARATE executor can, he will process this response. In short, you have two executors working at the same time: one on the Request objects, the other on the Response objects.

+1
source

Why do you need a CompletionService ?

Each thread can simply send or call Callables to the "regular" and shared instance of ExecutorService . Each thread then holds its own private Future links.

In addition, Executor and its descendants are thread safe in design. You really want each thread to be able to create its own tasks and check their results.

Javadoc in java.util.concurrent excellent; It includes usage patterns and examples. Read the document ExecutorService and other types in order to better understand how to use them.

0
source

Source: https://habr.com/ru/post/1346165/


All Articles