ThreadPoolExecutor and Queue

I thought that using ThreadPoolExecutor , we can send Runnable to execute either in the BlockingQueue passed in the constructor, or using the execute method.
I also realized that if the task is available, it will be completed.
I do not understand the following:

 public class MyThreadPoolExecutor { private static ThreadPoolExecutor executor; public MyThreadPoolExecutor(int min, int max, int idleTime, BlockingQueue<Runnable> queue){ executor = new ThreadPoolExecutor(min, max, 10, TimeUnit.MINUTES, queue); //executor.prestartAllCoreThreads(); } public static void main(String[] main){ BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); final String[] names = {"A","B","C","D","E","F"}; for(int i = 0; i < names.length; i++){ final int j = i; q.add(new Runnable() { @Override public void run() { System.out.println("Hi "+ names[j]); } }); } new MyThreadPoolExecutor(10, 20, 1, q); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } /*executor.execute(new Runnable() { @Override public void run() { System.out.println("++++++++++++++"); } }); */ for(int i = 0; i < 100; i++){ final int j = i; q.add(new Runnable() { @Override public void run() { System.out.println("Hi "+ j); } }); } } } 

This code does nothing unless I uncomment executor.prestartAllCoreThreads(); in the OR constructor, I call execute from runnable, which prints System.out.println("++++++++++++++"); (he is also commented).

Why?
Quote (my emphasis):

By default, even the main threads are created at first, and only starts when new tasks arrive , but this can be redefined dynamically using the prestartCoreThread () or prestartAllCoreThreads () method. You probably want to create threads if you create a pool with a non-empty queue.

Ok Therefore, my turn is not empty. But I create an executor , I do sleep , and then add a new Runnable to the queue (in a loop of up to 100).
This loop does not count as new tasks arrive ?
Why doesn't this work, and should I either prestart or explicitly call execute ?

+6
source share
2 answers

Workflows are generated when tasks are performed by execution, and these are those that interact with the main work queue. You need to pre-train workers if you start with a non-empty work queue. See Implementation in OpenJDK 7 .

I repeat, workers are those who interact with the work queue . They are generated only on demand when passed through execute . (or above the layers above it, e.g. invokeAll , submit , etc.). If they are not running, it does not matter how much work you add to the queue, since there is nothing checking it as workers did not start .

ThreadPoolExecutor does not create workflows until it is necessary, or if you first remove their creation using the prestartAllCoreThreads and prestartCoreThread methods . If there are no workers, then there is no work in your line.

The reason for adding initial execute jobs is because it forces the main workflow to be created, the only one that can then start processing work from your queue. You can also call prestartCoreThread and get similar behavior. If you want to run all the workers, you must call prestartAllCoreThreads or send this number of tasks via execute .

The following is the execute code.

 /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } 
+9
source

A BlockingQueue is not a magic thread manager. If you send Runnable objects to the queue, and there are no current threads to perform these tasks, they certainly will not be executed. The execute method, on the other hand, will automatically send threads according to the configuration of the thread pool, if necessary. If you first run all the main threads, there will be threads to consume jobs from the queue.

+5
source

Source: https://habr.com/ru/post/922178/


All Articles