Django & Celery - routing issues

I am using Django and Celery and I am trying to configure routing on multiple queues. When I specify the routing_key and exchange task (either in the task decorator, or when apply_async() ), the task is not added to the broker (which is Kombu connecting to my MySQL database).

If I specify the name of the queue in the task decorator (which will mean that the routing key is ignored), the task works fine. This is apparently a problem with configuring routing / exchange.

Any idea what could be the problem?

Here's the setting:

settings.py

 INSTALLED_APPS = ( ... 'kombu.transport.django', 'djcelery', ) BROKER_BACKEND = 'django' CELERY_DEFAULT_QUEUE = 'default' CELERY_DEFAULT_EXCHANGE = "tasks" CELERY_DEFAULT_EXCHANGE_TYPE = "topic" CELERY_DEFAULT_ROUTING_KEY = "task.default" CELERY_QUEUES = { 'default': { 'binding_key':'task.#', }, 'i_tasks': { 'binding_key':'important_task.#', }, } 

tasks.py

 from celery.task import task @task(routing_key='important_task.update') def my_important_task(): try: ... except Exception as exc: my_important_task.retry(exc=exc) 

Task assignment:

 from tasks import my_important_task my_important_task.delay() 
+6
source share
1 answer

You are using Django ORM as a broker, which means that the ads are saved only in memory (see the migration mapping table at http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison ),

So, when you apply this task with routing_key important_task.update , it will not be able to route it, since it has not yet announced the queue.

It will work if you do this:

 @task(queue="i_tasks", routing_key="important_tasks.update") def important_task(): print("IMPORTANT") 

But it would be much easier to use the automatic routing function, since there is nothing here that shows that you need to use the exchange of "themes", for automatic routing just delete the settings :

  • CELERY_DEFAULT_QUEUE ,
  • CELERY_DEFAULT_EXCHANGE ,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

And declare your task as follows:

 @task(queue="important") def important_task(): return "IMPORTANT" 

and then to start the working user from this queue:

 $ python manage.py celeryd -l info -Q important 

or consume from the default queue ( celery ) and the important queue:

 $ python manage.py celeryd -l info -Q celery,important 

Another good practice is to not hard-code the queue names in and use CELERY_ROUTES :

 @task def important_task(): return "DEFAULT" 

then in your settings:

 CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}} 

If you still insist on using topic exchange, you can add this router to automatically announce all the queues the first time a job is sent:

 class PredeclareRouter(object): setup = False def route_for_task(self, *args, **kwargs): if self.setup: return self.setup = True from celery import current_app, VERSION as celery_version # will not connect anywhere when using the Django transport # because declarations happen in memory. with current_app.broker_connection() as conn: queues = current_app.amqp.queues channel = conn.default_channel if celery_version >= (2, 6): for queue in queues.itervalues(): queue(channel).declare() else: from kombu.common import entry_to_queue for name, opts in queues.iteritems(): entry_to_queue(name, **opts)(channel).declare() CELERY_ROUTES = (PredeclareRouter(), ) 
+43
source

Source: https://habr.com/ru/post/916381/


All Articles