Java ThreadPoolExecutor strategy: direct handoff with a fallback queue?

I am looking for a ThreadPoolExecutor where I can set corePoolSize and maximumPoolSize, and where the queue hands each task straight to the pool, creating new threads until maximumPoolSize is reached, and only then starts queuing.

Is there such a thing? If not, is there a good reason ThreadPoolExecutor does not offer such a strategy?

Basically, I want tasks submitted for execution, and once adding more threads would actually hurt performance (which is why I set maximumPoolSize), the pool should stop creating threads, work with the threads it has while filling the queue, and reject tasks only once the queue is full.

And when the load drops off again, the pool can tear down idle threads back down to corePoolSize.

This makes more sense for my application than the "three general strategies" listed in http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html
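For reference, the nearest of those documented strategies is direct handoff via a SynchronousQueue: the pool grows up to maximumPoolSize, but nothing is ever buffered, so once all threads are busy new submissions are rejected outright. A minimal sketch of that gap (class name and constants are mine, not from the thread):

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectHandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        // Direct handoff: the pool grows to max (2), then rejects instead of queuing.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                executor.submit(() -> {
                    try { Thread.sleep(2000); } catch (InterruptedException ignored) {}
                });
            } catch (RejectedExecutionException e) {
                rejected++; // no queue to fall back on
            }
        }
        System.out.println("poolSize=" + executor.getPoolSize() + " rejected=" + rejected);
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

This grows threads first, as the question wants, but has no overflow queue: the third and fourth submissions are rejected.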

2 answers

Note: these implementations are somewhat flawed and non-deterministic. Please read the entire answer and the comments before using this code.

What about creating a work queue that rejects items while the pool is below the maximum pool size, and starts accepting them only once the maximum has been reached?

This relies on the documented behavior:

"If the request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case the task will be rejected."

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorTest {
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int KEEP_ALIVE_TIME_MS = 5000;

    public static void main(String[] args) {
        final SaturateExecutorBlockingQueue workQueue = new SaturateExecutorBlockingQueue();
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME_MS, TimeUnit.MILLISECONDS, workQueue);
        workQueue.setExecutor(executor);

        for (int i = 0; i < 6; i++) {
            final int index = i;
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Runnable " + index + " on thread: "
                            + Thread.currentThread());
                }
            });
        }
    }

    public static class SaturateExecutorBlockingQueue extends LinkedBlockingQueue<Runnable> {
        private ThreadPoolExecutor executor;

        public void setExecutor(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        public boolean offer(Runnable e) {
            // Reject the offer while the pool can still grow;
            // ThreadPoolExecutor responds by creating a new thread.
            if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
                return false;
            }
            return super.offer(e);
        }
    }
}

Note: your question surprised me, because I expected your desired behavior to be the default for a ThreadPoolExecutor configured with corePoolSize < maximumPoolSize. But, as you point out, the JavaDoc for ThreadPoolExecutor clearly states otherwise.
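To see that documented default in action: with an unbounded LinkedBlockingQueue, offer() always succeeds, so the pool never grows past corePoolSize no matter how far behind it falls. A small illustrative sketch (class name and constants are mine):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultBehaviorDemo {
    public static void main(String[] args) throws InterruptedException {
        // core=2, max=4, unbounded queue: extra threads are never created.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                try { Thread.sleep(200); } catch (InterruptedException ignored) {}
            });
        }
        // All 10 tasks were queued successfully, so the pool never grew
        // beyond corePoolSize despite maximumPoolSize being 4.
        System.out.println("poolSize=" + executor.getPoolSize());
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```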


Idea number 2

I think I have a slightly better approach. It relies on the side-effect behavior coded into the setCorePoolSize method of ThreadPoolExecutor. The idea is to temporarily and conditionally raise the core pool size when a work item is enqueued. When the core pool size is raised, ThreadPoolExecutor immediately spawns enough new threads to execute all the queued tasks (queue.size()). Then we immediately lower the core pool size again, which allows the thread pool to shrink naturally during future periods of low activity. This approach is still not fully deterministic (the pool size can, for example, grow above the maximum pool size), but I think it is better than the first strategy in almost all cases.

Specifically, I believe this approach is better than the first because:

  • It will reuse existing threads more often.
  • It will not reject work as the result of a race.
  • As mentioned above, the first approach grows the thread pool to its maximum size even under very light load. This approach should be much more efficient in that regard.


import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorTest2 {
    private static final int KEEP_ALIVE_TIME_MS = 5000;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAXIMUM_POOL_SIZE = 4;

    public static void main(String[] args) throws InterruptedException {
        final SaturateExecutorBlockingQueue workQueue =
                new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE);
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME_MS, TimeUnit.MILLISECONDS, workQueue);
        workQueue.setExecutor(executor);

        for (int i = 0; i < 60; i++) {
            final int index = i;
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Runnable " + index + " on thread: "
                            + Thread.currentThread() + " poolSize: "
                            + executor.getPoolSize());
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public static class SaturateExecutorBlockingQueue extends LinkedBlockingQueue<Runnable> {
        private final int corePoolSize;
        private final int maximumPoolSize;
        private ThreadPoolExecutor executor;

        public SaturateExecutorBlockingQueue(int corePoolSize, int maximumPoolSize) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
        }

        public void setExecutor(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        public boolean offer(Runnable e) {
            if (super.offer(e) == false) {
                return false;
            }
            // Uncomment one or both of the lines below to increase
            // the likelihood of the thread pool reusing an existing thread
            // vs. spawning a new one.
            //Thread.yield();
            //Thread.sleep(0);
            int currentPoolSize = executor.getPoolSize();
            if (currentPoolSize < maximumPoolSize && currentPoolSize >= corePoolSize) {
                // Briefly raise the core size so queued tasks get threads,
                // then restore it so the pool can shrink when idle.
                executor.setCorePoolSize(currentPoolSize + 1);
                executor.setCorePoolSize(corePoolSize);
            }
            return true;
        }
    }
}

We found a solution to this problem with the following code:

This queue is a hybrid SynchronousQueue / LinkedBlockingQueue.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 1L;

    private final SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>();

    public OverflowingSynchronousQueue() {
        super();
    }

    public OverflowingSynchronousQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(E e) {
        // Create a new thread or wake an idle thread
        return synchronousQueue.offer(e);
    }

    public boolean offerToOverflowingQueue(E e) {
        // Add to the overflow queue
        return super.offer(e);
    }

    @Override
    public E take() throws InterruptedException {
        // Return tasks from the overflow queue, if any, without blocking
        E task = super.poll();
        if (task != null) {
            return task;
        } else {
            // Block on the SynchronousQueue take
            return synchronousQueue.take();
        }
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Return tasks from the overflow queue, if any, without blocking
        E task = super.poll();
        if (task != null) {
            return task;
        } else {
            // Block on the SynchronousQueue poll
            return synchronousQueue.poll(timeout, unit);
        }
    }
}
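The key property this hybrid exploits is that SynchronousQueue.offer succeeds only when a consumer thread is already blocked in take() or poll(), which is exactly what makes ThreadPoolExecutor reuse an idle worker (or spawn a new one) instead of buffering the task. A minimal sketch of that property (illustrative, not from the answer):

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>();

        // No consumer is waiting, so offer fails immediately.
        System.out.println("offer with no consumer: " + q.offer("task"));

        // Start a consumer that blocks in take(), then offer again.
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("consumer got: " + q.take());
            } catch (InterruptedException ignored) {}
        });
        consumer.start();
        Thread.sleep(200); // give the consumer time to block in take()
        System.out.println("offer with waiting consumer: " + q.offer("task"));
        consumer.join();
    }
}
```

A failed offer is what pushes the ThreadPoolExecutor into its "create a new thread, or reject" branch, and the rejection path is then caught by the adapter below.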

For this to work, we need to wrap the RejectedExecutionHandler in an adapter that calls offerToOverflowingQueue when a task is rejected.

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler {
    private final OverflowingSynchronousQueue<Runnable> queue;
    private final RejectedExecutionHandler adaptedRejectedExecutionHandler;

    public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue,
            RejectedExecutionHandler adaptedRejectedExecutionHandler) {
        super();
        this.queue = queue;
        this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Rejected by the SynchronousQueue handoff: spill into the overflow
        // queue, and only fall back to the wrapped handler if it is full.
        if (!queue.offerToOverflowingQueue(r)) {
            adaptedRejectedExecutionHandler.rejectedExecution(r, executor);
        }
    }
}

Here is how we create the ThreadPoolExecutor:

public static ExecutorService newSaturatingThreadPool(int corePoolSize,
        int maxPoolSize, int maxQueueSize, long keepAliveTime, TimeUnit timeUnit,
        String threadNamePrefix, RejectedExecutionHandler rejectedExecutionHandler) {
    OverflowingSynchronousQueue<Runnable> queue =
            new OverflowingSynchronousQueue<Runnable>(maxQueueSize);
    OverflowingRejectionPolicyAdapter rejectionPolicyAdapter =
            new OverflowingRejectionPolicyAdapter(queue, rejectedExecutionHandler);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
            keepAliveTime, timeUnit, queue, new NamedThreadFactory(threadNamePrefix),
            rejectionPolicyAdapter);
    return executor;
}
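The factory above references a NamedThreadFactory that is not shown in the answer. A minimal stand-in might look like this (this exact class is my assumption, not part of the original):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name each worker with the given prefix plus a running index.
        return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }

    public static void main(String[] args) {
        NamedThreadFactory f = new NamedThreadFactory("saturating-pool");
        Thread t = f.newThread(() -> {});
        System.out.println(t.getName()); // saturating-pool-1
    }
}
```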

Source: https://habr.com/ru/post/910268/
