Your problem domain can be modeled with two data structures, which I have called pending (a map from each type to an unbounded queue of tasks; this is where tasks wait to be launched) and running (which holds at most one task per type, each either ready to run or actually being run by the Executor).
The K constraint must be applied to running: it holds at most K type-to-task mappings.
The key point is that the number of threads you allocate for processing tasks is completely orthogonal to how the concurrency constraint is managed: your choice of thread pool should be dictated (among other things) by the kind of tasks to be performed (IO-bound or CPU-bound?), not by the concurrency restrictions.
Implementation:
import java.util.*;
import java.util.concurrent.*;

public class Foo {

    enum TaskType { A, B, C }

    class Task {
        TaskType type;
        Runnable runnable;
        volatile boolean running;
    }

    Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>();
    Map<TaskType, Task> running = new HashMap<TaskType, Task>();

    // Executor implementation is irrelevant to the problem; a cached pool is only a placeholder
    ExecutorService executor = Executors.newCachedThreadPool();

    /** Chooses a random type that has pending tasks and is not running; null if there is none. */
    TaskType choosePending() {
        List<TaskType> candidates = new ArrayList<TaskType>();
        for (Map.Entry<TaskType, Queue<Task>> e : pending.entrySet()) {
            if (!e.getValue().isEmpty() && !running.containsKey(e.getKey())) {
                candidates.add(e.getKey());
            }
        }
        if (candidates.isEmpty()) return null;
        Collections.shuffle(candidates);
        return candidates.get(0);
    }

    // note that max concurrency != parallelism level (which the executor is responsible for)
    final int MAX_CONCURRENCY = 3;

    /** Moves one pending task into running if the K constraint allows it. */
    void produce() {
        synchronized (running) {
            if (running.size() < MAX_CONCURRENCY) {
                synchronized (pending) {
                    TaskType t = choosePending();
                    if (t != null) {
                        running.put(t, pending.get(t).remove());
                    }
                }
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) produce(); // busy-waits; kept simple for illustration
            }
        }).start();
    }

    /** Returns a task in running that has not been handed to the executor yet, or null. */
    Task chooseRunning() {
        for (Task t : running.values()) {
            if (!t.running) {
                return t;
            }
        }
        return null;
    }

    void consume() {
        final Task t;
        synchronized (running) {
            t = chooseRunning();
            if (t != null) {
                t.running = true;
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            t.runnable.run();
                        } finally {
                            // the map is keyed by type, so remove by type, not by task
                            synchronized (running) {
                                running.remove(t.type);
                            }
                        }
                    }
                });
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) consume();
            }
        }).start();
    }
}
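The two polling threads above spin continuously. As a rough sketch only, and not part of the implementation above, the same pending/running model could be driven by events instead: a task is promoted when it is submitted or when another task finishes. The names `TypedScheduler`, `submit`, `launchEligible`, `finished` and `runningCount` are hypothetical and introduced here for illustration; the sketch assumes Java 8 or later and takes any `Executor`, which keeps the thread pool choice separate from the K constraint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;

/** Hypothetical sketch: at most maxTypes types run at once, one task per type. */
class TypedScheduler<T> {
    // type -> FIFO queue of tasks waiting to be launched
    private final Map<T, Queue<Runnable>> pending = new HashMap<>();
    // types that currently have a task handed to the executor
    private final Set<T> running = new HashSet<>();
    private final int maxTypes;
    private final Executor executor;

    TypedScheduler(int maxTypes, Executor executor) {
        this.maxTypes = maxTypes;
        this.executor = executor;
    }

    /** Queues a task and launches it right away if the constraints allow. */
    synchronized void submit(T type, Runnable task) {
        pending.computeIfAbsent(type, k -> new ArrayDeque<>()).add(task);
        launchEligible();
    }

    /** Promotes pending tasks while fewer than maxTypes types are running. Caller holds the lock. */
    private void launchEligible() {
        List<Runnable> toLaunch = new ArrayList<>();
        for (Map.Entry<T, Queue<Runnable>> e : pending.entrySet()) {
            if (running.size() >= maxTypes) break;
            T type = e.getKey();
            if (running.contains(type) || e.getValue().isEmpty()) continue;
            Runnable task = e.getValue().remove();
            running.add(type);
            toLaunch.add(() -> {
                try {
                    task.run();
                } finally {
                    finished(type); // frees the slot even if the task throws
                }
            });
        }
        // hand over only after the iteration, so a same-thread executor cannot disturb it
        for (Runnable r : toLaunch) executor.execute(r);
    }

    /** Called when a task of the given type completes; frees its slot and promotes more work. */
    private synchronized void finished(T type) {
        running.remove(type);
        launchEligible();
    }

    synchronized int runningCount() {
        return running.size();
    }
}
```

Unlike choosePending() above, this sketch picks types in HashMap iteration order rather than at random, so a busy type can starve others; shuffling the candidate types would restore the original behaviour.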