Question
I use celery to run task sets that look like this:
1. I dispatch a batch of tasks that can run in parallel; the number of tasks in this batch varies from tens to several thousand.
2. I combine the results of these tasks into one answer and then do something with that answer, for example store it in the database, save it to a special result file, and so on. Basically, after all the tasks have completed, I need to call a function with the following signature:
def callback(result_file_name, task_result_list):
    ...  # e.g. combine task_result_list and write it to result_file_name
Right now step 1 is performed in the Celery queue, and step 2 is performed outside Celery:
tasks = []
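In full, the code for this is roughly the following sketch (mytask and BATCH_SIZE are placeholders for my real task and batch size):

from celery import group

# step 1: build and dispatch the batch of parallel tasks
tasks = [mytask.s(result_file_name, index) for index in range(BATCH_SIZE)]
async_result = group(tasks).apply_async()

# step 2: block until every task has finished, then combine and store the results
task_result_list = async_result.join()
callback(result_file_name, task_result_list)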
This approach is cumbersome, since I have to block a thread until all the tasks have completed (which may take a couple of hours).
I would like to somehow move step 2 into Celery as well: effectively, I need to add a callback for the entire task set (as far as I know, this is not supported in Celery) or submit a task that runs after all of these subtasks have finished.
Does anyone know how to do this? I am using this in a Django environment, so I can store some state in the database.
To summarize my recent findings:
Chords will not do
I cannot use chords straightforwardly, because chords only allow callbacks that look like this:
def callback(task_result_list):
    ...  # there is no slot for result_file_name here
There is no obvious way to pass additional parameters to the callback (especially because these callbacks cannot be local functions).
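For illustration, the straightforward chord call would be something like this sketch (the header tasks are placeholders), and the body only ever receives the list of header results:

from celery import chord

# the chord body is called with the list of results from the header tasks,
# so callback gets task_result_list but nothing else
chord(mytask.s(result_file_name, index) for index in range(BATCH_SIZE))(callback.s())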
Using the database will not do either
I can store the results using TaskSetMeta, but this entity has no status field, so even if I added a signal on TaskSetMeta I would still have to combine the results of the tasks myself to tell when the whole set is done, which could have significant overhead.
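The signal idea I mean is roughly the sketch below (assuming django-celery's TaskSetMeta model); the handler would still have to dig through the stored results itself, since there is no status to key off:

from django.db.models.signals import post_save
from django.dispatch import receiver
from djcelery.models import TaskSetMeta

@receiver(post_save, sender=TaskSetMeta)
def taskset_meta_saved(sender, instance, created, **kwargs):
    # TaskSetMeta only stores the taskset id and a serialized result payload;
    # with no status field, this handler has to unpack and combine the task
    # results itself, which is the overhead mentioned above
    ...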