Actually, it does not work that way.
In your flow, .aggregate() is just the third subscriber to the publishSubscribeChannel.
You need to split the flow into two. Like this:

    @Bean
    public IntegrationFlow publishSubscribeFlow() {
        return flow -> flow
                .publishSubscribeChannel(s -> s
                        .applySequence(true)
                        .subscribe(f -> f
                                .handle((p, h) -> "Hello")
                                .channel("publishSubscribeAggregateFlow.input"))
                        .subscribe(f -> f
                                .handle((p, h) -> "World!")
                                .channel("publishSubscribeAggregateFlow.input"))
                );
    }

    @Bean
    public IntegrationFlow publishSubscribeAggregateFlow() {
        return flow -> flow
                .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                        .stream()
                        .<String>map(m -> (String) m.getPayload())
                        .collect(Collectors.joining(" "))))
                .channel(c -> c.queue("subscriberAggregateResult"));
    }

Please note that both subscribers send to .channel("publishSubscribeAggregateFlow.input"), which is the input channel of the second flow.
Honestly, this is the point of any publish-subscribe. We need to know where to send the results of all the subscribers if we are going to aggregate them.
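To make the idea concrete, here is a minimal plain-Java sketch with no Spring involved. Every name in it (SequenceAggregatorSketch, Reply, send) is hypothetical and is not part of the Spring Integration API. It assumes a single message group, whereas a real aggregator correlates groups by a correlation key, and it orders replies by sequence number, which is a choice of this sketch. It only shows why every subscriber has to send its reply to one shared place that knows how many replies to expect, which is roughly what applySequence(true) makes possible.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical, Spring-free illustration; not the Spring Integration aggregator.
final class SequenceAggregatorSketch {

    // A subscriber's reply, tagged with its position and the expected group size.
    static final class Reply {
        final int sequenceNumber;
        final int sequenceSize;
        final String payload;

        Reply(int sequenceNumber, int sequenceSize, String payload) {
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // Assumption: one group only; a real aggregator keeps one group per correlation key.
    private final List<Reply> group = new ArrayList<>();

    // The single shared entry point that every subscriber sends its reply to.
    // Returns the joined payloads once all expected replies have arrived.
    Optional<String> send(Reply reply) {
        group.add(reply);
        if (group.size() < reply.sequenceSize) {
            return Optional.empty(); // group incomplete, keep waiting
        }
        String joined = group.stream()
                .sorted(Comparator.comparingInt((Reply r) -> r.sequenceNumber))
                .map(r -> r.payload)
                .collect(Collectors.joining(" "));
        group.clear();
        return Optional.of(joined);
    }

    public static void main(String[] args) {
        SequenceAggregatorSketch aggregator = new SequenceAggregatorSketch();
        aggregator.send(new Reply(1, 2, "Hello"));           // first subscriber
        aggregator.send(new Reply(2, 2, "World!"))           // second subscriber completes the group
                .ifPresent(System.out::println);             // prints: Hello World!
    }
}
```

If each subscriber kept its reply to itself, nothing would ever see both replies, and the group could never be released.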
Your use case reminds me of the Scatter-Gather EIP pattern.
We do not yet have an implementation of it in the DSL. Feel free to raise a GH issue for it, and we will try to address it in the upcoming version 1.2.
UPDATE
GH issue on this: https://github.com/spring-projects/spring-integration-java-dsl/issues/75