Ensuring thread execution order in threadpool

I have read about the thread pool pattern, and I can't seem to find the usual solution for the following problem.

I sometimes want tasks to be executed sequentially. For example, I read chunks of text from a file, and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.

Consider this scenario, in which the tasks marked with * must be processed in the order they were pushed. The other tasks can be processed in any order.

    push task1
    push task2
    push task3 *
    push task4 *
    push task5
    push task6 *
    ... and so on

In a thread pool without this constraint, a single queue of pending tasks works fine, but obviously not here.

I thought about having some threads work off thread-specific queues while others work off a "global" queue. Then, to execute some of the tasks sequentially, I would just push them onto the queue that a single thread reads from. It sounds a little clumsy.

So, the real question in this long story is: how would you solve this? How would you ensure those tasks are ordered?

EDIT

As a more general problem, suppose the above scenario becomes

    push task1
    push task2 **
    push task3 *
    push task4 *
    push task5
    push task6 *
    push task7 **
    push task8 *
    push task9
    ... and so on

I mean, tasks within a group must be executed sequentially, but the groups themselves can be interleaved. So you could see 3-2-5-4-7, for example.

Another thing to note is that I don't have access to all the tasks of a group up front (and I can't wait for all of them to arrive before starting the group).

Thank you for your time.

+44
multithreading design-patterns concurrency threadpool
Aug 25 '11
16 answers

Something like the following will let you queue up sequential and parallel tasks, where sequential tasks are executed one after the other, and parallel tasks are executed in any order, but in parallel. This gives you the ability to serialize tasks where necessary while still having parallel tasks, and to do it as the tasks are received, i.e. you do not need to know the entire sequence up front; the execution order is maintained dynamically.

    internal class TaskQueue
    {
        private readonly object _syncObj = new object();
        private readonly Queue<QTask> _tasks = new Queue<QTask>();
        private int _runningTaskCount;

        public void Queue(bool isParallel, Action task)
        {
            lock (_syncObj)
            {
                _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
            }

            ProcessTaskQueue();
        }

        public int Count
        {
            get { lock (_syncObj) { return _tasks.Count; } }
        }

        private void ProcessTaskQueue()
        {
            lock (_syncObj)
            {
                if (_runningTaskCount != 0) return;

                while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
                {
                    QTask parallelTask = _tasks.Dequeue();
                    QueueUserWorkItem(parallelTask);
                }

                if (_tasks.Count > 0 && _runningTaskCount == 0)
                {
                    QTask serialTask = _tasks.Dequeue();
                    QueueUserWorkItem(serialTask);
                }
            }
        }

        private void QueueUserWorkItem(QTask qTask)
        {
            Action completionTask = () =>
            {
                qTask.Task();
                OnTaskCompleted();
            };

            _runningTaskCount++;
            ThreadPool.QueueUserWorkItem(_ => completionTask());
        }

        private void OnTaskCompleted()
        {
            lock (_syncObj)
            {
                if (--_runningTaskCount == 0)
                {
                    ProcessTaskQueue();
                }
            }
        }

        private class QTask
        {
            public Action Task { get; set; }
            public bool IsParallel { get; set; }
        }
    }

Update

To handle groups of tasks with sequences of sequential and parallel tasks, a GroupedTaskQueue can manage a TaskQueue per group. Again, you do not need to know about the groups up front; it is all dynamically managed as tasks are received.

    internal class GroupedTaskQueue
    {
        private readonly object _syncObj = new object();
        private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
        private readonly string _defaultGroup = Guid.NewGuid().ToString();

        public void Queue(bool isParallel, Action task)
        {
            Queue(_defaultGroup, isParallel, task);
        }

        public void Queue(string group, bool isParallel, Action task)
        {
            TaskQueue queue;

            lock (_syncObj)
            {
                if (!_queues.TryGetValue(group, out queue))
                {
                    queue = new TaskQueue();
                    _queues.Add(group, queue);
                }
            }

            Action completionTask = () =>
            {
                task();
                OnTaskCompleted(group, queue);
            };

            queue.Queue(isParallel, completionTask);
        }

        private void OnTaskCompleted(string group, TaskQueue queue)
        {
            lock (_syncObj)
            {
                if (queue.Count == 0)
                {
                    _queues.Remove(group);
                }
            }
        }
    }
+17
Sep 01 '11 at 13:41

Thread pools are good for the case where the relative order in which the tasks execute does not matter, provided they all get done. In particular, it must be OK for them all to run in parallel.

If your tasks must be executed in a specific order, then they are not suitable for parallelism, so a thread pool is not appropriate for them.

If you want to move these serial tasks off the main thread, then a single background thread with a task queue is ideal for them. You can continue to use the thread pool for the remaining tasks that are suitable for parallelism.

Yes, this means you have to decide where to submit each task depending on whether it is ordered or "parallelizable", but this is not a big deal.
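A minimal sketch of that split in Java (the class and method names are mine, not from the question): a single-thread executor acts as the background thread with a task queue, and a fixed pool handles the parallelizable tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SplitPools {
    private final ExecutorService serial = Executors.newSingleThreadExecutor();
    private final ExecutorService parallel = Executors.newFixedThreadPool(4);

    // Ordered tasks go to the single background thread, everything else to the pool.
    public void submit(boolean ordered, Runnable task) {
        (ordered ? serial : parallel).execute(task);
    }

    public void shutdown() throws InterruptedException {
        serial.shutdown();
        parallel.shutdown();
        serial.awaitTermination(5, TimeUnit.SECONDS);
        parallel.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        SplitPools pools = new SplitPools();
        StringBuffer out = new StringBuffer();   // StringBuffer appends are thread-safe
        for (int i = 1; i <= 5; i++) {
            final int n = i;
            pools.submit(true, () -> out.append(n));  // the ordered tasks
        }
        pools.shutdown();
        System.out.println(out);  // prints 12345 - submission order is preserved
    }
}
```

A single-thread executor drains its queue in FIFO order, which is exactly the "one background thread with a task queue" described above.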

If you have groups that need to be serialized, but which can run in parallel with other tasks, then you have several options:

  • Create a single task for each group that executes that group's tasks in order, and submit this task to the thread pool.
  • Have each task in a group explicitly wait for the previous task in the group, and submit them all to the thread pool. This requires that your thread pool can handle the case where a thread is waiting for a not-yet-scheduled task without deadlocking.
  • Have a dedicated thread for each group, and submit the group's tasks to the appropriate message queue.
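The third option can be sketched with a lazily created single-thread executor standing in for each group's dedicated thread plus message queue (the names here are illustrative, not from the answer):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GroupExecutors {
    private final ConcurrentMap<String, ExecutorService> groups = new ConcurrentHashMap<>();

    // Tasks with the same group key run sequentially; different groups run
    // concurrently with each other.
    public void submit(String group, Runnable task) {
        groups.computeIfAbsent(group, g -> Executors.newSingleThreadExecutor()).execute(task);
    }

    public void shutdownAll() throws InterruptedException {
        for (ExecutorService e : groups.values()) {
            e.shutdown();
            e.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GroupExecutors ge = new GroupExecutors();
        StringBuffer a = new StringBuffer();   // StringBuffer is thread-safe
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            ge.submit("groupA", () -> a.append(n));
        }
        ge.shutdownAll();
        System.out.println(a);  // prints 123 - per-group order is preserved
    }
}
```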
+14
Aug 25 '11 at 16:11

To do what you want with a thread pool, you may have to create some kind of scheduler.

Something like that:

TaskQueue -> Scheduler -> Queue -> ThreadPool

The scheduler runs in its own thread, keeping track of the dependencies between jobs. When a job is ready to run, the scheduler simply pushes it into the queue for the thread pool.

The ThreadPool may need to signal the Scheduler when a task is finished, so the scheduler can queue the tasks that depend on it.

In your case, the dependencies can probably be stored in a linked list.

Let's say you have the following dependencies: 3 → 4 → 6 → 8

Task 3 is running in the thread pool; you have no idea yet that task 8 exists.

Task 3 finishes. You remove 3 from the linked list and push task 4 onto the thread pool's queue.

Task 8 arrives. You put it at the end of the linked list.

The only constructs that need to be fully synchronized are the queues before and after the scheduler.
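A rough sketch of this scheduler for a single dependency chain, assuming (as described above) that jobs may keep arriving after the chain has started; all names are invented for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChainScheduler {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final Deque<Runnable> chain = new ArrayDeque<>();  // the dependency list
    private boolean running = false;

    // Called for newly arriving chained jobs; the chain unwinds one job at a time.
    public synchronized void addToChain(Runnable job) {
        chain.addLast(job);
        scheduleNext();
    }

    private synchronized void scheduleNext() {
        if (running || chain.isEmpty()) return;
        running = true;
        Runnable job = chain.removeFirst();
        pool.execute(() -> {
            job.run();
            synchronized (this) {      // completion signal back to the scheduler
                running = false;
                scheduleNext();
            }
        });
    }

    public void shutdown() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ChainScheduler s = new ChainScheduler();
        StringBuffer out = new StringBuffer();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 3; i <= 5; i++) {     // pretend 3 -> 4 -> 5 is a chain
            final int n = i;
            s.addToChain(() -> { out.append(n); done.countDown(); });
        }
        done.await(5, TimeUnit.SECONDS);
        s.shutdown();
        System.out.println(out);  // prints 345
    }
}
```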

+6
Aug 30 '11 at 20:38

In principle, you have a number of pending tasks. Some of the tasks can only be executed when one or more other pending tasks have finished.

Deferred tasks can be modeled in a dependency graph:

  • "task1 → task2" means that task2 can only be executed after task1 has finished. The arrows indicate the direction of execution.
  • the indegree of a task (the number of tasks pointing to it) determines whether the task is ready for execution. If the indegree is 0, it can be executed.
  • sometimes a task must wait for several tasks to finish; its indegree is then > 1.
  • if a task no longer has to wait for other tasks to finish (its indegree is zero), it can be submitted to the thread pool of worker threads, or to a queue of tasks waiting for a worker thread to pick them up. You know a submitted task cannot cause a deadlock, because it is not waiting for anything. As an optimization, you can use a priority queue, e.g. so that the tasks that more other tasks depend on are executed first. This still cannot cause a deadlock, since all the tasks in the thread pool are able to run. It can, however, cause starvation.
  • when a task finishes executing, it can be removed from the dependency graph, possibly reducing the indegree of other tasks, which in turn can be submitted to the pool of worker threads.
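The indegree bookkeeping above can be sketched like this (a simplified version that builds the graph before submitting the roots; all names are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DependencyGraph {
    public static class Node {
        final Runnable work;
        final AtomicInteger indegree;                  // unfinished tasks we wait on
        final List<Node> dependents = new ArrayList<>();
        Node(Runnable work, int indegree) {
            this.work = work;
            this.indegree = new AtomicInteger(indegree);
        }
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    public Node node(Runnable work, Node... waitsOn) {
        Node n = new Node(work, waitsOn.length);
        for (Node dep : waitsOn) dep.dependents.add(n);
        return n;
    }

    public void submitReady(Node n) {
        if (n.indegree.get() == 0) run(n);             // indegree 0: ready to execute
    }

    private void run(Node n) {
        pool.execute(() -> {
            n.work.run();
            for (Node d : n.dependents)                // done: lower dependents' indegree
                if (d.indegree.decrementAndGet() == 0) run(d);
        });
    }

    public void shutdown() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        DependencyGraph g = new DependencyGraph();
        StringBuffer out = new StringBuffer();
        CountDownLatch done = new CountDownLatch(3);
        Node t1 = g.node(() -> { out.append("1"); done.countDown(); });
        Node t2 = g.node(() -> { out.append("2"); done.countDown(); }, t1);
        Node t3 = g.node(() -> { out.append("3"); done.countDown(); }, t2);
        g.submitReady(t1);                             // only the root is ready
        done.await(5, TimeUnit.SECONDS);
        g.shutdown();
        System.out.println(out);  // prints 123
    }
}
```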

Thus, there is (at least) one thread used to add and remove pending tasks, and there is a pool of worker threads.

When a task is added to the dependency graph, you should check:

  • how does the task connect to the dependency graph: which tasks must finish before it, and which tasks must wait for it to complete? Draw the edges to and from the new task accordingly.
  • once the edges are drawn: do the new edges create a cycle in the dependency graph? If so, you have a deadlock situation.

Performance

  • this pattern is slower than sequential execution if parallel execution is rarely possible in practice, because you need extra administration to do everything almost sequentially anyway.
  • this pattern is fast if many tasks can be executed simultaneously in practice.

Assumptions

As you may have read between the lines, you must design the tasks so that they do not interfere with each other. Also, there must be a way to determine the priority of the tasks. Task priority should take into account the data handled by each task. Two tasks must not alter the same object simultaneously; either one of the tasks gets priority over the other, or the operations performed on the object must be thread-safe.

+5
Sep 01 '11 at 20:21

If I understand the problem correctly, the jdk executors do not have this capability, but it is easy to roll your own. You basically need:

  • pool of workflows, each of which has a dedicated queue
  • some abstraction over the queues to which you offer work (cf. the ExecutorService)
  • some algorithm that deterministically selects a specific queue for each piece of work
  • each piece of work is then offered to the correct queue and hence processed in the correct order

The difference from the jdk executors is that they have 1 queue and n threads, but you want n queues and m threads (where n may or may not equal m).

* edit after reading that each task has a key *

In a bit more detail:

  • write code that converts a key to an index (an int) in a given range (0..n-1, where n is the number of threads you want); this can be as simple as key.hashCode() % n, or it can be some static mapping of known key values to threads, or whatever you want
  • at startup
    • create n queues and put them in an indexed structure (an array, a list, whatever)
    • start n threads, each thread doing a blocking take from its own queue
    • when it receives a piece of work, it knows how to execute the work specific to that task/event (obviously you can have some mapping of tasks to actions if you have heterogeneous events)
  • keep it behind some facade that accepts work items
  • When a task arrives, hand it to the facade
    • the facade finds the correct queue for the task based on its key, and offers the task to that queue

It is simple enough to extend this scheme with automatic restart of worker threads: have each worker thread register with some manager to state "I own this queue", plus some housekeeping around that and failure detection in the thread (on failure it deregisters ownership of that queue, returning the queue to a free pool of queues, which is the trigger to start a new thread).
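Putting the pieces together in compressed form, with single-thread executors standing in for the hand-rolled queue-plus-thread pairs (the class name is mine):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KeyedExecutor {
    private final List<ExecutorService> workers = new ArrayList<>();

    public KeyedExecutor(int n) {
        for (int i = 0; i < n; i++) workers.add(Executors.newSingleThreadExecutor());
    }

    public void submit(Object key, Runnable task) {
        // floorMod keeps the index non-negative even for negative hash codes
        int idx = Math.floorMod(key.hashCode(), workers.size());
        workers.get(idx).execute(task);    // same key -> same queue -> same order
    }

    public void shutdown() throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
            w.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedExecutor ke = new KeyedExecutor(4);
        StringBuffer out = new StringBuffer();
        for (int i = 1; i <= 4; i++) {
            final int n = i;
            ke.submit("file-chunks", () -> out.append(n));
        }
        ke.shutdown();
        System.out.println(out);  // prints 1234 - same key, same queue, same order
    }
}
```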

+4
Aug 25 '11 at 17:10

Option 1 - Integrated

Since you have sequential tasks, you can chain those tasks up and let the tasks themselves resubmit to the thread pool once they are done. Suppose we have a list of tasks:

  [Task1, ..., Task6] 

as in your example. We have a sequential dependency such that [Task3, Task4, Task6] is a dependency chain. We now make the following jobs (Erlang pseudo-code):

    Task4Job = fun() ->
                 Task4(),            % Execute the Task4 job
                 push_job(Task6Job)
               end.
    Task3Job = fun() ->
                 Task3(),            % Execute the Task3 job
                 push_job(Task4Job)
               end.
    push_job(Task3Job).

That is, we alter the Task3 job by wrapping it into a job which, as a continuation, pushes the next job in the chain onto the thread pool. There are strong similarities to the general continuation-passing style here, also found in systems like Node.js or Python's Twisted.

Summarizing, you build a system in which you can define chains of tasks that can defer further work and resubmit it.
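For readers not fluent in Erlang, the same chaining idea sketched in Java (the task names follow the example; the latch is only there so the demo can wait for the chain to finish):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChainDemo {
    static final ExecutorService pool = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        StringBuffer out = new StringBuffer();
        Runnable task6 = () -> { out.append("6"); done.countDown(); };
        Runnable task4 = () -> { out.append("4"); pool.execute(task6); };  // continuation
        Runnable task3 = () -> { out.append("3"); pool.execute(task4); };  // continuation
        pool.execute(task3);              // the chain 3 -> 4 -> 6 unwinds itself
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(out);          // prints 346
    }
}
```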

Option 2 - Simple

Why do we even bother splitting up the jobs? I mean, since they depend on each other sequentially, executing all of them on the same thread will not be faster or slower than taking the chain and spreading it over multiple threads. Assuming "enough" workload, any thread will always have work anyway, so simply bundling the jobs together is probably the easiest:

    Task = fun() ->
             Task3(),
             Task4(),
             Task6()   % Just build a new job, executing them in the desired order
           end,
    push_job(Task).

Doing this kind of thing is very easy if you have first-class functions, so you can build the jobs in your language at will: any functional programming language, Python, Ruby blocks, and so on.

I don't particularly like the idea of building a queue, or a continuation stack, as in "Option 1", though, and I would definitely go with the second option. In Erlang, we even have a library called jobs, written by Erlang Solutions and released as open source. jobs is built to execute and load-regulate jobs like these. I would probably combine option 2 with jobs if I were to solve this problem.

+3
Aug 26 '11 at 3:40

The answers suggesting not to use a thread pool are like hard-coding the knowledge of the task dependencies/execution order. Instead, I would create a CompositeTask that manages the start/end dependency between two tasks. By encapsulating the dependency behind the task interface, all tasks can be treated uniformly and added to the pool. This hides the execution details and allows the task dependencies to be changed without affecting whether or not you use a thread pool.

The question does not specify a language - I'll use Java, which I hope is readable for most.

    class CompositeTask implements Task {
        Task firstTask;
        Task secondTask;

        public void run() {
            firstTask.run();
            secondTask.run();
        }
    }

This executes the tasks sequentially and on a single thread. You can chain many CompositeTasks together to create a sequence of as many sequential tasks as needed.

The downside here is that this ties up the thread for the duration of all the sequential tasks. You may have other tasks that you would prefer to execute in between the first and second task. So, rather than executing the second task directly, have the composite task schedule the second task:

    class CompositeTask implements Runnable {
        Task firstTask;
        Task secondTask;
        ExecutorService executor;

        public void run() {
            firstTask.run();
            executor.submit(secondTask);
        }
    }

This ensures that the second task does not run until the first task is complete, while also allowing the pool to execute other (possibly more urgent) tasks. Note that the first and second task may execute on different threads, so although they do not execute concurrently, any shared data used by the tasks must be made visible to the other thread (e.g. by making the shared variables volatile).

This is a simple yet powerful and flexible approach, and it lets the tasks themselves define the execution constraints, rather than doing it by using different thread pools.
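A small runnable demonstration of the second CompositeTask, with Runnable standing in for the answer's Task interface (the latch only lets the demo wait for completion):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompositeDemo {
    // Same shape as the scheduling CompositeTask above.
    public static class CompositeTask implements Runnable {
        final Runnable firstTask;
        final Runnable secondTask;
        final ExecutorService executor;

        CompositeTask(Runnable first, Runnable second, ExecutorService executor) {
            this.firstTask = first;
            this.secondTask = second;
            this.executor = executor;
        }

        public void run() {
            firstTask.run();
            executor.submit(secondTask);  // reschedule instead of running inline
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(1);
        StringBuffer out = new StringBuffer();
        Runnable task4 = () -> { out.append("4"); done.countDown(); };
        Runnable task3 = () -> out.append("3");
        pool.submit(new CompositeTask(task3, task4, pool));  // 3 always before 4
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println(out);  // prints 34
    }
}
```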

+3
Aug 30 '11

Use two Active Objects. In a nutshell: the active object pattern consists of a priority queue and one or more worker threads that take tasks from the queue and process them.

So use one active object with a single worker thread: all tasks placed in its queue will be processed sequentially. And use a second active object with more than one worker thread: here the worker threads take and process tasks from the queue in any order.

Good luck.

+3
Sep 01 '11 at 8:50

I think a thread pool can be used effectively in this situation. The idea is to use a separate strand object for each group of dependent tasks. You add tasks to the pool queue with or without a strand object, and you use the same strand object for dependent tasks. Your scheduler checks whether the next task has a strand, and whether that strand is locked. If not, lock the strand and run the task. If the strand is already locked, keep the task in the queue until the next scheduling event. When a task finishes, unlock its strand.

As a result, you need only a single queue, no additional threads, no complicated groups, etc. The strand object can be very simple, with just the two methods lock and unlock.

I often run into the same design problem, e.g. in an asynchronous network server that handles multiple simultaneous sessions. The sessions are independent (this maps to your independent tasks and groups of dependent tasks), while the tasks within a session are dependent (this maps a session's internal tasks to your dependent tasks within a group). With the described approach, I avoid explicit synchronization inside a session entirely. Every session has its own strand object.

And what's more, I use an existing (and great) implementation of this idea: the Boost Asio library (C++), which is also where I borrowed the term strand from. The implementation is elegant: I wrap my async calls into a corresponding strand object before scheduling them.
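A bare-bones strand in Java, modeled on the description above (the real thing is boost::asio::strand; this is only a sketch of the lock/unlock idea):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Strand {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private final ExecutorService pool;
    private boolean locked = false;

    public Strand(ExecutorService pool) { this.pool = pool; }

    public synchronized void post(Runnable task) {
        queue.addLast(task);
        if (!locked) runNext();           // strand idle: lock it and dispatch
    }

    private synchronized void runNext() {
        Runnable task = queue.pollFirst();
        if (task == null) { locked = false; return; }  // unlock the strand
        locked = true;
        pool.execute(() -> {
            task.run();
            runNext();                    // task finished: hand off to the next one
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Strand session = new Strand(pool);    // one strand per session/group
        StringBuffer out = new StringBuffer();
        CountDownLatch done = new CountDownLatch(5);
        for (int i = 1; i <= 5; i++) {
            final int n = i;
            session.post(() -> { out.append(n); done.countDown(); });
        }
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println(out);  // prints 12345 - strand tasks never overlap
    }
}
```

Tasks posted to the same strand keep FIFO order and never run concurrently, while the pool threads stay free to run other strands' tasks in between.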

+3
Sep 02 '11 13:28


Why not simply merge the sequential tasks into a single task?

    push task1
    push task2
    push task346
    push task5

And the grouped scenario becomes:

    push task1
    push task27 **
    push task3468 *
    push task5
    push task9

+1
Aug 25 '11 at 15:33

A plain Java approach: mark each task as either normal or dependent. Normal tasks are submitted to the pool immediately; dependent tasks are serialized by waiting for the previous dependent task to finish before submitting the next one. Java API: ExecutorCompletionService and Callable.

First the task type:

    public abstract class MyAsyncTask implements Callable<MyAsyncTask> {
        // tells if I am a normal or a dependent task
        private final boolean isDependent;

        protected MyAsyncTask(boolean isDependent) {
            this.isDependent = isDependent;
        }

        public boolean isNormal() {
            return !isDependent;
        }

        public MyAsyncTask call() {
            // do your job here.
            return this;
        }
    }

Then the submission loop over a CompletionService (i.e. the scheduler; exception handling omitted):

    ExecutorCompletionService<MyAsyncTask> completionExecutor =
        new ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
    Future<MyAsyncTask> dependentFutureTask = null;
    for (MyAsyncTask task : tasks) {
        if (task.isNormal()) {
            // if it is a normal task, submit it immediately.
            completionExecutor.submit(task);
        } else {
            if (dependentFutureTask == null) {
                // submit the first dependent task, keep a reference
                // to this dependent task for later use.
                dependentFutureTask = completionExecutor.submit(task);
            } else {
                // wait for the last one to complete before submitting a new one.
                dependentFutureTask.get();
                dependentFutureTask = completionExecutor.submit(task);
            }
        }
    }

With a fixed pool (here 5 threads), normal tasks run concurrently as they arrive, while each dependent task waits (via get() on the Future) for the previous dependent task to finish before being submitted, so the dependent tasks execute in order.

Instead of ExecutorCompletionService, the same thing can be built with a FutureTask and a Semaphore.

+1
Aug 31 '11 at 9:22

Another simple option: use two pools. Submit the ordered tasks to a ThreadPoolExecutor with a pool size of 1 - a single-threaded executor works off its queue strictly in submission order, which gives you the sequential behavior for free. Submit all other tasks to an ordinary pool. The only extra work is deciding, when a task arrives, which of the two executors it goes to.

+1
Sep 01 '11 at 12:36

Store the dependencies with the tasks themselves, e.g. as links from each task to its successor: 4 → 6 and 6 → 8, while tasks 1, 2, 5, 9... are independent. Independent tasks are submitted to the pool as they arrive; when a dependent task finishes, it checks whether its successor has arrived yet and, if so, submits it to the pool.

+1
Sep 01 '11 at 18:28

If you are using Java, you can try dexecutor (disclaimer: I am the author):

    DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();

    executor.addDependency("task1", "task2");
    executor.addDependency("task4", "task6");
    executor.addDependency("task6", "task8");

    executor.addIndependent("task3");
    executor.addIndependent("task5");
    executor.addIndependent("task7");

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

task1, task3, task5 and task7 run in parallel (assuming enough threads); when task1 finishes, task2 runs; when task4 finishes, task6 runs; and when task6 finishes, task8 runs.

0
25 '16 at 14:50

The queues and schedulers above seem more complicated than necessary if your framework already has continuations. In .NET you get the serial queue for free with Task.ContinueWith(): keep a reference to the tail of the serial chain and append each serial task to it; parallel tasks go straight to the pool.

    public class PoolsTasks
    {
        private readonly object syncLock = new object();
        private Task serialTask = Task.CompletedTask;

        private bool isSerialTask(Action task) {
            // However you determine what is serial ...
            return true;
        }

        public void RunMyTask(Action myTask) {
            if (isSerialTask(myTask)) {
                lock (syncLock)
                    serialTask = serialTask.ContinueWith(_ => myTask());
            } else
                Task.Run(myTask);
        }
    }

0
20 '17 at 15:08


