I am writing an application that will execute a group of several synchronous task chains asynchronously.
In other words, I may have a pipeline foo(a, b, c) -> boo(a, b, c) for each b in some list bs.
My understanding is that I should chain foo(a, b, c) | boo(a, b, c) for each b in this list. These chains then form a Celery group, which can be applied asynchronously.
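To show the shape I'm after in plain Python (ignoring Celery for a moment): one synchronous chain per b, with all chains collected into a group. Here foo, boo, and run_chain are simplified stand-ins I made up for illustration, not real Celery calls:

```python
def foo(a, b, c):
    return b  # stand-in for the real first task


def boo(prev, b, c):
    return prev  # receives the previous link's return value first


def run_chain(funcs, a, b, c):
    # The first link gets (a, b, c); each later link gets (prev_result, b, c),
    # mimicking how a chain feeds each link's result into the next one.
    result = funcs[0](a, b, c)
    for f in funcs[1:]:
        result = f(result, b, c)
    return result


bs = ["x", "y", "z"]
# One chain per b, run as a group:
group_results = [run_chain([foo, boo], "a", b, "c") for b in bs]
print(group_results)  # ['x', 'y', 'z'] -- each chain returns its b
```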
My code for this is below:
my_app.py
#!/usr/bin/env python3
import functools
import time

from celery import chain, group, Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery(
    "my_app",
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)


@app.task
def foo(a, b, c):
    logger.info("foo from {0}!".format(b))
    return b


@app.task
def boo(a, b, c):
    logger.info("boo from {0}!".format(b))
    return b


def break_up_tasks(tasks):
    try:
        first_task, *remaining_tasks = tasks
    except ValueError as e:
        first_task, remaining_tasks = [], []
    return first_task, remaining_tasks


def do_tasks(a, bs, c, opts):
    tasks = [foo, boo]
Celery Launch
celery worker -A my_app -l info -c 5 -Q default
However, I found that when I run the above, the worker enters an infinite loop because boo is missing an argument:
TypeError: boo() missing 1 required positional argument: 'c'
I understand that apply_async passes its args kwarg to each chain, and that each link in a chain provides its return value to the subsequent link.
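To make that mental model concrete, here is the argument flow as I picture it, simulated in plain Python. apply_link is a hypothetical helper of mine, not Celery API; it mimics how a chain prepends the parent task's result to the next signature's stored arguments:

```python
def boo(a, b, c):
    return b


def apply_link(func, bound_args, parent_result):
    # In a chain, the parent task's result is prepended to the next
    # signature's stored arguments: boo.s(b, c) becomes boo(result, b, c).
    return func(parent_result, *bound_args)


print(apply_link(boo, ("b", "c"), "result"))  # prints "b"

# If a signature binds one argument too few (like boo.s(b) would), the
# single prepended result cannot fill the gap and the call fails:
try:
    apply_link(boo, ("b",), "result")
except TypeError as e:
    print(e)  # boo() missing 1 required positional argument: 'c'
```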
Why doesn't boo receive the right arguments? I am sure these tasks are not well written, as this is my first foray into Celery, so if you have any other suggestions, I am happy to entertain them.