Celery: callback after task hierarchy

I am using Celery from a web app to run a task hierarchy.

Tasks

I use the following tasks:

  • task_a
  • task_b
  • task_c
  • notify_user

In the Django view, I start multiple instances of task_a. Each of them does some processing and then starts several instances of task_b. Each of those does some processing and then runs multiple instances of task_c.

To visualize:

[Image: task tree]

Goals

My goal is to run all the tasks and launch a callback as soon as the entire hierarchy has finished. I also want to pass data from the lowest-level tasks up to the top level.

  • The view should simply "start" tasks and then return.
  • Each subtask depends on its parent task, but a parent task does not directly depend on its children: once it has launched all of its child tasks, it can finish.
  • Everything can run in parallel, provided each parent task starts before its child tasks.
  • Once all tasks have completed, the notify_user callback should be called.
  • The notify_user callback needs access to the data produced by the task_c tasks.

All tasks must be non-blocking, so task_b should not wait for its task_c subtasks to finish.

What is the right way to achieve this?

2 answers

The solution turned out to be the dynamic tasks feature provided in this pull request: https://github.com/celery/celery/pull/817. With it, each task can return a group of subtasks, which then replaces the original task in the queue.
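To make the "return a group that replaces the task" idea concrete without a broker, here is a plain-Python model of that behaviour. It is not Celery's API: `Replace` and `run` are hypothetical names for illustration, and the task functions are stand-ins. A task may return a `Replace` marker holding subtasks; the runner executes those subtasks and uses their collected results as the original task's result. In current Celery releases the built-in counterpart is, as far as I know, `Task.replace()` on a bound task.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


@dataclass
class Replace:
    """Hypothetical marker: 'replace me with these subtasks' (models returning a group)."""
    subtasks: List[Tuple[Callable[[Any], Any], Any]]  # (task function, argument) pairs


def run(task: Callable[[Any], Any], arg: Any) -> Any:
    """Run a task; if it returns Replace, run its subtasks and return their results instead."""
    result = task(arg)
    if isinstance(result, Replace):
        # The subtasks' collected results stand in for the original task's result.
        # (Run one after another here for simplicity; Celery would dispatch them to workers.)
        return [run(sub, sub_arg) for sub, sub_arg in result.subtasks]
    return result


def task_c(item):
    return item  # leaf stand-in: real processing would go here


def task_b(items):
    # Instead of waiting for task_c, hand back a group that replaces this task.
    return Replace([(task_c, i) for i in items])


def task_a(groups):
    return Replace([(task_b, g) for g in groups])


tree = [[["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]]
print(run(task_a, tree[0]))  # → [['C1', 'C2', 'C3'], ['C4', 'C5']]
```

Note that no stand-in task ever waits on its children: each one returns immediately, and the nesting of the data comes entirely from how replacement results are collected.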


Suppose you have the following tasks:

```python
from celery import Celery

celery = Celery(broker="amqp://test:test@localhost:5672/test")
celery.conf.update(
    CELERY_RESULT_BACKEND="mongodb",  # chords need a result backend
)


@celery.task
def task_a(result):
    print('task_a:', result)
    return result


@celery.task
def task_b(result):
    print('task_b:', result)
    return result


@celery.task
def task_c(result):
    print('task_c:', result)
    return result


@celery.task
def notify_user(result):
    print(result)
    return result
```

For the input data (as in your drawing):

```python
tree = [[["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]]
```

You can do:

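```python
from celery import group

a_group = []
for ia, a in enumerate(tree):
    print("A%s:" % ia)
    b_group = []
    for ib, b in enumerate(a):
        print(" - B%s:" % ib)
        for c in b:
            print(' -', c)
        # chord: run all task_c in parallel, then task_b with their results
        c_group = group([task_c.s(c) for c in b])
        b_group.append(c_group | task_b.s())
    # chord: run all task_b chords, then task_a with their results
    a_group.append(group(b_group) | task_a.s())

# top-level chord: notify_user receives the results of every task_a
final_task = group(a_group) | notify_user.s()

# start the whole hierarchy; this returns immediately
final_task.apply_async()
```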

Its representation (don't bother reading it, it's ugly :)

```
[[[__main__.task_c('C1'), __main__.task_c('C2'), __main__.task_c('C3')] | __main__.task_b(), [__main__.task_c('C4'), __main__.task_c('C5')] | __main__.task_b()] | __main__.task_a(), [[__main__.task_c('C6'), __main__.task_c('C7'), __main__.task_c('C8')] | __main__.task_b(), [__main__.task_c('C9')] | __main__.task_b()] | __main__.task_a()] | __main__.notify_user()
```

The data passed to notify_user will then be:

```
[[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]
```
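To see why notify_user receives exactly the nested input tree, here is a small plain-Python model of the canvas built in the loops above. `Sig` and `Chord` are hypothetical stand-ins for `task.s(arg)` and `group(header) | body.s()`; they are evaluated sequentially, so only the data flow is modeled, not the scheduling. Because every task in the answer just returns its input, each chord's body receives the list of its header's results in header order, which rebuilds the tree one level at a time.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


def identity(x):
    # task_a, task_b, task_c and notify_user in the answer all just return their input
    return x


@dataclass
class Sig:
    """Hypothetical stand-in for task.s(arg): a task applied to a fixed argument."""
    fn: Callable[[Any], Any]
    arg: Any


@dataclass
class Chord:
    """Hypothetical stand-in for group(header) | body.s()."""
    header: List[Any]
    body: Callable[[List[Any]], Any]


def evaluate(node: Any) -> Any:
    if isinstance(node, Sig):
        return node.fn(node.arg)
    # The body runs once, with the header results in header order (as a chord does).
    return node.body([evaluate(child) for child in node.header])


tree = [[["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]]

# Same nesting as the answer: task_c groups -> task_b, task_b chords -> task_a, all -> notify_user.
canvas = Chord(
    header=[
        Chord(
            header=[Chord(header=[Sig(identity, c) for c in b], body=identity) for b in a],
            body=identity,
        )
        for a in tree
    ],
    body=identity,
)

print(evaluate(canvas) == tree)  # → True
```

If task_c did real work, its return values would appear in place of the leaf strings, still grouped by the task_b and task_a that launched them.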

Everything is driven by callbacks (chords), so no task ever blocks waiting for other tasks to complete.
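The non-blocking property rests on one idea: nobody waits on the header tasks. Instead, each header task reports its result when it finishes, and whichever report arrives last triggers the body. Below is a minimal thread-based sketch of that counting join in plain Python. It is not Celery's implementation: `CountingChord` is a hypothetical name, and in Celery the actual mechanism depends on the result backend (the docs describe a counter for Redis and Memcached, and a polling `chord_unlock` task for other backends).

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


class CountingChord:
    """Hypothetical sketch: run `body` once all `size` header results have arrived."""

    def __init__(self, size: int, body: Callable[[List[Any]], None]):
        self._results: List[Any] = [None] * size
        self._remaining = size
        self._lock = threading.Lock()
        self._body = body

    def report(self, index: int, value: Any) -> None:
        # Called by each header task when it finishes; it never waits for its siblings.
        with self._lock:
            self._results[index] = value
            self._remaining -= 1
            last = self._remaining == 0
        if last:  # exactly one reporter sees zero, so the body runs exactly once
            self._body(self._results)


def task_c(item: str) -> str:
    return item.lower()  # stand-in for real leaf processing


done = threading.Event()
received: List[Any] = []


def notify_user(results: List[Any]) -> None:
    received.extend(results)
    done.set()


items = ["C1", "C2", "C3"]
chord = CountingChord(len(items), notify_user)
pool = ThreadPoolExecutor(max_workers=3)
for i, item in enumerate(items):
    # Fire-and-forget: submitting returns immediately; completion drives the callback.
    pool.submit(lambda i=i, item=item: chord.report(i, task_c(item)))

# Only this demo's main thread waits, to observe the outcome; no task waits on another.
done.wait(timeout=5)
pool.shutdown()
print(received)  # → ['c1', 'c2', 'c3']
```

Storing each result at its header index keeps the body's input in header order even though the tasks may finish in any order.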


Source: https://habr.com/ru/post/945785/

