Moving messages between RabbitMQ queues

I would like to be able to move messages between queues (manually) in RabbitMQ.

For instance:

first-queue has messages ['a','b','c','d','e','f']
second-queue has messages ['x','y']

I want to be able to move, for example, the message 'a' from the first queue to the second queue. It can be a manual operation. Both queues are on the same broker, and I do not want to send the messages through any exchange. Is there any way to do this? I have been playing with rabbitmqctl but cannot make it work. I am open to any other tools that would allow me to accomplish this. Eventually, I would like to have some sort of message selector (for example, move all messages with some header field = X from the first queue to the second).

I'm still new to RabbitMQ and AMQP, and I could not find documentation on how to do this (if it is possible at all).

Thanks.

+4
3 answers

@Dax - I just answered the same question here: Is it possible to move / merge messages between RabbitMQ queues?

I wrote a long description there. To avoid duplicate content, I do not want to copy / paste it here.

It looks like what you're looking for is the RabbitMQ shovel plugin.

It ships with the core distribution, so you only need to enable it:

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

In the Admin section of the graphical interface, you will find a simple interface for creating shovels.
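For anyone who prefers to script this rather than click through the Admin page, here is a minimal sketch in Java that creates a one-off shovel through the management plugin's HTTP API. It assumes the management plugin is reachable at a base URL such as http://localhost:15672 and that the user may set parameters; the class name ShovelDefinition, the method names and the shovel name are hypothetical. The JSON keys are the dynamic shovel settings as I understand them from the shovel documentation, so check them against your broker version. Note that a shovel moves every message in the source queue; it does not pick out a single message.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: builds and submits a dynamic shovel definition.
final class ShovelDefinition {

    // JSON body for a shovel that drains srcQueue into destQueue on the same broker.
    // "src-delete-after": "queue-length" asks the shovel to remove itself once it has
    // moved the messages that were in the source queue when it started.
    // Queue names are inserted as-is, so they must not contain quotes or backslashes.
    public static String shovelJson(String srcQueue, String destQueue) {
        return "{\"value\":{"
                + "\"src-protocol\":\"amqp091\","
                + "\"src-uri\":\"amqp://\","
                + "\"src-queue\":\"" + srcQueue + "\","
                + "\"dest-protocol\":\"amqp091\","
                + "\"dest-uri\":\"amqp://\","
                + "\"dest-queue\":\"" + destQueue + "\","
                + "\"src-delete-after\":\"queue-length\""
                + "}}";
    }

    // URL of the shovel parameter; the vhost must be percent-encoded ("/" becomes "%2F").
    public static String parameterUrl(String baseUrl, String vhost, String shovelName) {
        String encodedVhost = URLEncoder.encode(vhost, StandardCharsets.UTF_8);
        return baseUrl + "/api/parameters/shovel/" + encodedVhost + "/" + shovelName;
    }

    // Sends the definition with HTTP basic auth and returns the HTTP status code.
    public static int createShovel(String baseUrl, String vhost, String shovelName,
                                   String user, String password,
                                   String srcQueue, String destQueue)
            throws IOException, InterruptedException {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        HttpRequest request = HttpRequest.newBuilder(URI.create(parameterUrl(baseUrl, vhost, shovelName)))
                .header("Content-Type", "application/json")
                .header("Authorization", "Basic " + credentials)
                .PUT(HttpRequest.BodyPublishers.ofString(shovelJson(srcQueue, destQueue)))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

A call such as ShovelDefinition.createShovel("http://localhost:15672", "/", "move-first-to-second", user, password, "first-queue", "second-queue") would then request the move; the host, port and shovel name here are placeholders.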

See my other answer for more details!

+6


Java code for this:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public void moveMessages(
        final String sourceQueueName,
        final String targetQueueName,
        final String rabbitmqHost,
        final String rabbitmqUsername,
        final String rabbitmqPassword,
        final String rabbitmqVirtualHost
) throws IOException, TimeoutException {

    // Configure the connection to the broker
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(rabbitmqHost);
    factory.setUsername(rabbitmqUsername);
    factory.setPassword(rabbitmqPassword);
    factory.setVirtualHost(rabbitmqVirtualHost);

    // try-with-resources closes the connection (and its channels) on exit
    try (Connection connection = factory.newConnection()) {

        // Initialize the consuming and publishing channels
        Channel consumingChannel = connection.createChannel();
        Channel publishingChannel = connection.createChannel();

        // Put the publishing channel into transactional mode once, before the loop
        publishingChannel.txSelect();

        while (true) {
            // Get the first message in the source queue (auto ack = false)
            GetResponse response = consumingChannel.basicGet(sourceQueueName, false);

            // A null response means the source queue is empty
            if (response == null) {
                return;
            }

            AMQP.BasicProperties properties = response.getProps();

            // Publish the message to the target queue via the default exchange
            publishingChannel.basicPublish("", targetQueueName, properties, response.getBody());
            publishingChannel.txCommit();

            // Acknowledge the message in the source queue only after the publish is committed
            consumingChannel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        }
    }
}
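The question also asks for a message selector (move only the messages whose header field = X). A rough sketch of that selection logic follows, written against in-memory queues so it runs without a broker. The SelectiveMove and Message classes are hypothetical stand-ins, not part of the RabbitMQ client. With the real client, the held-back messages would correspond to deliveries fetched with basicGet(queue, false) that are left unacknowledged until the loop ends and are then returned with basicNack(deliveryTag, false, true). Matching messages would be published and then acknowledged, as in the loop above. Header values are compared through toString() because, as far as I know, the Java client delivers string headers as LongString objects rather than String.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of "move only the messages that match a header".
final class SelectiveMove {

    // Stand-in for a delivered message: its headers plus its body.
    static final class Message {
        final Map<String, Object> headers;
        final String body;

        Message(Map<String, Object> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // True when the message carries the header and its text form equals the expected value.
    public static boolean matches(Message message, String key, String expected) {
        Object value = message.headers == null ? null : message.headers.get(key);
        return value != null && expected.equals(value.toString());
    }

    // Drains the source once: matching messages go to the target, the rest are held
    // back and put back in their original order at the end. Returns the number moved.
    public static int moveMatching(Deque<Message> source, Deque<Message> target,
                                   String key, String expected) {
        List<Message> held = new ArrayList<>();
        int moved = 0;
        Message message;
        while ((message = source.pollFirst()) != null) {
            if (matches(message, key, expected)) {
                target.addLast(message);   // real client: basicPublish, then basicAck
                moved++;
            } else {
                held.add(message);         // real client: leave unacknowledged for now
            }
        }
        source.addAll(held);               // real client: basicNack with requeue = true
        return moved;
    }
}
```

Holding the non-matching messages until the end matters: if each one were requeued immediately, the next basicGet could return the same message again and the loop might never finish.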
+2

Source: https://habr.com/ru/post/1533408/

