Groups of chains with positional arguments in partial tasks using Celery

I am writing an application that will execute a group of several synchronous task chains asynchronously.

In other words, I have a pipeline foo(a, b, c) -> boo(a, b, c) that I want to run once for each b in some list bs.

My understanding is that I should build the chain foo(a, b, c) | boo(a, b, c) for each b in this list. These chains then form a Celery group, which can be applied asynchronously.
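In plain Python terms (ignoring Celery's async machinery), the composition I have in mind looks roughly like this; `foo` and `boo` here are ordinary functions standing in for the real tasks:

```python
def foo(a, b, c):
    return b

def boo(a, b, c):
    return b

def run_pipeline(a, bs, c):
    # One synchronous chain per b; Celery's group would run these
    # chains concurrently instead of sequentially in a loop.
    results = []
    for b in bs:
        r = foo(a, b, c)   # first link gets the full argument set
        r = boo(a, r, c)   # next link consumes the previous result
        results.append(r)
    return results

print(run_pipeline("whatever", ["hello", "world"], "baz"))  # ['hello', 'world']
```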

My code for this is below:

my_app.py

    #!/usr/bin/env python3
    import functools
    import time

    from celery import chain, group, Celery
    from celery.utils.log import get_task_logger

    logger = get_task_logger(__name__)

    app = Celery("my_app",
                 broker='redis://localhost:6379/0',
                 backend='redis://localhost:6379/0')

    @app.task
    def foo(a, b, c):
        logger.info("foo from {0}!".format(b))
        return b

    @app.task
    def boo(a, b, c):
        logger.info("boo from {0}!".format(b))
        return b

    def break_up_tasks(tasks):
        try:
            first_task, *remaining_tasks = tasks
        except ValueError as e:
            first_task, remaining_tasks = [], []
        return first_task, remaining_tasks

    def do_tasks(a, bs, c, opts):
        tasks = [foo, boo]

        # There should be an option for each task
        if len(opts) != len(tasks):
            raise ValueError("There should be {0} provided options".format(len(tasks)))

        # Create a list of tasks that should be included per the list
        # of options' boolean values
        tasks = [task for opt, task in zip(opts, tasks) if opt]
        first_task, remaining_tasks = break_up_tasks(tasks)

        # If there are no tasks, we're done.
        if not first_task:
            return

        chains = (
            functools.reduce(
                # `a` should be provided by `apply_async` `args` kwarg
                # `b` should be provided by previous partials in chain
                lambda x, y: x | y.s(c),
                remaining_tasks,
                first_task.s(a, b, c)
            )
            for b in bs
        )

        g = group(*chains)
        res = g.apply_async(args=(a,), queue="default")
        print("Applied async... waiting for termination.")

        total_tasks = len(tasks)
        while not res.ready():
            print("Waiting... {0}/{1} tasks complete".format(
                res.completed_count(), total_tasks))
            time.sleep(1)

    if __name__ == "__main__":
        a = "whatever"
        bs = ["hello", "world"]
        c = "baz"
        opts = [
            # do "foo"
            True,
            # do "boo"
            True,
        ]
        do_tasks(a, bs, c, opts)

Celery Launch

 celery worker -A my_app -l info -c 5 -Q default 

However, when I run the above, my client enters an infinite loop (the group never becomes ready) because boo is missing an argument:

TypeError: boo() missing 1 required positional argument: 'c'

My understanding is that apply_async will provide its args kwarg to each chain, and that each link in the chain will pass its return value on to the subsequent link.
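To check my understanding, here is a plain-Python sketch (no Celery; the helper name `run_chain` is mine) of how I believe a chain feeds arguments: the previous link's return value is placed in front of the next signature's stored arguments, which is why boo ends up one argument short when its partial signature only carries `c`.

```python
def foo(a, b, c):
    return b

def boo(a, b, c):
    return b

def run_chain(links):
    """links: list of (task, partial_args) pairs. Emulates chain
    semantics: each result becomes the next task's first argument."""
    task, args = links[0]
    result = task(*args)               # first link: full argument tuple
    for task, args in links[1:]:
        result = task(result, *args)   # result prepended to partial args
    return result

# With only `c` stored, boo receives just 2 of its 3 arguments:
try:
    run_chain([(foo, ("whatever", "hello", "baz")), (boo, ("baz",))])
except TypeError as e:
    print(e)  # boo() missing 1 required positional argument: 'c'

# With (b, c) stored, the chain completes:
print(run_chain([(foo, ("whatever", "hello", "baz")),
                 (boo, ("hello", "baz"))]))  # hello
```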

Why doesn't boo get the right arguments? I'm sure these tasks aren't well written, as this is my first foray into Celery, so if you have any other suggestions I'm happy to entertain them.

1 answer

After debugging your code (I'm fairly new to Celery myself! :)) I found that each chained task receives the previous link's return value as its first positional argument, in place of the first argument stored in its signature. Given that, I believe the solution is to add the missing (second) argument to `y.s` in the reduce:

    chains = (
        functools.reduce(
            # `a` should be provided by `apply_async` `args` kwarg
            # `b` should be provided by previous partials in chain
            lambda x, y: x | y.s(b, c),  # <- here is the 'new guy'
            remaining_tasks,
            first_task.s(a, b, c)
        )
        for b in bs
    )

Hope this helps.



