How do I run a callback for a set of tasks in Celery?

Question

I use Celery to run sets of tasks that look like this:

  • I run a batch of tasks that can execute in parallel; the number of tasks in a batch ranges from tens to several thousand.
  • I combine the results of these tasks into a single answer and then do something with it, for example store it in the database or save it to a special result file. In short, once all the tasks have completed, I need to call a function with one of the following signatures:

    def callback(result_file_name, task_result_list):  # store in a file
        ...

    def callback(entity_key, task_result_list):  # store in the DB
        ...
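For illustration, the file-storing variant of step 2 could look like the sketch below. It assumes every task returns a single number and that the combined answer is written as a small CSV file; the `aggregate` helper and the column layout are hypothetical, not taken from the question.

```python
import csv


def aggregate(task_result_list):
    # Hypothetical aggregation: assumes every task returned a number.
    return sum(task_result_list)


def callback(result_file_name, task_result_list):
    # Combine all task results into one answer and store it in a result file
    # (hypothetical two-row CSV layout: header row, then the values).
    total = aggregate(task_result_list)
    with open(result_file_name, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["task_count", "total"])
        writer.writerow([len(task_result_list), total])
    return total
```

The DB variant would follow the same pattern, with `entity_key` selecting the row to update instead of a file name.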

Currently step 1 runs in the Celery queue and step 2 runs outside Celery:

    from celery import group

    tasks = []  # add tasks to the tasks list
    task_group = group(tasks)
    result = task_group.apply_async()
    res = result.join()  # blocks until every task in the group has finished
    # Aggregate results
    # Save results to file, database, whatever

This approach is awkward because one thread has to block until all the tasks have completed, which can take a couple of hours.

I would like to move step 2 into Celery as well: either attach a callback to the entire set of tasks (as far as I know, Celery does not support this) or submit a task that runs after all of these subtasks have finished.

Does anyone know how to do this? I'm running this in a Django environment, so I can store some state in the database.

To summarize my recent findings

Chords will not do

I can't use chords directly, because chords only let me create callbacks that look like this:

    def callback(task_result_list):  # store in file
        ...

There is no obvious way to pass additional parameters to the callback (especially because these callbacks cannot be local functions).

Using the database won't work either

I could store the results using TaskSetMeta, but that object has no status field. So even if I hooked a signal onto TaskSetMeta, I would still have to aggregate the task results myself, which could carry significant overhead.

Answer

The answer turned out to be simple: I can use chords after all, and additional parameters (for example, the name of the report file) should be passed as kwargs.
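Why kwargs work where a local function does not: Celery sends the callback to a worker as a message that names a registered task and carries its arguments, so whatever you bind must be serializable, and a closure is not. The pure-Python sketch below models that idea with a hypothetical `REGISTRY` dict and a JSON round trip; it is a simplified model, not Celery's actual message format.

```python
import json

# Hypothetical stand-in for Celery's task registry: module-level functions
# are looked up by name, which is why a callback cannot be a local function.
REGISTRY = {}


def register(func):
    REGISTRY[func.__name__] = func
    return func


@register
def print_and_sum(to_sum, file_name):
    # Simplified version of the callback task (no printing).
    return file_name, sum(to_sum)


def make_signature(name, **kwargs):
    # Only the task name and JSON-serializable kwargs go into the "message".
    return json.dumps({"task": name, "kwargs": kwargs})


def run_callback(message, header_results):
    # The list of header results is passed as the first positional argument,
    # followed by whatever kwargs were bound on the callback signature.
    sig = json.loads(message)
    return REGISTRY[sig["task"]](header_results, **sig["kwargs"])


message = make_signature("print_and_sum", file_name="report_file.csv")
print(run_callback(message, [1, 2, 3]))  # ('report_file.csv', 6)
```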

Here is the chord's callback task:

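    from celery import shared_task

    @shared_task
    def print_and_sum(to_sum, file_name):
        # to_sum is the list of results from the chord's header tasks
        print(file_name)
        print(sum(to_sum))
        return file_name, sum(to_sum)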

And here is how to invoke it:

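    from celery import chord

    subtasks = [...]
    # The header results are passed as the first argument; file_name goes in as a kwarg
    result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name': 'report_file.csv'}))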
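To check the callback's calling convention without a broker, the chord's behaviour can be approximated locally. The sketch below is a stand-in built on `concurrent.futures`, not Celery: it runs the header callables in parallel, collects their results in header order, and calls the body once with that list plus the bound kwargs. `fake_chord` and `square` are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical header task.
    return x * x


def print_and_sum(to_sum, file_name):
    # Same logic as the chord callback above, as a plain function.
    print(file_name)
    print(sum(to_sum))
    return file_name, sum(to_sum)


def fake_chord(header, body, **body_kwargs):
    # header: list of (function, args) pairs that may run in parallel.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # Results are collected in header order, like a chord's result list.
        results = [f.result() for f in futures]
    # The body runs once, only after every header task has finished.
    return body(results, **body_kwargs)


header = [(square, (n,)) for n in range(1, 5)]
# prints report_file.csv, then 30, then ('report_file.csv', 30)
print(fake_chord(header, print_and_sum, file_name="report_file.csv"))
```

In real Celery the header tasks run on workers and the body is triggered through the result backend, so chords need a configured result backend and the header tasks must not ignore their results.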

Source: https://habr.com/ru/post/916751/

