Proper shutdown of ActiveMQ and Spring DefaultMessageListenerContainer

Our system will not be disabled when the Stop command is issued from Tomcat Manager. I decided it was related to ActiveMQ / Spring. I even figured out how to make it shut down, however my solution is to hack (at least I hope this is not the “right” way to do this). I would like to know the correct way to disable ActiveMQ so that I can remove my hack.

I inherited this component, and I have no information about why some architectural decisions were made, after a lot of digging, I think I understand his thoughts, but I could miss something. In other words, the real problem may be how we try to use ActiveMQ / Spring.

We run ServletContainer (Tomcat 6/7) and use ActiveMQ 5.9.1 and Spring 3.0.0. Several instances of our application can run in a "group", with each instance running on its own server. ActiveMQ is used to facilitate communication between multiple instances. Each instance has its own built-in broker and its own set of queues. Each queue on each instance has exactly 1 org.springframework.jms.listener.DefaultMessageListenerContainer, listening to it, so 5 queues = 5 DefaultMessageListenerContainers, for example.

Our system shutdown until we fixed the error by adding queuePrefetch = "0" to ConnectionFactory. At first I suggested that this change was incorrect, but now that I understand the situation, I am sure that we should not use the prefetch function.

I created a test application to replicate the problem. Please note that the information below does not mention message producers. This is because I can replicate the problem without sending / processing a single message. Simply creating a broker, ConnectionFactory, Queues, and Listeners at boot time is enough to prevent the system from stopping normally.

Here is my sample configuration from my Spring XML. I will be happy to provide the whole project if someone wants:

<amq:broker persistent="false" id="mybroker"> 
 <amq:transportConnectors> 
  <amq:transportConnector uri="tcp://0.0.0.0:61616"/> 
 </amq:transportConnectors> 
</amq:broker> 

<amq:connectionFactory id="ConnectionFactory" brokerURL="vm://localhost?broker.persistent=false" > 
 <amq:prefetchPolicy> 
  <amq:prefetchPolicy queuePrefetch="0"/> 
 </amq:prefetchPolicy> 
</amq:connectionFactory> 

<amq:queue id="lookup.mdb.queue.cat" physicalName="DogQueue"/> 
<amq:queue id="lookup.mdb.queue.dog" physicalName="CatQueue"/> 
<amq:queue id="lookup.mdb.queue.fish" physicalName="FishQueue"/> 

<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" abstract="true"> 
 <property name="connectionFactory" ref="ConnectionFactory"/> 
</bean> 

<bean parent="messageListener" id="cat"> 
 <property name="destination" ref="lookup.mdb.queue.dog"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

<bean parent="messageListener" id="dog"> 
 <property name="destination" ref="lookup.mdb.queue.cat"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

<bean parent="messageListener" id="fish"> 
 <property name="destination" ref="lookup.mdb.queue.fish"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

ServletContextListener . , , DefaultMessageListenerContainers. , , , . , .

!

UPDATE , , . amq: transportConnector uri tcp://0.0.0.0: 61616? Transport.daemon = true

  <amq:broker persistent="false" id="mybroker" brokerName="localhost">
   <amq:transportConnectors>
    <amq:transportConnector uri="tcp://0.0.0.0:61616?daemon=true"/>
   </amq:transportConnectors>
  </amq:broker>

  <amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost" >
   <amq:prefetchPolicy>
    <amq:prefetchPolicy queuePrefetch="0"/>
   </amq:prefetchPolicy>
  </amq:connectionFactory>

- brokerUrl amq: connectionFactory, , , , VMTransportFactory. , .

+4
2

, - , , ListenerContainer ActiveMQ.

ActiveMQConnection, ThreadPoolExecutor . , -, .

https://issues.apache.org/jira/browse/AMQ-796

executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
        //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
        //thread.setDaemon(true);
        return thread;
    }
});
0

Source: https://habr.com/ru/post/1543318/


All Articles