As you know, ThreadPoolExecutor uses a BlockingQueue as its incoming task queue. I want ThreadPoolExecutor to have a second queue that holds the results of completed tasks. I want to use this queue as the source for I/O services that send or save these results.
Why do I want a separate queue? Because I want to decouple sending the results from producing them. In addition, I believe that any exceptions and delays that accompany I/O should not affect the ThreadPoolExecutor that computes the results.
I wrote a naive implementation of this and would like some criticism of it. Could it be implemented better with ready-made Java classes? I am using Java 7.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolWithResultQueue {

    // Invoked on the worker thread once a task has produced its result.
    interface Callback<T> {
        void complete(T t);
    }

    // Runs execute() and hands the result to the callback.
    public abstract static class CallbackTask<T> implements Runnable {
        private final Callback<T> callback;

        CallbackTask(Callback<T> callback) {
            this.callback = callback;
        }

        public abstract T execute();

        @Override
        public final void run() {
            T t = execute();
            callback.complete(t);
        }
    }

    public static class CallBackTaskString extends CallbackTask<String> {
        public CallBackTaskString(Callback<String> callback) {
            super(callback);
        }

        @Override
        public String execute() {
            try {
                Thread.sleep(3000); // simulate a slow computation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
            }
            return hashCode() + "-" + System.currentTimeMillis();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>();
        Callback<String> addToQueueCallback = new Callback<String>() {
            @Override
            public void complete(String s) {
                System.out.println("Adding Result To Queue " + s);
                resultQueue.add(s);
            }
        };
        // With an unbounded work queue the pool never grows past its core size of 3.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000L, TimeUnit.DAYS, workQueue);
        for (int i = 0; i <= 5; i++) { // submits six tasks
            executor.submit(new CallBackTaskString(addToQueueCallback));
        }
        System.out.println("All submitted.");
        executor.shutdown();
        executor.awaitTermination(10L, TimeUnit.SECONDS);
        System.out.println("Result queue size " + resultQueue.size());
    }
}
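For comparison, here is a minimal sketch of what the "ready-made" route might look like, assuming java.util.concurrent.ExecutorCompletionService fits the use case. It wraps an executor and places each finished task's Future into a BlockingQueue that a separate thread can drain. The class name CompletionQueueSketch, the method runDemo, and the "result-N" strings are made up for illustration, and the consumer thread only collects results at the point where real code would send or save them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class CompletionQueueSketch {

    // Hypothetical helper: runs taskCount computations on a pool, drains the
    // results on a separate consumer thread, and returns what was consumed.
    public static List<String> runDemo(final int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // Completed futures are placed in this queue as each task finishes.
        final BlockingQueue<Future<String>> done = new LinkedBlockingQueue<Future<String>>();
        final ExecutorCompletionService<String> service =
                new ExecutorCompletionService<String>(pool, done);
        final List<String> consumed = new ArrayList<String>();

        // Stand-in for the I/O service: it only ever touches the result queue.
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < taskCount; i++) {
                        Future<String> f = service.take(); // blocks until a result is ready
                        try {
                            consumed.add(f.get()); // real code would send or save here
                        } catch (ExecutionException e) {
                            // A failed computation surfaces here, not in the pool.
                            consumed.add("failed: " + e.getCause());
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            service.submit(new Callable<String>() {
                @Override
                public String call() {
                    return "result-" + id;
                }
            });
        }
        pool.shutdown();  // already submitted tasks still run to completion
        consumer.join();  // after join, 'consumed' is safely visible to the caller
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Consumed " + runDemo(6).size() + " results");
    }
}
```

In this sketch a slow or failing consumer cannot stall the worker threads, because they only enqueue finished futures. The order in which results arrive depends on scheduling and is not guaranteed.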