Recently, I have been experimenting with a Git project to understand a large data processing framework.
1, Git project: https://github.com/esperdyne/celery-message-processing
We have the following components:
1, AMQP broker (RabbitMQ): it works as a message buffer, like a mailbox shared by different users.
2, Worker: it works as a server that provides services to various clients.
3, Queue ("celery"): it works as a container with several processes, which are used simultaneously to process various work items.
The key configuration is shown below.
We use proj/celery.py to define the application:
from celery import Celery
app = Celery('proj', broker='amqp://', backend='redis://localhost', include=['proj.tasks'])
When we launch the application:
1, we see that a message is received from RabbitMQ, but Celery cannot process it.
Parse.log looks like this: [2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
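To make the warning easier to reason about, here is a minimal sketch of how a consumer could decide that a message is "unknown". It is a simplified illustration and not Celery's actual code: `classify_message` is a hypothetical helper. It relies only on the documented message layouts, where protocol version 2 carries the task name in a `task` header and protocol version 1 carries it as a `task` key in a dict body. The garbled header keys are copied from the log in this post.

```python
def classify_message(headers, body):
    """Return the task name, or None if the message looks 'unknown'."""
    # Protocol version 2: the task name travels in the "task" message header.
    if isinstance(headers, dict) and "task" in headers:
        return headers["task"]
    # Protocol version 1: the task name is a "task" key inside a dict body.
    if isinstance(body, dict) and "task" in body:
        return body["task"]
    # Neither layout matched, so the consumer cannot tell which task to run.
    return None


# Two of the header entries from the log: the values are readable,
# but the keys are unreadable bytes instead of names such as "task".
garbled_headers = {"T3\x1d ": "proj.tasks.parse", "\xcfb\xddR": "py"}
# The body is a list (args, kwargs, embed), so it has no "task" key either.
body = [["maildir/allen-p/inbox/1."], {}, {"callbacks": None, "errbacks": None}]

print(classify_message(garbled_headers, body))
print(classify_message({"task": "proj.tasks.parse", "lang": "py"}, body))
```

Under these assumptions, the message in the log has a valid body and the right routing key, but no readable `task` header, which would be enough for it to be treated as unknown.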
We have the following questions:
4.2.1 AMQP mechanism
[image]
We see that AMQP works as a message buffer, so there must be a message sender and a message receiver:
In the diagram above, who is the sender of the message and who is the recipient?
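As a rough mental model of the two roles: in Celery the sender is the client code that calls a task, and the receiver is the worker that consumes from the queue. The sketch below is a toy in-memory stand-in, not RabbitMQ or Celery; `ToyBroker` and its methods are hypothetical names. It mimics the AMQP default exchange (the log in this post shows `'exchange': ''` with `'routing_key': 'parse'`), which delivers a message to the queue whose name equals the routing key.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker's default exchange (illustration only)."""

    def __init__(self):
        self.queues = {}

    def declare_queue(self, name):
        self.queues.setdefault(name, deque())

    def publish(self, routing_key, message):
        # Default exchange: deliver to the queue named by the routing key.
        if routing_key not in self.queues:
            return False  # no queue with that name, the message is dropped
        self.queues[routing_key].append(message)
        return True

    def get(self, queue_name):
        queue = self.queues[queue_name]
        return queue.popleft() if queue else None


broker = ToyBroker()
broker.declare_queue("parse")

# Sender side: the client publishes a task message with routing key "parse".
sent = broker.publish(
    "parse", {"task": "proj.tasks.parse", "args": ["maildir/allen-p/inbox/1."]}
)

# Receiver side: the worker bound to the "parse" queue takes the message.
received = broker.get("parse")
print(sent, received)
```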
4.2.2 Definition of messages
In our application, we cannot find the code that defines the format of the messages sent to or received from AMQP.
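For context, the application does not normally define the message format itself: Celery builds the message when a task is called. In message protocol version 2 the body is a three-element sequence of positional arguments, keyword arguments, and an "embed" dict holding callbacks, errbacks, chain, and chord. The sketch below only uses the standard `json` module to unpack an abridged copy of the body from the log in this post; the variable names are illustrative, not part of any API.

```python
import json

# Abridged copy of the body from the log: a JSON array of three elements.
raw_body = json.dumps([
    ["maildir/allen-p/inbox/1."],
    {},
    {
        "callbacks": None,
        "errbacks": None,
        "chord": None,
        "chain": [{
            "task": "celery.group",
            "subtask_type": "group",
            "kwargs": {"tasks": [{"task": "proj.tasks.deploy_db"},
                                 {"task": "proj.tasks.deploy_es"}]},
        }],
    },
])

# Protocol 2 body layout: (args, kwargs, embed).
args, kwargs, embed = json.loads(raw_body)

# Tasks that are scheduled to run after this one finishes.
chain = embed.get("chain") or []
chained = [step["task"] for step in chain]
grouped = [t["task"] for step in chain
           for t in step.get("kwargs", {}).get("tasks", [])]

print(args, kwargs, chained, grouped)
```

Read this way, the logged message is a call to the parse task with one path argument, followed by a group containing `proj.tasks.deploy_db` and `proj.tasks.deploy_es`.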
4.2.3 Message monitoring
How can we track messages being sent and received in AMQP?
I hope an expert can help us solve the problem and give us a detailed introduction to the Celery broker.
Note: the error log can be seen here:
[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8' delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': ' gen17347@centos1 ', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}
[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete
[2017-02-04 15:47:23,530: INFO/MainProcess] parse@centos1 ready.
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with es_deploy@centos1
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8' delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': ' gen19722@centos1 ', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}