Can ThreadPoolExecutor be configured to prefer creating new threads over queueing?

The ThreadPoolExecutor documentation says:

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.


If more than corePoolSize but less than maximumPoolSize threads are running, a new thread will be created only if the queue is full.

Is there a way to make the executor prefer creating new threads until the maximum is reached, even when more than corePoolSize threads are running, and only then start queueing? Tasks would be rejected once the queue reaches its maximum size. It would also be nice if a keep-alive timeout shrank the pool back down to the core size after a burst of work has been processed. I see why queuing is preferred, since it allows throttling; however, this option would additionally let the queue act primarily as a list of tasks still waiting to run.
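The default behavior being asked about can be seen with a small demo (class name and pool sizes are mine, chosen for illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultQueueBehavior {

    // Submits 5 slow tasks to a pool with corePoolSize=1, maximumPoolSize=4.
    // Returns {threads actually started, tasks waiting in the queue}.
    static int[] runDemo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try { Thread.sleep(300); } catch (InterruptedException ignored) { }
            });
        }
        // Only the single core thread runs; the queue still has room, so the
        // executor queues the remaining tasks instead of growing toward max.
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = runDemo();
        System.out.println("threads: " + r[0] + ", queued: " + r[1]);
    }
}
```

Even though maximumPoolSize is 4, only one thread is ever started: the other four tasks sit in the queue.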

+4

4 answers

You can't get exactly this behavior from ThreadPoolExecutor .

But here are a couple of solutions:

  • Consider how ThreadPoolExecutor actually grows:

    • If fewer than corePoolSize threads are running, a new thread is created for each submitted task until corePoolSize threads are running.

    • A new thread is created only if the queue is full and fewer than maximumPoolSize threads are running.

    So, wrap the ThreadPoolExecutor in a class that tracks how quickly tasks are being submitted. Then raise the core pool size to a higher value while many tasks are being submitted. Each new submission then causes a new thread to be created.

    When the submission burst is over, the core pool size must be manually reduced again so the extra threads can shut down naturally. If you are worried that a burst may end abruptly, leaving the manual reduction unreached, use allowCoreThreadTimeOut as a safety net.

  • Create a fixed-size thread pool and call allowCoreThreadTimeOut(true)

    Unfortunately, this uses more threads during periods of low submission activity, although it keeps no idle threads at all during zero traffic.

Use the first solution if you have the time, need, and inclination, since it handles a wider range of submission rates and is therefore the more flexible option.

Otherwise, use the second solution.
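A minimal sketch of the first solution, under my own assumptions (the class name BurstExecutor, the method names beginBurst/endBurst, and the sizes are all hypothetical — the answer only prescribes the core-pool resizing idea):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Grows the core pool while a burst of tasks is being submitted,
// then shrinks it so the extra threads can wind down again.
public class BurstExecutor {
    private final ThreadPoolExecutor pool;
    private final int normalCore;
    private final int burstCore;

    public BurstExecutor(int normalCore, int burstCore, int queueCapacity) {
        this.normalCore = normalCore;
        this.burstCore = burstCore;
        this.pool = new ThreadPoolExecutor(normalCore, burstCore,
                30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        // Safety net from the answer: even "core" threads time out when idle,
        // in case a burst ends abruptly and endBurst() is never called.
        pool.allowCoreThreadTimeOut(true);
    }

    // Call before submitting a batch: each new task now starts a new thread
    // (up to burstCore) instead of queueing behind the old core size.
    public void beginBurst() { pool.setCorePoolSize(burstCore); }

    // Call once the batch is fully submitted: lets extra threads shut down.
    public void endBurst() { pool.setCorePoolSize(normalCore); }

    public void execute(Runnable task) { pool.execute(task); }

    public ThreadPoolExecutor pool() { return pool; }
}
```

A real wrapper would call beginBurst/endBurst automatically based on the observed submission rate; the manual calls here just show the mechanism.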

+4

Just do what Executors.newFixedThreadPool does, and set core and max to the same value. Here's the source of newFixedThreadPool from Java 6:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

If you already have an existing executor, you can do this:

    ThreadPoolExecutor tpe = ... ;
    tpe.setCorePoolSize(tpe.getMaximumPoolSize());

Edit: As William points out in the comments, this means that all threads are core threads, so none of them will ever time out and terminate. To change that behavior, simply call ThreadPoolExecutor.allowCoreThreadTimeOut(true) . Core threads will then time out and be reclaimed when the executor is idle.
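Putting this answer's two pieces together (the factory-method shape and the sizes are my own framing):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class EagerFixedPool {

    public static ThreadPoolExecutor create(int maxThreads, int queueCapacity) {
        // core == max: a new thread is started for every submitted task until
        // maxThreads are running, and only then do tasks start queueing.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                maxThreads, maxThreads, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        // Without this, the core threads would never time out and terminate.
        tpe.allowCoreThreadTimeOut(true);
        return tpe;
    }
}
```

The bounded queue gives the rejection-on-full behavior the question asks for, and the keep-alive timeout lets the pool drain back down after a burst.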

+2

Your preference seems to be minimal latency during periods of low activity. For that, I would simply set corePoolSize to the maximum size and let those extra threads stick around. During high activity, the threads would exist anyway; during low activity, their existence has little impact. You can set a core thread timeout if you want them to die off.

That way, all threads are always available to pick up a task as soon as possible.

+1

CustomBlockingQueue

    package com.gunjan;

    import java.util.concurrent.BlockingQueue;

    public abstract class CustomBlockingQueue<E> implements BlockingQueue<E> {

        public BlockingQueue<E> blockingQueue;

        public CustomBlockingQueue(BlockingQueue blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        // Always refuse the executor's offer() so the pool keeps creating
        // threads up to maximumPoolSize; the rejection handler then queues
        // the task for real via customOffer().
        @Override
        final public boolean offer(E e) {
            return false;
        }

        final public boolean customOffer(E e) {
            return blockingQueue.offer(e);
        }
    }

ThreadPoolBlockingQueue

    package com.gunjan;

    import java.util.Collection;
    import java.util.Iterator;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class ThreadPoolBlockingQueue<E> extends CustomBlockingQueue<E> {

        public ThreadPoolBlockingQueue(BlockingQueue blockingQueue) {
            super(blockingQueue);
        }

        @Override public E remove() { return this.blockingQueue.remove(); }
        @Override public E poll() { return this.blockingQueue.poll(); }
        @Override public E element() { return this.blockingQueue.element(); }
        @Override public E peek() { return this.blockingQueue.peek(); }
        @Override public int size() { return this.blockingQueue.size(); }
        @Override public boolean isEmpty() { return this.blockingQueue.isEmpty(); }
        @Override public Iterator<E> iterator() { return this.blockingQueue.iterator(); }
        @Override public Object[] toArray() { return this.blockingQueue.toArray(); }
        @Override public <T> T[] toArray(T[] a) { return this.blockingQueue.toArray(a); }
        @Override public boolean containsAll(Collection<?> c) { return this.blockingQueue.containsAll(c); }
        @Override public boolean addAll(Collection<? extends E> c) { return this.blockingQueue.addAll(c); }
        @Override public boolean removeAll(Collection<?> c) { return this.blockingQueue.removeAll(c); }
        @Override public boolean retainAll(Collection<?> c) { return this.blockingQueue.retainAll(c); }
        @Override public void clear() { this.blockingQueue.clear(); }
        @Override public boolean add(E e) { return this.blockingQueue.add(e); }
        @Override public void put(E e) throws InterruptedException { this.blockingQueue.put(e); }
        @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { return this.blockingQueue.offer(e, timeout, unit); }
        @Override public E take() throws InterruptedException { return this.blockingQueue.take(); }
        @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { return this.blockingQueue.poll(timeout, unit); }
        @Override public int remainingCapacity() { return this.blockingQueue.remainingCapacity(); }
        @Override public boolean remove(Object o) { return this.blockingQueue.remove(o); }
        @Override public boolean contains(Object o) { return this.blockingQueue.contains(o); }
        @Override public int drainTo(Collection<? super E> c) { return this.blockingQueue.drainTo(c); }
        @Override public int drainTo(Collection<? super E> c, int maxElements) { return this.blockingQueue.drainTo(c, maxElements); }
    }

RejectedExecutionHandlerImpl

    package com.gunjan;

    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

        // Invoked once the pool is at maximumPoolSize (because offer() always
        // returned false): only now is the task placed on the real queue.
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            boolean inserted = ((CustomBlockingQueue) executor.getQueue()).customOffer(r);
            if (!inserted) {
                throw new RejectedExecutionException();
            }
        }
    }

CustomThreadPoolExecutorTest

    package com.gunjan;

    import java.util.concurrent.*;

    public class CustomThreadPoolExecutorTest {

        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<Runnable>(500);
            CustomBlockingQueue customLinkedBlockingQueue =
                    new ThreadPoolBlockingQueue<Runnable>(linkedBlockingQueue);
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 100, 60,
                    TimeUnit.SECONDS, customLinkedBlockingQueue, new RejectedExecutionHandlerImpl());
            for (int i = 0; i < 750; i++) {
                try {
                    threadPoolExecutor.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                Thread.sleep(1000);
                                System.out.println(threadPoolExecutor);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    e.printStackTrace();
                }
            }
            threadPoolExecutor.shutdown();
            threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
            System.out.println(threadPoolExecutor);
        }
    }
0

Source: https://habr.com/ru/post/1497083/

