Celery & Rabbitmq: WARNING / MainProcess] An unknown message was received and deleted. Wrong destination?!? - experiment on GIT

Recently, I have been experimenting with a GIT project to understand a large data processing framework.

1, GIT project: https://github.com/esperdyne/celery-message-processing

we have the following components:

1, AMPQ broker ( RabbitMQ ): it works as a message buffer that works like a messaging mailbox for different users!

2, worker: it works as a service server to provide services for various customer service. 3, Queue ( "celery" : it works as a container with several processes, which is used simultaneously to process various working instances.

the key configuration can be seen below:

We use the proj / celery.py object to define the application, the definition can be seen below:

app = Celery('proj', broker='amqp://', backend='redis://localhost', include=['proj.tasks']) 

enter the code here

when we launch the application:

1, when we launch the application, we saw a message that was received from rabbitmq, but celery could not process the message.

Parse.log looks like this: [2017-02-04 14: 28: 06,909: WARNING / MainProcess] An unknown message was received and deleted. Incorrect appointment?!?

we have the following question:

4.2.1 AMQP mechanism enter image description here We see that AMQP works as a message buffer, then there will be a message sender and message collector:

In the diagram above, who is the sender of the message and who is the recipient of the message.

4.2.2 Definition of messages In our application, we cannot find the code for determining the message for sending or receiving the AMQP form.

4.2.3 Message Monitor How we can track sending and receiving messages in AMQP. I hope that the teacher will help us solve the problem and give us some detailed

introduction to celery broker swords!

Note: error log can be seen here

 [2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? The full contents of the message body was: body: [[u'maildir/allen- p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b) {content_type:'application/json' content_encoding:'utf-8' delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': ' gen17347@centos1 ', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} [2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors [2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes [2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete [2017-02-04 15:47:23,530: INFO/MainProcess] parse@centos1 ready. [2017-02-04 15:47:24,890: INFO/MainProcess] sync with es_deploy@centos1 [2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b) {content_type:'application/json' content_encoding:'utf-8' delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': ' gen19722@centos1 ', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} enter code here 
+6
source share
2 answers

It would be helpful to give the celery and librabbitmq options that you use. Since I had a very similar problem, I assume that you are using celery 4.0.2 and librabbitmq 1.6.1.

In this case, this is a known compatibility issue, you can refer to https://github.com/celery/celery/issues/3675 and https://github.com/celery/librabbitmq/issues/93 .

The first link gives you recommendations for solving your problem, namely:

  • uninstall librabbitmq pip uninstall librabbitmq (you may need to call this command repeatedly)

  • change the amqp values ​​to pyamqp in your borker urls. (Although not in your configuration file if you use it. This did not work for me).

To more accurately answer your other questions: you are right in saying that there is a sender and a mercenary.

The sender role is assumed by the application created by calling Celery(...) . One of its roles is to act as a task registry, and if you look at its implementation in app / base.py, you will see that it implements the send_task method, which is directly called by the apply_async method of the apply_async class. This role of the method is to send the programmed version of your task through the wire to the broker so that it can be received by the employee. The application protocol used to send the message is amqp, for which the implementation is librabbitmq.

On the other side of the wire, there is another instance launched by an employee who does the extraction work. In celery, it is called Consumer . You can find its implementation in employee / consumer / consumer.py. You will see that it implements create_task_handler , which in turn defines the on_task_received functions that cause the error you see. This is a function called when a new task is received from the worker and the next one in the line is processed.

Therefore, the proposed solution is to change the implementation of the amqp protocol, so that TypeError does not occur on_task_received (which, it seems to me, is caused by the encoding problem).

I hope he answers all your questions and gives you a clearer idea of ​​how celery works. I must end by saying that, as far as I know, the β€œnormal” use of celery will never require you to interfere with such internal functions and that you can achieve 99% of what you might need, for example, by creating custom task classes and custom backend.

+10
source

Just so that the answer is also here. In the thread, Anis refers to 23doors, mentions that the new Celery 4 default protocol does not work with librabbitmq :

Apparently, the librabbitmq problem is related to the new default protocol in celery 4.x.

He also mentions that to solve this problem you can use the old protocol suggested by Celery by installing (if you are using Django):

 CELERY_TASK_PROTOCOL = 1 

Otherwise, you can set the following in the celeryconf.py file

 app.conf.task_protocol = 1 

All loans for 23 days :)

0
source

Source: https://habr.com/ru/post/1014754/


All Articles