Spring Java DSL Integration - Aggregator Setup

I have a very simple integration flow where a RESTful request is passed to two providers using the publish-subscribe channel. The result from both RESTful services is then aggregated in one array. A sketch of the integration flow is shown below:

@Bean IntegrationFlow flow() throws Exception { return IntegrationFlows.from("inputChannel") .publishSubscribeChannel(s -> s.applySequence(true) .subscribe(f -> f .handle(Http.outboundGateway("http://provider1.com/...") .httpMethod(HttpMethod.GET) .expectedResponseType(ItemDTO[].class)) ).subscribe(f -> f .handle(Http.outboundGateway("http://provider2.com/...") .httpMethod(HttpMethod.GET) .expectedResponseType(ItemDTO[].class) ) ) ) .aggregate() .get(); } 

However, when I run my code, the resulting array contains elements returned by only one of the RESTful services. Is there any configuration step that I am missing?

UPDATE

The next version corresponds to the complete solution, taking into account the comments of Artem.

 @Bean IntegrationFlow flow() throws Exception { return IntegrationFlows.from("inputChannel-scatter") .publishSubscribeChannel(s -> s.applySequence(true) .subscribe(f -> f .handle(Http.outboundGateway("http://provider1.com/...") .httpMethod(HttpMethod.GET) .expectedResponseType(ItemDTO[].class)) .channel("inputChannel-gather")) .subscribe(f -> f .handle(Http.outboundGateway("http://provider2.com/...") .httpMethod(HttpMethod.GET) .expectedResponseType(ItemDTO[].class)) .channel("inputChannel-gather"))) .get(); } @Bean IntegrationFlow gatherFlow() { return IntegrationFlows.from("inputChannel-gather") .aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>( g.getMessages().stream() .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload())) .collect(Collectors.toList()).toArray(new ItemDTO[0])))) .get(); } 
+5
source share
1 answer

This actually does not work.

.aggregate() is the third subscriber to publishSubscribeChannel .

You need to break the flow to two of them. Like this:

  @Bean public IntegrationFlow publishSubscribeFlow() { return flow -> flow .publishSubscribeChannel(s -> s .applySequence(true) .subscribe(f -> f .handle((p, h) -> "Hello") .channel("publishSubscribeAggregateFlow.input")) .subscribe(f -> f .handle((p, h) -> "World!") .channel("publishSubscribeAggregateFlow.input")) ); } @Bean public IntegrationFlow publishSubscribeAggregateFlow() { return flow -> flow .aggregate(a -> a.outputProcessor(g -> g.getMessages() .stream() .<String>map(m -> (String) m.getPayload()) .collect(Collectors.joining(" ")))) .channel(c -> c.queue("subscriberAggregateResult")); } 

Please note the use of .channel("publishSubscribeAggregateFlow.input") from both subscribers.

Honestly, this is the point of any publish-subscribe . We need to know where to send the result of all subscribers, if we are going to fill them.

Your use case reminds me of the Scatter-Gather EIP template.

We do not yet have its implementation in DSL. Feel free to raise a GH question on this, and we will try to handle it in the upcoming version 1.2 .

UPDATE

GH issue on this: https://github.com/spring-projects/spring-integration-java-dsl/issues/75

+3
source

Source: https://habr.com/ru/post/1247516/


All Articles