Celery Broadcast vs. RabbitMQ Fanout

I’ve been working with Celery lately and I don’t like it. The configuration is messy, overcomplicated, and poorly documented.

I want to send broadcast messages with Celery from a single producer to multiple consumers. What confuses me is the mismatch between Celery's terminology and the terminology of the underlying RabbitMQ transport.

In RabbitMQ you can have a single fanout exchange bound to several queues, with one consumer per queue, to broadcast messages.

But in Celery the terms are all mixed up: there you can have a broadcast queue that delivers messages to several consumers.

I don’t even understand how a Celery broadcast queue is supposed to work at all, because RabbitMQ queues with multiple consumers are meant for load balancing. In RabbitMQ, if several consumers (i.e. a pool of consumers) are connected to the same queue, only one of them will receive and process any given message.
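To make the contrast concrete, here is a toy in-memory model of the two delivery patterns described above: several consumers sharing one queue versus a fanout exchange with one queue per consumer. It is plain Python and does not talk to RabbitMQ; the names `ToyQueue`, `ToyFanoutExchange` and `demo` are made up for illustration, and the strict round-robin dispatch is a simplification of what a real broker does.

```python
from itertools import cycle


class ToyQueue:
    """A queue that hands each message to exactly one of its consumers."""

    def __init__(self, consumers):
        self.consumers = list(consumers)
        self._next = cycle(self.consumers)  # simplified round-robin dispatch
        self.delivered = []  # (consumer, message) pairs, in delivery order

    def put(self, message):
        consumer = next(self._next)
        self.delivered.append((consumer, message))


class ToyFanoutExchange:
    """An exchange that copies every message to all queues bound to it."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, message):
        for queue in self.queues:
            queue.put(message)


def demo():
    # Pattern 1: one shared queue, three consumers -> load balancing.
    shared = ToyQueue(["w1", "w2", "w3"])
    for message in ["a", "b", "c"]:
        shared.put(message)

    # Pattern 2: fanout exchange, one private queue per consumer -> broadcast.
    exchange = ToyFanoutExchange()
    private = [ToyQueue([name]) for name in ("w1", "w2", "w3")]
    for queue in private:
        exchange.bind(queue)
    exchange.publish("a")

    return shared.delivered, [queue.delivered for queue in private]


if __name__ == "__main__":
    balanced, broadcast = demo()
    print(balanced)
    print(broadcast)
```

In the first pattern each message reaches only one worker; in the second, the single published message ends up in every worker's own queue.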

+6
3 answers

Does this help? http://celery.readthedocs.org/en/latest/userguide/routing.html#exchanges-queues-and-routing-keys

It seems that Celery's definition of a "queue" includes the exchange, so you can define a Celery queue on top of an Exchange('fanout') exchange type, and its underlying implementation will be several RabbitMQ queues.

In that case, I would guess you don't want a “broadcast” queue in your Celery configuration unless you really do want several workers to process the same task.

+1

After looking at the code (it is in the kombu.common package, not in celery) and trying it out, it works as follows:

  • You define a Broadcast 'queue' named 'foo' in your Celery configuration.
  • This creates an exchange named 'foo' and an auto_delete queue with a unique id (via uuid) and with the alias 'foo' (I don't think the alias is actually used anywhere; it is just for reference, because the real queue name is randomly generated).
  • The unique queue is bound to the 'foo' exchange.

So the class is called Broadcast, but it is really a unique queue bound to a fanout exchange. When each worker starts up, it creates its own unique queue and binds it to the fanout exchange.
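For reference, a configuration along these lines is what produces the behaviour described above. This is only a minimal sketch, assuming Celery 4+ (lowercase setting names) with kombu installed; the app name `proj`, the broker URL and the task name `tasks.reload_cache` are placeholders, not something taken from the question.

```python
from celery import Celery
from kombu.common import Broadcast

# Hypothetical app name and broker URL; adjust to your setup.
app = Celery("proj", broker="amqp://guest@localhost//")

# Declares a fanout exchange named 'foo'. Each worker that consumes from
# this entry gets its own uniquely named, auto-deleted queue bound to it.
app.conf.task_queues = (Broadcast("foo"),)

# Route a (hypothetical) task to the broadcast exchange so that every
# worker receives its own copy of the message.
app.conf.task_routes = {
    "tasks.reload_cache": {
        "queue": "foo",
        "exchange": "foo",
    },
}
```

With this in place every running worker executes `tasks.reload_cache` once per call, which is only what you want for tasks such as cache invalidation where each worker must react.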

+1

If you are using Celery 4.0.1+ and broadcast doesn’t work for you, please see https://github.com/celery/celery/pull/3934 and clokep's solution there: it restores the previous version of amqp.py, and it works for me.

+1

Source: https://habr.com/ru/post/971003/