Spring Cloud Stream RabbitMQ

I am trying to understand why I would like to use Spring Cloud Stream with RabbitMQ. I looked through the RabbitMQ Spring 4 tutorial ( https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html ), which basically I want to do. It creates a direct exchange with two attached queues and, depending on the routing key, the message is routed to Q1 or Q2.

The whole process is pretty straightforward, if you look at the tutorial, you create all the parts, tie them together, and you're ready to go.

I was wondering what benefit I will get in using Sing Cloud Stream, and if it's even a use case. It was easy to create a simple exchange and even determine the destination, and the group was straightforward with the flow. So I thought, why not go ahead and try to process the tutorial with the stream.

I saw that Stream has BinderAwareChannelResolverone that seems to be doing the same thing. But I'm struggling to put everything together to achieve the same as in the RabbitMQ Spring tutorial. I'm not sure that this is a dependency problem, but I seem to misunderstand something fundamentally here, I thought something like:

spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'

due to the trick.

- , , 2 , , https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html?

, , , . build.gradle, ( - , )

application.properties:

spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type

Sources.class:

public interface Sources {

    String OUTPUT = "output";

    @Output(Sources.OUTPUT)
    MessageChannel output();
}

StatusController.class:

/**
 * Status endpoint for the health-check service.
 */
@RestController
@EnableBinding(Sources.class)
public class StatusController {

    private int index;

    private int count;

    private final String[] keys = {"orange", "black", "green"};

    private Sources sources;

    private StatusService status;

    @Autowired
    public StatusController(Sources sources, StatusService status) {
        this.sources = sources;
        this.status = status;
    }

    /**
     * Service available, service returns "OK"'.
     * @return The Status of the service.
     */
    @RequestMapping("/status")
    public String status() {
        String status = this.status.getStatus();

        StringBuilder builder = new StringBuilder("Hello to ");
        if (++this.index == 3) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ');
        builder.append(Integer.toString(++this.count));
        String payload = builder.toString();
        log.info(payload);

        // add kv pair - routingkeyexpression (which matches 'type') will then evaluate
        // and add the value as routing key
        Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key));
        sources.output().send(msg);

        // return rest call
        return status;
    }
}

, :

spring.cloud.stream.bindings.input.destination=tut.direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange
spring.cloud.stream.bindings.inputer.destination=tut.direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black

Sinks.class:

public interface Sinks {

    String INPUT = "input";

    @Input(Sinks.INPUT)
    SubscribableChannel input();

    String INPUTER = "inputer";

    @Input(Sinks.INPUTER)
    SubscribableChannel inputer();
}

ReceiveStatus.class: :

@EnableBinding(Sinks.class)
public class ReceiveStatus {
    @StreamListener(Sinks.INPUT)
    public void receiveStatusOrange(String msg) {
       log.info("I received a message. It was orange number: {}", msg);
    }

    @StreamListener(Sinks.INPUTER)
    public void receiveStatusBlack(String msg) {
        log.info("I received a message. It was black number: {}", msg);
    }
}
+4
1

Spring , , ( @EnableBinding) Spring Cloud Stream Binder (Kafka, RabbitMQ, JMS binders ..).,). -, Spring Cloud Stream Spring AMQP RabbitMQ.

BinderAwareChannelResolver , , .

, 2 bindingRoutingKey, , ( -, ) ( ). , group . group (, ).

: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57, routing-key-expression. , this .

+3

Source: https://habr.com/ru/post/1683791/


All Articles