I am trying to understand why I would like to use Spring Cloud Stream with RabbitMQ. I looked through the RabbitMQ Spring 4 tutorial ( https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html ), which basically I want to do. It creates a direct exchange with two attached queues and, depending on the routing key, the message is routed to Q1 or Q2.
The whole process is pretty straightforward, if you look at the tutorial, you create all the parts, tie them together, and you're ready to go.
I was wondering what benefit I will get in using Sing Cloud Stream, and if it's even a use case. It was easy to create a simple exchange and even determine the destination, and the group was straightforward with the flow. So I thought, why not go ahead and try to process the tutorial with the stream.
I saw that Stream has BinderAwareChannelResolverone that seems to be doing the same thing. But I'm struggling to put everything together to achieve the same as in the RabbitMQ Spring tutorial. I'm not sure that this is a dependency problem, but I seem to misunderstand something fundamentally here, I thought something like:
spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
due to the trick.
- , , 2 , , https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html?
, , , . build.gradle, ( - , )
application.properties:
spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Sources.class:
public interface Sources {
String OUTPUT = "output";
@Output(Sources.OUTPUT)
MessageChannel output();
}
StatusController.class:
@RestController
@EnableBinding(Sources.class)
public class StatusController {
private int index;
private int count;
private final String[] keys = {"orange", "black", "green"};
private Sources sources;
private StatusService status;
@Autowired
public StatusController(Sources sources, StatusService status) {
this.sources = sources;
this.status = status;
}
@RequestMapping("/status")
public String status() {
String status = this.status.getStatus();
StringBuilder builder = new StringBuilder("Hello to ");
if (++this.index == 3) {
this.index = 0;
}
String key = keys[this.index];
builder.append(key).append(' ');
builder.append(Integer.toString(++this.count));
String payload = builder.toString();
log.info(payload);
Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key));
sources.output().send(msg);
return status;
}
}
, :
spring.cloud.stream.bindings.input.destination=tut.direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange
spring.cloud.stream.bindings.inputer.destination=tut.direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black
Sinks.class:
public interface Sinks {
String INPUT = "input";
@Input(Sinks.INPUT)
SubscribableChannel input();
String INPUTER = "inputer";
@Input(Sinks.INPUTER)
SubscribableChannel inputer();
}
ReceiveStatus.class: :
@EnableBinding(Sinks.class)
public class ReceiveStatus {
@StreamListener(Sinks.INPUT)
public void receiveStatusOrange(String msg) {
log.info("I received a message. It was orange number: {}", msg);
}
@StreamListener(Sinks.INPUTER)
public void receiveStatusBlack(String msg) {
log.info("I received a message. It was black number: {}", msg);
}
}