Restricted concurrency processing in Java

Suppose I need to process requests of three types: A, B, and C as follows:

  • Requests are processed simultaneously.
  • During the same time, no more than K (<= 3) requests are processed simultaneously.
  • Requests of the same type cannot be processed at the same time.

In the general case, the number of types is N, and the number of simultaneous requests is K <= N.

How to implement it in Java using java.util.concurrent ?

+6
source share
4 answers

You cannot process K requests at the same time, which violates the second rule. The maximum number of simultaneous requests is the number types. In your case, there are three. So, do three queues and attach them to three threads. This is the only way. Executors.newSingleThreadExecutor implements this technique.

 public static void main(String[] args) { int N = 2; int K = 3; List<Executor> executors = new ArrayList<Executor>(N); for(int i = 0; i < N; i++){ executors.add(Executors.newSingleThreadExecutor()); } Map<Type, Executor> typeExecutors = new HashMap<Type, Executor>(K); int i = 0; for(Type t : Type.values()){ typeExecutors.put(t, executors.get(i++ % executors.size())); } } enum Type{ T1, T2, T3 } 
+2
source

I would create three Executors.newFixedThreadPool(1)

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)

and with Factory delegate each performance to any of the performers.

 ExecutorService executor = ThreadFactory.getExecutorForRequest(RequestType type); executor.execute(request); 
0
source

Requests are processed simultaneously.

You can use the Executor service.

No more than K requests are processed simultaneously.

In the artist, set the maximum number of threads.

Requests of the same type cannot be processed at the same time.

You may think that for each type of request there are different locks. Just make sure that if the thread cannot obtain a lock for the request within the assigned time, it must give in and proceed to the next task processing.

0
source

The domain of your problem can be modeled into two data structures, which I called deferred (which map types to unlimited task queues - here waiting for tasks waiting to be launched) and running (where there is no more than one task for each type that is ready to be launched or actually executed by the Contractor )

The K constraint must be applied to the launch: it has at most K Type before Task mappings.

The highlight is that the number of threads that you allocate for the entire processing of the task is completely orthogonal to concurrency constraint management: your choice of thread pool should be dictated (among other things) by the type of tasks that need to be performed (IO / CPU?), And not concurrency restrictions.

Implementation:

 public class Foo { enum TaskType { A, B, C } class Task { TaskType type; Runnable runnable; volatile boolean running; } Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>(); Map<TaskType, Task> running = new HashMap<TaskType, Task>(); ExecutorService executor = null; // Executor implementation is irrelevant to the problem /** Chooses a task of a random type between those not running. */ TaskType choosePending(){ Set running_types = running.keySet(); running_types.removeAll(Arrays.asList(pending.keySet())); List shuffled = new ArrayList(running_types); Collections.shuffle(shuffled); return (TaskType) shuffled.get(0); } // note that max concurrency != parallelism level (which the executor is responsible for) final int MAX_CONCURRENCY = 3; void produce(){ synchronized(running){ if (running.size() < MAX_CONCURRENCY) { synchronized (pending){ TaskType t = choosePending(); running.put(t, pending.get(t).remove()) ; } } } } { new Thread(new Runnable() { public void run() { while (true) produce(); } }).start(); } Task chooseRunning(){ for (Task t : running.values()){ if (!t.running){ return t; } } return null; } void consume(){ final Task t; synchronized (running){ t = chooseRunning(); if (t != null){ t.running = true; executor.execute(new Runnable() { public void run() { t.runnable.run(); synchronized (running) { running.remove(t); } } }); } } } { new Thread(new Runnable() { public void run() { while (true) consume(); } }).start(); } } 
0
source

Source: https://habr.com/ru/post/955749/


All Articles