Spring Integration

Our application was developed using the Spring Integration Framework. A complete message flow begins with listening to Queue, for which adapters controlled by the JMS message were used, after certain channel points, that is, queue endpoints, and each endpoint are processed by Service-Activators.

We are currently at the execution stage, we are creating a request for 200 messages. Initially, we observed that messages were not executed in parallel, after some reading it turned out that adding the concurrent-consumer and max-concurrent-consumer properties to the JMS message controlled by the listener adapter would help enable multithreading. It really helped, but still somewhere between this process I still see the effect of Single Thread. Is this related to how the endpoint is defined? What is the advantage of adding queue capacity for each endpoint? Do you think that adding the queue capacity for each definition of the channel endpoint will again work in mutli-threading mode.

Project snapshot on request:

action flow

+6
source share
3 answers

It would be helpful to see the exact definition of your channels.

By default, the Spring feed consumes its message in the sender stream. In other words, it is synchronous. If you want the channel to consume messages asynchronously, you need to specify TaskExecutor. See http://static.springsource.org/spring/docs/3.0.5.RELEASE/reference/scheduling.html

0
source

Looking at the flow diagram, it looks like the flow has many single-threaded elements and can be optimized to be much more parallel with higher throughput.

To start with a message-driven channel adapter (you did not specify a configuration for this), you can configure more than 1 default consumer , and you can make it consume a reasonable number of messages per consumption cycle .

The stream that passes the message through the channel adapter, which places the message in direct channel 1, will unfortunately run the rest of the stream, since there is no buffering elsewhere, so when your message is placed in β€œDirect channel 1”, it will immediately calls the router in the same thread, then calls the service activator and the Mail adapter or the JMS Outbound adapter in the same thread. The change here may be to enter the queue channel instead of direct channel 1 , so the thread that consumes the message simply puts the messages on the queue channel, then runs with it.

Besides direct channel 1 (changed to queue channel 1), I think it can be single-threaded based on how fast or slower your stream is , if they say that the mail adapter is slow, and direct channel 4 can make the queue channel also , same with Direct Channel 5

Can you see if these changes were highlighted in bold with improved flow

0
source

To improve performance, I can suggest using an execution channel with a task executor, in which you can control the number of thread pool pools. Thus, you have a situation where the message comes in the jms queue, the consumer takes the message and processes the stream in a separate thread. remember that in this configuration, multi-threaded work is performed by the taskexecutor channel. taht will retrieve the message in a separate thread, for this reason you think well which multithreading class you want.

For the queue message channel, you really need a poller that polled the channel to perform regeneration, the queue capacity is the capacity of the atomic queue behind the scenes.

you can configure the executor channel this way in xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:si="http://www.springframework.org/schema/integration" xmlns:tx="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <tx:executor id="taskExecutor" pool-size="10" queue-capacity="10"/> <si:channel id="ch" > <si:dispatcher task-executor="taskExecutor"/> </si:channel> </beans> 

or so in java-dsl

  @Bean public IntegrationFlow storeBookPageByPage(ConnectionFactory connectionFactory, @Qualifier("createBookQueue") Destination createBookQueue, @Qualifier("createBookResultQueue") Destination createBookResultQueue, PdfBookMasterRepository pdfBookMasterRepository, BookRepository bookRepository){ String tempFilePathBaseDir = environment.getProperty("bookService.storeBookPageByPage.tempFilePathBaseDir"); return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory) .destination(createBookQueue) .errorChannel(storeBookPageByPageErrorChannel())) .channel(channels -> channels.executor(Executors.newScheduledThreadPool(5))) ..... } 
0
source

Source: https://habr.com/ru/post/918912/


All Articles