Note: These implementations are somewhat flawed and non-deterministic. Please read the entire answer and the comments before using this code.
How about creating a work queue that rejects items while the executor is below the maximum pool size and starts accepting them once the maximum has been reached?
This relies on the documented behavior:
"If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected."
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorTest {
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int KEEP_ALIVE_TIME_MS = 5000;

    public static void main(String[] args) {
        final SaturateExecutorBlockingQueue workQueue = new SaturateExecutorBlockingQueue();
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME_MS, TimeUnit.MILLISECONDS, workQueue);
        workQueue.setExecutor(executor);

        for (int i = 0; i < 6; i++) {
            final int index = i;
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Runnable " + index + " on thread: " + Thread.currentThread());
                }
            });
        }
    }

    public static class SaturateExecutorBlockingQueue extends LinkedBlockingQueue<Runnable> {
        private ThreadPoolExecutor executor;

        public void setExecutor(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        // Refuse to queue while the pool can still grow, so the executor starts a new thread.
        @Override
        public boolean offer(Runnable e) {
            if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
                return false;
            }
            return super.offer(e);
        }
    }
}
Note: Your question surprised me because I expected the behavior you want to be the default for a ThreadPoolExecutor configured with corePoolSize < maximumPoolSize. But, as you point out, the JavaDoc for ThreadPoolExecutor explicitly states otherwise.
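To make the default behavior concrete, here is a minimal sketch (the class name DefaultGrowthDemo and its helper method are made up for this illustration, not part of the code above). It assumes a plain unbounded LinkedBlockingQueue, whose offer() always succeeds, so the executor should never have a reason to grow past corePoolSize:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class DefaultGrowthDemo {

    // Submits taskCount blocking tasks and reports the largest pool size reached.
    static int largestPoolSize(int core, int max, int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                core, max, 5000, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()); // unbounded: offer() never fails
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = executor.getLargestPoolSize();
        release.countDown();   // let all tasks finish
        executor.shutdown();   // queued tasks still run before the pool exits
        return largest;
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize(2, 4, 6));
    }
}
```

With a core size of 2, a maximum of 4 and 6 tasks, I would expect this to report 2: the two core threads are started, and the remaining four tasks simply wait in the queue.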
Idea number 2
I think I now have a slightly better approach. It relies on a side effect of the setCorePoolSize method in ThreadPoolExecutor. The idea is to temporarily and conditionally increase the core pool size when a work item is enqueued. When the core pool size is increased, ThreadPoolExecutor immediately spawns new threads to execute the queued tasks (up to queue.size(), and no more than the increase in core size). Then we immediately decrease the core pool size again, which lets the thread pool shrink naturally during future periods of low activity. This approach is still not fully deterministic (the pool size could, for example, grow above the maximum pool size), but I think it is better than the first strategy in almost all cases.
In particular, I believe this approach is better than the first because:
- It will reuse threads more often.
- It will not reject execution as a result of a race.
- To repeat, the first approach makes the thread pool grow to its maximum size even under very light use. This approach should be much more efficient in that regard.
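The side effect this idea depends on can be seen in isolation. The following is only a sketch under my own assumptions (CorePoolResizeDemo and poolSizesAroundResize are hypothetical names): one task occupies the single core thread, three more sit in an unbounded queue, and the core pool size is then raised and lowered again.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class CorePoolResizeDemo {

    // Returns {pool size before the resize, pool size right after raising the core size}.
    static int[] poolSizesAroundResize() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 4, 5000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) { // one task runs, three wait in the queue
            executor.execute(() -> {
                try {
                    release.await(); // block so that no worker becomes free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int before = executor.getPoolSize();
        executor.setCorePoolSize(4); // starts threads for the queued tasks
        int after = executor.getPoolSize();
        executor.setCorePoolSize(1); // restore, so the pool can shrink later
        release.countDown();
        executor.shutdown();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] sizes = poolSizesAroundResize();
        System.out.println("before: " + sizes[0] + ", after: " + sizes[1]);
    }
}
```

Here the pool should go from 1 thread to 4 as soon as setCorePoolSize(4) returns, because three tasks are queued and the core size grew by three. Lowering the core size afterwards does not kill the busy threads; it only allows them to time out once they become idle.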
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorTest2 {
    private static final int KEEP_ALIVE_TIME_MS = 5000;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;

    public static void main(String[] args) throws InterruptedException {
        final SaturateExecutorBlockingQueue workQueue =
                new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE);
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME_MS, TimeUnit.MILLISECONDS, workQueue);
        workQueue.setExecutor(executor);

        for (int i = 0; i < 60; i++) {
            final int index = i;
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Runnable " + index + " on thread: " + Thread.currentThread()
                            + " poolSize: " + executor.getPoolSize());
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public static class SaturateExecutorBlockingQueue extends LinkedBlockingQueue<Runnable> {
        private final int corePoolSize;
        private final int maximumPoolSize;
        private ThreadPoolExecutor executor;

        public SaturateExecutorBlockingQueue(int corePoolSize, int maximumPoolSize) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
        }

        public void setExecutor(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        @Override
        public boolean offer(Runnable e) {
            if (!super.offer(e)) {
                return false;
            }