How to fail a chain if one of its subtasks raises an exception

I ran into a rather strange problem with Celery.

There is a chain of tasks, and one of them raises an exception and retries several times:

    chain = (err.si(1) | err.si(2))
    result = chain.apply_async()
    result.state
    result.get()

here is the task code:

    @celery.task(base=MyTask)
    def err(x):
        try:
            if x < 3:
                raise Exception
            else:
                return x + 1
        except Exception as exp:
            print "retrying"
            raise err.retry(args=[x], exc=exp, countdown=5, max_retries=3)

The problem is that although a task in the chain raises an exception, result.state remains "PENDING", and .get() simply hangs.

I tried to fail the task once it reaches the maximum number of retries:

    class MyTask(celery.Task):
        abstract = True

        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            if self.max_retries == self.request.retries:
                self.state = states.FAILURE

But although the task, when run on its own, is marked FAILED, running it inside the chain gives the same result: PENDING and a frozen get().

I expected the chain to fail as soon as any of its tasks failed, and .get() on the result to raise the exception thrown by the task.

**UPDATE**: the stack trace produced by apply_async with CELERY_ALWAYS_EAGER = True:

    result = chain.apply_async()
    ---------------------------------------------------------------------------
    Exception                                 Traceback (most recent call last)
    <ipython-input-4-81202b369b5f> in <module>()
    ----> 1 result = chain.apply_async()

    lib/python2.7/site-packages/celery/canvas.pyc in apply_async(self, args, kwargs, **options)
        147         # For callbacks: extra args are prepended to the stored args.
        148         args, kwargs, options = self._merge(args, kwargs, options)
    --> 149         return self.type.apply_async(args, kwargs, **options)
        150
        151     def append_to_list_option(self, key, value):

    lib/python2.7/site-packages/celery/app/builtins.pyc in apply_async(self, args, kwargs, group_id, chord, task_id, **options)
        232                     task_id=None, **options):
        233         if self.app.conf.CELERY_ALWAYS_EAGER:
    --> 234             return self.apply(args, kwargs, **options)
        235         options.pop('publisher', None)
        236         tasks, results = self.prepare_steps(args, kwargs['tasks'])

    lib/python2.7/site-packages/celery/app/builtins.pyc in apply(self, args, kwargs, subtask, **options)
        249         last, fargs = None, args  # fargs passed to first task only
        250         for task in kwargs['tasks']:
    --> 251             res = subtask(task).clone(fargs).apply(last and (last.get(), ))
        252             res.parent, last, fargs = last, res, None
        253         return last

    lib/python2.7/site-packages/celery/result.pyc in get(self, timeout, propagate, **kwargs)
        677         elif self.state in states.PROPAGATE_STATES:
        678             if propagate:
    --> 679                 raise self.result
        680             return self.result
        681     wait = get

    Exception:
2 answers

When you have a chain:

    >>> c = as() | bs() | cs()
    >>> res = c()
    >>> res.get()

Calling the chain will generate a unique id for each task in the chain, send the messages, and return the last result in the chain.

So when you call res.get(), you are only retrieving the result of the last task in the chain.

It will also decorate the results with parent attributes, which you can follow to traverse the chain:

    >>> res                 # result of cs()
    >>> res.parent          # result of bs()
    >>> res.parent.parent   # result of as()

If you want to check for errors along the way, you can do:

    def nodes(node):
        while node.parent:
            yield node
            node = node.parent
        yield node

    values = [node.get(timeout=1) for node in reversed(list(nodes(res)))]
    value = values[-1]
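To see the traversal order without a broker, the same generator can be exercised against plain stand-in objects. FakeResult below is a hypothetical stub, not part of Celery; it only mimics the .parent and .get() attributes that the nodes() generator relies on:

```python
class FakeResult:
    """Stand-in for an AsyncResult: only .parent and .get() matter here."""
    def __init__(self, value, parent=None):
        self.value = value
        self.parent = parent

    def get(self, timeout=None):
        return self.value

def nodes(node):
    # Walk up the parent links, yielding each result, newest first.
    while node.parent:
        yield node
        node = node.parent
    yield node

# Simulate a chain as() | bs() | cs(); `res` is the last result.
a = FakeResult('a')
b = FakeResult('b', parent=a)
res = FakeResult('c', parent=b)

# Reversing puts the results back in chain order, oldest first.
values = [node.get(timeout=1) for node in reversed(list(nodes(res)))]
value = values[-1]
print(values)  # ['a', 'b', 'c']
print(value)   # 'c'
```

Because .get() is called on every node in order, the first failing task in the chain is the one whose exception surfaces.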

Actually, I think you should not use raise here.

You are raising an exception where the documentation says you shouldn't: you can simply call err.retry instead of raise err.retry.
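A minimal sketch of the suggested change, reusing the question's task (this assumes the same MyTask base and is illustrative only, not a tested fix):

```python
@celery.task(base=MyTask)
def err(x):
    try:
        if x < 3:
            raise Exception
        return x + 1
    except Exception as exp:
        # Call retry() instead of wrapping it in an extra `raise`.
        return err.retry(args=[x], exc=exp, countdown=5, max_retries=3)
```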


Source: https://habr.com/ru/post/1437029/
