Perform Celery Tasks with a Dependency Schedule

I would like to have Celery tasks that depend on the results of two or more other tasks. I looked at "Python + Celery: task chains?" and http://pypi.python.org/pypi/celery-tasktree , but they are only good when each task has a single dependent task.

I know about TaskSet, but there seems to be no way to fire a callback the moment TaskSetResult.ready() becomes True. What I have in mind right now is a periodic task that polls TaskSetResult.ready() every few milliseconds and launches the callback once it returns True, but that sounds pretty inelegant to me.
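For reference, the polling workaround described above could be sketched like this. `wait_for_taskset` and `FakeResult` are hypothetical names: `FakeResult` merely stands in for a real `TaskSetResult`, which exposes `.ready()` and `.join()`.

```python
import time

class FakeResult:
    """Stand-in for TaskSetResult: reports ready after a few polls."""
    def __init__(self, values, polls_until_ready=3):
        self.values = values
        self.polls_left = polls_until_ready

    def ready(self):
        self.polls_left -= 1
        return self.polls_left <= 0

    def join(self):
        return self.values

def wait_for_taskset(result, callback, poll_interval=0.001):
    # Busy-wait until every result in the set is ready,
    # then hand the collected results to the callback.
    while not result.ready():
        time.sleep(poll_interval)
    return callback(result.join())

total = wait_for_taskset(FakeResult([1, 2, 3]), sum)
# total == 6
```

This makes the objection concrete: the waiting task burns a worker slot while it polls, which is exactly why it feels inelegant.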

Any suggestions?

+6
3 answers

mrbox is right: you can retry until the results are ready. But it is not so clear in the docs that when you retry you need to pass along the setid and the subtask ids, and that you need map() to rebuild the TaskSetResult. Below is example code showing what I mean.

    def run(self, setid=None, subtasks=None, **kwargs):
        if not setid or not subtasks:
            # First run of this task: launch the subtasks
            tasks = []
            for slice in slices:
                tasks.append(uploadTrackSlice.subtask((slice, folder_name)))
            job = TaskSet(tasks=tasks)
            task_set_result = job.apply_async()
            setid = task_set_result.taskset_id
            subtasks = [result.task_id for result in task_set_result.subtasks]
            self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])

        # On a retry we just have to check the results
        tasks_result = TaskSetResult(setid, map(AsyncResult, subtasks))
        if not tasks_result.ready():
            self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])
        else:
            if tasks_result.successful():
                return tasks_result.join()
            else:
                raise Exception("Some of the tasks failed")
+2

In recent versions of Celery (3.0+), you can use a so-called chord to achieve the desired effect:

From http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives :

Simple chord

The chord primitive allows us to add a callback that will be called when all tasks in the group have completed execution, which is often required for algorithms that are not embarrassingly parallel:

  >>> from celery import chord
  >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
  >>> res.get()
  90

Disclaimer: I have not tried this myself yet.
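For intuition, here is a broker-free sketch of what a chord does: run the "header" group of tasks, collect their results into an ordered list, and hand that list to the "body" callback. `chord_sync` is a made-up name for this illustration; real Celery does all of this asynchronously through the result backend.

```python
def add(x, y):
    return x + y

def xsum(values):
    return sum(values)

def chord_sync(header_results, body):
    # The header tasks have already run; the body callback
    # receives the ordered list of their results, just as
    # xsum.s() would in the real chord example above.
    return body(header_results)

res = chord_sync([add(i, i) for i in range(10)], xsum)
# res == 90
```

The point of the primitive is that the callback fires as soon as the last header task finishes, with no polling on the caller's side.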

+7

IMHO, you can do something similar to what is done in the docs (link).

Or you can use the retry method with max_retries=None: if .ready() is False for either of the "base" tasks, call .retry() until both "base" tasks have completed.

+2

Source: https://habr.com/ru/post/892010/

