Celery / RabbitMQ: queue blocked by unacknowledged messages?

I have a task that retrieves some information remotely via urllib2, and it is called several thousand times. The tasks are scheduled with a random eta (spread over a week) so they do not all hit the server at the same time. Sometimes I get a 404, sometimes not; I handle that error in case it happens.
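The error handling is roughly of this shape (a minimal sketch; fetch_profile and the timeout are placeholders, not my exact code):

    import urllib2

    def fetch_profile(url):
        # Sketch of the error handling described above: tolerate 404s,
        # let anything else propagate to Celery's error handling.
        try:
            return urllib2.urlopen(url, timeout=30).read()
        except urllib2.HTTPError as exc:
            if exc.code == 404:
                return None  # remote resource is gone; skip this user
            raise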

In the RabbitMQ console, I see 16 unacknowledged messages.

I stopped Celery, purged the queue, and restarted it. The 16 unacknowledged messages remained.

I have other tasks that go to the same queue, and none of them is being executed. After purging, I tried to send another task, and it just stays in the ready state.


Any ideas how I can find out why these messages remain unacknowledged?

Versions:

    celery==3.1.4
    {rabbit, "RabbitMQ", "3.5.3"}

celeryapp.py

    CELERYBEAT_SCHEDULE = {
        'social_grabber': {
            'task': '<django app>.tasks.task_social_grabber',
            'schedule': crontab(hour=5, minute=0, day_of_week='sunday'),
        },
    }

tasks.py

    @app.task
    def task_social_grabber():
        for user in users:
            eta = randint(0, 60 * 60 * 24 * 7)  # a week in seconds
            # args must be a tuple, hence the trailing comma
            task_social_grabber_single.apply_async((user,), countdown=eta)

There is no routing for this task, so it goes into the default queue: celery. One worker processes this queue.
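To make that explicit: with no CELERY_ROUTES entry, Celery falls back to the default queue, which is the same as writing (illustrative sketch only, not part of my actual config):

    # settings.py -- equivalent explicit routing (sketch); omitting it
    # has the same effect, since "celery" is the default queue name.
    CELERY_ROUTES = {
        '<django app>.tasks.task_social_grabber_single': {'queue': 'celery'},
    }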

supervisord.conf:

    [program:celery]
    autostart = true
    autorestart = true
    command = celery worker -A <django app>.celeryapp:app --concurrency=3 -l INFO -n celery
+6
2 answers

RabbitMQ broke the QoS settings behavior in version 3.3. You need to upgrade celery to at least 3.1.11 (changelog) and kombu to at least 3.0.15 (changelog). Ideally, use the latest versions of both.
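A quick way to verify what is actually installed (a minimal sketch, run inside the project's virtualenv):

    # Print the installed versions; they must meet the floors above.
    import celery
    import kombu

    print(celery.__version__)  # needs >= 3.1.11
    print(kombu.__version__)   # needs >= 3.0.15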

I hit this same behavior when 3.3 was released. RabbitMQ flipped the default behavior of the prefetch_count flag. Before that, when a consumer reached the CELERYD_PREFETCH_MULTIPLIER limit in eta'd messages, the worker would raise that limit in order to fetch more messages. The change broke this, because the new default behavior denies that capability, so eta'd messages sit unacknowledged and block the queue.
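To make the limit concrete (a sketch; 4 is the Celery 3.x default multiplier, and the concurrency comes from the supervisord config in the question):

    # The initial AMQP prefetch (basic.qos) a worker requests is
    # CELERYD_PREFETCH_MULTIPLIER * concurrency.
    CELERYD_PREFETCH_MULTIPLIER = 4                      # Celery 3.x default
    concurrency = 3                                      # from --concurrency=3
    prefetch_count = CELERYD_PREFETCH_MULTIPLIER * concurrency  # = 12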

+3

I had similar symptoms: messages arriving at RabbitMQ (visible in the charts) but never picked up by the worker.

This led me to assume that my Django app was setting up the Celery app correctly, when in fact I was missing the import in the project's __init__.py that loads the Celery app during Django startup:

    from __future__ import absolute_import

    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app  # noqa
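For completeness, that import pulls in the app module from the standard Celery 3.1 Django layout (a sketch assuming that layout; `<django app>` stands for the real project package):

    # <django app>/celery.py -- standard Celery 3.1 + Django setup (sketch)
    from __future__ import absolute_import

    import os

    from celery import Celery
    from django.conf import settings

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<django app>.settings')

    app = Celery('<django app>')
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)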

This is a silly mistake, but the fact that messages reached the broker and an AsyncResult was returned threw me off track and had me looking in all the wrong places. The tell-tale sign was that setting CELERY_ALWAYS_EAGER = True did not do squat either, i.e. the tasks were not executed at all.
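For reference, the eager-mode check boils down to two settings (debugging sketch only; never leave this on in production):

    # settings.py (debugging only): run tasks synchronously in-process,
    # bypassing the broker, so app-wiring problems surface immediately
    # instead of messages silently piling up in the queue.
    CELERY_ALWAYS_EAGER = True
    CELERY_EAGER_PROPAGATES_EXCEPTIONS = True  # re-raise task exceptions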

PS: This may not be the answer to @kev's question, but since I ran into it a couple of times while searching for a solution to my own problem, I am posting it here for anyone in a similar situation.

0

Source: https://habr.com/ru/post/989091/

