How to block a queue in ForkJoinPool?

I need to block threads in ForkJoinPool when its queue is full. This can be done in the standard ThreadPoolExecutor, for example:

    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                5000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize, true),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

I know that there is some Deque inside ForkJoinPool, but I do not have access to it through its API.

Update: See answer below.

1 answer

After some research, I am happy to answer the question:

Reason: The ForkJoinPool implementation offers no such option, for the following reason. Most j.u.c. (java.util.concurrent) Executors assume a single concurrent queue shared by many threads. This leads to contention on the queue and degrades read/write performance when multiple threads access it. The approach therefore does not scale well: high contention on the queue can generate a large number of context switches and keep the CPU busy.

Implementation: In ForkJoinPool, each thread has a separate double-ended queue (Deque), backed by an array. To minimize contention, work stealing happens at the tail of the deque, while task submission happens at the head, done by the current thread (worker). The tail contains the largest portion of work. In other words, stealing from the tail by another worker thread minimizes the number of interactions with other workers: less contention, better overall performance.
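The head/tail discipline described above can be illustrated with a toy model. This is only a sketch: the WorkerQueue class and the task names are hypothetical, and it uses a synchronized ArrayDeque rather than the array-based queue ForkJoinPool actually implements internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WorkStealingSketch {

    // Hypothetical per-worker queue, for illustration only.
    // ForkJoinPool uses its own internal array-based queue, not ArrayDeque.
    static final class WorkerQueue {
        private final Deque<String> tasks = new ArrayDeque<>();

        // The owning worker adds newly forked subtasks at the head.
        synchronized void push(String task) {
            tasks.addFirst(task);
        }

        // The owning worker also takes its next task from the head (LIFO).
        synchronized String pop() {
            return tasks.pollFirst();
        }

        // Another worker steals from the tail, i.e. the oldest entry,
        // which in divide-and-conquer code tends to be the largest piece.
        synchronized String steal() {
            return tasks.pollLast();
        }
    }

    public static void main(String[] args) {
        WorkerQueue queue = new WorkerQueue();
        queue.push("big");     // forked first, ends up at the tail
        queue.push("medium");
        queue.push("small");   // forked last, sits at the head

        System.out.println("owner takes: " + queue.pop());   // prints "owner takes: small"
        System.out.println("thief takes: " + queue.steal()); // prints "thief takes: big"
    }
}
```

Because the owner and the thief work on opposite ends, they only meet when the queue is nearly empty, which is the contention argument made above.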

Workaround idea: There are global submission queues. Submissions from non-FJ threads go into the submission queues (workers take these tasks). There are also the worker queues mentioned above.

The maximum queue size is bounded by this constant:

    /**
     * Maximum size for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway
     * programs before saturating systems.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

When the queue is full, an unchecked exception is thrown:

 RejectedExecutionException("Queue capacity exceeded") 

This is described in the Javadoc.

(Also see the ForkJoinPool constructor that takes an UncaughtExceptionHandler.)

I am inclined to claim that the current implementation has no such mechanism, and that we have to implement it ourselves in the code that uses the API.

For example, this can be done as follows:

  • Implement exponential back-off logic that periodically retries submitting the task, increasing the delay before each subsequent retry. Or...
  • Write a throttle that periodically checks the size of the submission queue (see ForkJoinPool.getQueuedSubmissionCount() ).
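The two ideas can be combined in one small wrapper. The following is a minimal sketch, not a definitive implementation: the ThrottledSubmitter class, its maxQueued soft limit, and the delay parameters are all hypothetical names chosen for illustration. Note that getQueuedSubmissionCount() returns only an estimate, so the limit is approximate.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ThrottledSubmitter {
    private final ForkJoinPool pool;
    private final int maxQueued;        // hypothetical soft limit on pending submissions
    private final long initialDelayMs;  // first back-off delay
    private final long maxDelayMs;      // upper bound for the back-off delay

    public ThrottledSubmitter(ForkJoinPool pool, int maxQueued,
                              long initialDelayMs, long maxDelayMs) {
        this.pool = pool;
        this.maxQueued = maxQueued;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Blocks the calling (non-FJ) thread until the submission queue
    // appears to have room, then submits the task.
    public ForkJoinTask<?> submit(Runnable task) throws InterruptedException {
        long delay = initialDelayMs;
        while (true) {
            // The count is an estimate, so this is a soft limit only.
            if (pool.getQueuedSubmissionCount() < maxQueued) {
                try {
                    return pool.submit(task);
                } catch (RejectedExecutionException e) {
                    // A shut-down pool will never accept the task: give up.
                    if (pool.isShutdown()) {
                        throw e;
                    }
                    // Otherwise fall through and retry after backing off.
                }
            }
            TimeUnit.MILLISECONDS.sleep(delay);
            delay = Math.min(delay * 2, maxDelayMs); // exponential back-off, capped
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        ThrottledSubmitter submitter = new ThrottledSubmitter(pool, 4, 1, 100);
        for (int i = 0; i < 20; i++) {
            final int id = i;
            submitter.submit(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(10); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("task " + id + " done");
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Unlike CallerRunsPolicy in the ThreadPoolExecutor example from the question, this makes the submitting thread wait rather than run the task itself. Whether sleeping or running the task inline is preferable depends on the workload.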

See the official JSR-166E Java source of ForkJoinPool for more information.


Source: https://habr.com/ru/post/1243718/

