Multiple Restricted Thread Pool

I need a thread pool that provides a maximum of X threads for processing tasks, still not a problem. However, each Task submitted may indicate an IO-Target that is definitely limited (say Y).

So, the presented IOTask returns the target "google.com" with a limit of 4 (Y), and the pool has a global limit of 16 (X). I want to submit 10 google.com tasks, where only 4 are processed in parallel, and the pool has 12 threads for other tasks.

How can i achieve this?

+4
source share
6 answers

You can wrap two instances of ExecutorService in a custom class and manually control the presentation of tasks as follows:

class ExecutorWrapper { private ExecutorService ioExec = Executors.newFixedThreadPool(4); private ExecutorService genExec = Executors.newFixedThreadPool(12); public Future<?> submit(final IOTask task) { return ioExec.submit(task); } public Future<?> submit(final Runnable task) { return genExec.submit(task); } } interface IOTask extends Runnable {} 

This allows you to use 4 threads to perform I / O operations and leave the remaining 12 threads to serve other tasks.

+2
source

The implementation of this function is not easy, since you will need to have separate queues for one target (so that the waiting code becomes much more complicated) or one queue from which you then miss goals that are in capacity (which leads to performance overheads). You can try to extend the ExecutorService to achieve this, but the extension seems non-trivial.

Updated answer / solution:

Thinking about this a bit more, the simplest solution to the blocking problem is to have both a blocking queue (as usual) and a queue map (one queue per target, as well as the number of available threads per target). The queue map is used only for tasks that have been handed over for execution (due to too many threads already running for this purpose) after the task has been retrieved from the normal locking queue.

So, the execution flow will look like this:

  • Task
  • sent (for a specific purpose), calling the code.
  • The task is placed in the lock queue (probably completed here by your own task class, which includes the target information).
  • a thread (from a thread pool) waits in the blocking queue (via take ()).
  • the stream accepts the task.
  • the stream is synchronized when blocked.
  • the thread checks the available quantity for this purpose.
  • if the available value of count> 0

    • then the thread decreases by 1, releases the lock, starts the task.
    • otherwise, the thread places the task in the target map in the task queue (this map is transferred from the task map ), releases the lock and returns to waiting in the lock queue.
  • when the thread completes the task:

    • synchronized when locked.
    • checks the counter for the just completed goal.
    • if count == 0
      • then check the completed task map for any tasks for this purpose, if it exists, then release the lock and start it.
    • If the counter was not 0 or there is no task for the same goal on the transferred map / queue, then increase the available counter (for this purpose), release the lock and return to waiting in the lock queue.

This solution avoids significant performance overhead or has a separate thread for queue management.

+3
source

Think over some answers in a more specific way.

  • You will need your own BlockingQueue, which can separate different types of tasks and return the required Runnable depending on the internal counter.

  • Extend ThreadPoolExecutor and implement beforeExecute and afterExecute.

When beforeExecute is called, it will increment the counter in the queue if Runnable is of type X. When afterExecute was called, it would decrease that counter.

In your queue, you will then return the appropriate Runnable depending on the value of the counter, I believe the take method is where you do it.

There are some synchronization issues that need to be fully thought through to ensure that the counter never goes 4. Unfortunately, when you are in beforeExecute, it's too late, but you can just know how much of what task is currently running. help you get started.

+2
source

Hmmm ... I'm afraid that exiting the ExecutorService does not allow such fine-grained control. You probably need to extend the ExecutorService class to add this function yourself or use two separate pools of fixed threads: one with a bandwidth of 4, the other with a capacity of 12.

+1
source

The idea for this might be to extend the ExecutorService, and in your class there are two ThreadPools, one with a capacity of 4, and the other with a capacity of 12.

Then implement the methods that you need, and based on the IOTasks provided, you can direct the tasks you want to the pool.

+1
source

Use a counter for shared threads and a HashMap that counts the number of threads that are currently trying to access site X. When you want to start a new thread, call the synchronized method, which checks the wait (wait () inside the while loop) until while the number of streams in the hash map is less than 4, and the total number of streams is less than 16. Then increase both counters and start the stream. When the thread ends, it must call a second synchronized method, which decreases the counters and calls notify ()

+1
source

Source: https://habr.com/ru/post/1394496/


All Articles