Concurrency: how to implement an executor with inbound and outbound queues?

As you know, ThreadPoolExecutor uses some BlockingQueue as an incoming task queue. I want ThreadPoolExecutor to have a second priority for task results that were ready. I want to use this queue as a source for I / O services that send or save these results.

Why do I want to create a separate queue? Because I want to separate the action from sending the results from the action of receiving the results. In addition, I believe that any Exceptions and Delays that accompany I / O should not affect my ThreadPoolExecutor, which computes the result.

I created some naive implementation of this. I would like to get some criticism about this. Maybe it can be implemented with ready-made Java classes better? I am using Java 7.

public class ThreadPoolWithResultQueue {
    interface Callback<T> {
        void complete(T t);
    }
    public abstract static class CallbackTask<T> implements Runnable {
        private final Callback callback;   
        CallbackTask(Callback callback) {
            this.callback = callback;
        }    
        public abstract T execute();   
        final public void run() {
            T t = execute();
            callback.complete(t);
        }
    }   
    public static class CallBackTaskString extends CallbackTask<String> {
        public CallBackTaskString(Callback callback) {
            super(callback);
        }
        @Override
        public String execute() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
            }
            return hashCode() + "-" + System.currentTimeMillis();
        }
    }    
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>();
        Callback<String> addToQueueCallback = new Callback<String>() {
            @Override
            public void complete(String s) {
                System.out.println("Adding Result To Queue " + s);
                resultQueue.add(s); //adding to outgoing queue. some other executor (or same one?) will process it
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000l, TimeUnit.DAYS, workQueue);
        for (int i = 0; i <= 5; i++) {
            executor.submit(new CallBackTaskString(addToQueueCallback));
        };
        System.out.println("All submitted.");
        executor.shutdown();
        executor.awaitTermination(10l, TimeUnit.SECONDS);
        System.out.println("Result queue size " + resultQueue.size());
    }
}
+4
source share
1 answer

For the makinf component of the library you have to wrap things up ...

You can expand the thread pool executor, which has several methods for intercepting tasks that are sent, so you must queue the queue passed in the constructor.

This is basically an ExecutorCompletionService, but you will allow the user to attach the queue instead of appearing as one.

Otherwise, this is a typical proxy task. Fair work.

0

Source: https://habr.com/ru/post/1655422/


All Articles