You are using Django ORM as a broker, which means that the ads are saved only in memory (see the migration mapping table at http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison ),
So, when you apply this task with routing_key important_task.update
, it will not be able to route it, since it has not yet announced the queue.
It will work if you do this:
@task(queue="i_tasks", routing_key="important_tasks.update") def important_task(): print("IMPORTANT")
But it would be much easier to use the automatic routing function, since there is nothing here that shows that you need to use the exchange of "themes", for automatic routing just delete the settings :
CELERY_DEFAULT_QUEUE
,CELERY_DEFAULT_EXCHANGE
,CELERY_DEFAULT_EXCHANGE_TYPE
CELERY_DEFAULT_ROUTING_KEY
CELERY_QUEUES
And declare your task as follows:
@task(queue="important") def important_task(): return "IMPORTANT"
and then to start the working user from this queue:
$ python manage.py celeryd -l info -Q important
or consume from the default queue ( celery
) and the important
queue:
$ python manage.py celeryd -l info -Q celery,important
Another good practice is to not hard-code the queue names in and use CELERY_ROUTES
:
@task def important_task(): return "DEFAULT"
then in your settings:
CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}
If you still insist on using topic exchange, you can add this router to automatically announce all the queues the first time a job is sent:
class PredeclareRouter(object): setup = False def route_for_task(self, *args, **kwargs): if self.setup: return self.setup = True from celery import current_app, VERSION as celery_version