If you can take some time to wait after all your messages have been delivered, you can use a timer and call consumer.poll (0), which immediately returns with available messages. After you consume the message, you set the timer again with the same acceptable delay, say 100 ms.
When throughput is low, batches will be small, and this delay will be more likely to interfere. However, since the script is asynchronous, the delay is also not very important. You never know exactly when a message arrives anyway.
When throughput is very high, lots will be large. For the new Kafka consumer, the default value of fetch.max.bytes is 52428800. The additional delay will be relatively small compared to the time taken to process a large batch of messages.
You can wrap this with a small component to which you provide a function that matches your current MBean handler.
source share