I am trying to check the persistence of an ActiveMQ queue.
I have an embedded ActiveMQ server with a single consumer. This embedded server receives JMS messages from many other JVM applications.
It works well: the consumer application receives the notifications.
So I tried to test the persistence of messages. I set a (remote) breakpoint on the consumer's MessageListener so that I could enqueue a lot of messages and then crash the ActiveMQ server. When the server restarts, I would like all messages stored in the queue to be consumed, not lost.
Then I ran this test. I hit the breakpoint on the first message I sent. But for every message I try to send after that, I get the following stack trace on the producer side:
Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:612)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:604)
    at com.xxxxxxxxxxx.mobilepush.client.RealClientTest.main(RealClientTest.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1380)
    at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1466)
    at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:308)
    at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:457)
    ... 9 more
Caused by: java.io.IOException: Wire format negotiation timeout: peer did not send his wire format.
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:98)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1351)
    ... 13 more
I don't understand why my producer is blocked while my consumer is stopped at the breakpoint.
My broker uri: mobilepush.activemq.broker.transport.connector.uri=tcp://0.0.0.0:61616
The producer connects to the broker via tcp. The consumer, which is hosted in the same JVM as the broker, connects via vm://localhost.
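As a rough sketch of the two connection paths: both the consumer and the producer configurations read the same property, `mobilepush.activemq.broker.uri`, but it resolves to a different value in each JVM. The host name `broker-host` below is a placeholder assumption, not a value taken from the real setup.

```properties
# Consumer JVM (same JVM as the embedded broker): in-VM transport
mobilepush.activemq.broker.uri=vm://localhost

# Producer JVM (separate process): TCP transport to the broker's connector
# "broker-host" is a hypothetical placeholder for the broker machine
mobilepush.activemq.broker.uri=tcp://broker-host:61616
```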
My configuration is pretty simple:
SERVER:
<amq:broker useJmx="false" persistent="true">
  <amq:transportConnectors>
    <amq:transportConnector uri="${mobilepush.activemq.broker.transport.connector.uri}" />
  </amq:transportConnectors>
  <amq:persistenceAdapter>
    <amq:kahaPersistenceAdapter directory="${mobilepush.activemq.broker.queue.persistence.directory}" maxDataFileLength="100 Mb"/>
  </amq:persistenceAdapter>
</amq:broker>
CONSUMER: (the management namespace and the xebia classes are only JMX decorators)
<bean id="connectionFactory" class="fr.xebia.management.jms.SpringManagedConnectionFactory">
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory" >
      <property name="brokerURL" value="${mobilepush.activemq.broker.uri}"/>
    </bean>
  </property>
</bean>
<bean id="pushConsumer" class="com.xxxxxxxxxxxxxxx.mobilepush.messaging.jms.PushConsumer">
  <property name="jmsPushMessageConverter" ref="jmsPushMessageConverter"/>
  <property name="pushDelegate" ref="directPushDelegate"/>
</bean>
<management:executor-service id="pushConsumerExecutor" pool-size="${mobilepush.consumer.thread.min}-${mobilepush.consumer.thread.max}" keep-alive="60" />
<jms:listener-container task-executor="pushConsumerExecutor" connection-factory="connectionFactory" acknowledge="auto" container-class="fr.xebia.springframework.jms.ManagedDefaultMessageListenerContainer">
  <jms:listener destination="mobilepush.queue" ref="pushConsumer" method="onMessage" />
</jms:listener-container>
PRODUCER:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
  <property name="brokerURL" value="${mobilepush.activemq.broker.uri}"/>
</bean>
<bean id="mobilePushJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="defaultDestination" ref="mobilePushQueue"/>
  <property name="messageConverter" ref="jmsPushMessageConverter"/>
  <property name="connectionFactory">
    <bean class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory">
        <ref local="connectionFactory" />
      </property>
    </bean>
  </property>
</bean>