I have a Spring JMS application that uses ActiveMQ 5.10, and I am running a simple concurrency test. I use the current version of Spring Boot, with annotations to configure the JmsListener and the message producers.
The message producer simply puts messages on the queue as fast as possible. The message listener takes messages off the queue but sleeps for 1 second after receiving each one, simulating work that the listener would need to do after receiving a message.
I have the JmsListener set to 100-1000 concurrent threads. If I start the message producer and the consumer at the same time (each in its own JVM), the consumer never goes above the minimum configured number of threads, even though the maximum is set to 1000.
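For reference, my understanding is that a concurrency value of the form "min-max" is read as a lower and an upper bound on the number of concurrent consumers. The following is only a minimal plain-Java sketch of that interpretation, not Spring's actual code; the class name ConcurrencyRange and the method parse are hypothetical, and treating a single number as "upper bound with a lower bound of 1" is an assumption on my part.

```java
/** Hypothetical helper: shows how a "min-max" concurrency string can be read. */
class ConcurrencyRange {
    final int min;
    final int max;

    ConcurrencyRange(int min, int max) {
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("expected 1 <= min <= max, got " + min + "-" + max);
        }
        this.min = min;
        this.max = max;
    }

    /** Parses "lower-upper" (for example "100-1000") or a single number. */
    static ConcurrencyRange parse(String concurrency) {
        String value = concurrency.trim();
        int dash = value.indexOf('-');
        if (dash != -1) {
            int lower = Integer.parseInt(value.substring(0, dash).trim());
            int upper = Integer.parseInt(value.substring(dash + 1).trim());
            return new ConcurrencyRange(lower, upper);
        }
        // Assumption: a single number is an upper bound, with a lower bound of 1.
        return new ConcurrencyRange(1, Integer.parseInt(value));
    }

    public static void main(String[] args) {
        ConcurrencyRange range = parse("100-1000");
        System.out.println("min=" + range.min + ", max=" + range.max);
    }
}
```

With the value from my listener, the lower bound is 100 and the upper bound is 1000, which matches the 100 threads I see when the consumer never scales up.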
If I let the producer start first and put a few thousand messages on the queue, and then start 1 or more consumer instances, the consumer raises its thread count steadily, starting at 100 and then adding about 20 threads at a time, until it reaches a state where about 20-30 messages are in flight in the queue. It never catches up with the producer: there are always messages in the queue, even though the consumer is nowhere near its configured maximum concurrency.
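To put rough numbers on this: because the listener sleeps for 1 second per message, one thread can handle at most about one message per second, so N threads can handle at most about N messages per second. The sketch below works through that arithmetic. The class name ConsumerSizing is hypothetical, and the producer rate of 500 messages per second is a made-up figure for illustration; I have not measured the real rate.

```java
/** Hypothetical back-of-the-envelope sizing for the test described above. */
class ConsumerSizing {

    /** Upper bound on messages per second that the given number of threads can process. */
    static double maxThroughput(int threads, long processingMillis) {
        return threads * 1000.0 / processingMillis;
    }

    /** Minimum number of threads needed to keep up with the given arrival rate. */
    static int threadsNeeded(double messagesPerSecond, long processingMillis) {
        return (int) Math.ceil(messagesPerSecond * processingMillis / 1000.0);
    }

    public static void main(String[] args) {
        long processingMillis = 1000;       // the Thread.sleep(1000) in the listener
        double assumedProducerRate = 500.0; // hypothetical, not a measured value

        System.out.println("Max msgs/s with 100 threads: "
                + maxThroughput(100, processingMillis));
        System.out.println("Threads needed for the assumed rate: "
                + threadsNeeded(assumedProducerRate, processingMillis));
    }
}
```

Under these assumptions, the minimum of 100 threads could not keep up with such a producer, which is why I expected the consumer to grow well beyond 100.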
Why doesn't the message consumer spin up a burst of additional threads to drain the queue, instead of letting 20-30 messages sit in it? Isn't there a way for the consumer to keep adding threads faster so that it catches up with the messages in the queue?
Here are the relevant parts of the code.
Message producer
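import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class ClientServiceImpl implements ClientService {

    private static final String QUEUE = "message.test.queue";

    @Autowired
    private JmsTemplate jmsTemplate;

    // Converts the payload to a JMS message and sends it to the test queue.
    @Override
    public void submitMessage(ImportantMessage importantMessage) {
        System.out.println("*** Sending " + importantMessage);
        jmsTemplate.convertAndSend(QUEUE, importantMessage);
    }
}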
Message consumer
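import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AmqConsumerApplication.class, args);
    }

    // Broker URL, read from the JMSHost property.
    @Value("${JMSHost}")
    private String JMS_BROKER_URL;

    // Note: Spring does not inject static fields, so this stays null; it is not used here.
    @Autowired
    static Command command;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(JMS_BROKER_URL);
        factory.setTrustAllPackages(true);
        factory.setOptimizeAcknowledge(true);
        factory.setAlwaysSessionAsync(false);
        return factory;
    }
}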
The listener is set up as follows:
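import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TransformationListener {

    // ActiveMQ destination option: prefetch at most 10 messages per consumer.
    private static final String QUEUE = "message.test.queue?consumer.prefetchSize=10";

    @JmsListener(destination = QUEUE, concurrency = "100-1000")
    public void handleRequest(ImportantMessage importantMessage) {
        System.out.println("*** Received message: " + importantMessage + " on thread " + Thread.currentThread().getId());
        try {
            Thread.sleep(1000); // simulate 1 second of work per message
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}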