How to get AmazonSQSBufferedAsyncClient to flush messages?

I am using the AWS SDK for Java, with the buffered async SQS client to batch requests and reduce costs.

When my application shuts down, I want to make sure that there are no pending messages in the buffer, but there is no .flush() method that I can see on the client.

Does AmazonSQSBufferedAsyncClient.shutdown() flush my messages when called? I looked at the source code and it is unclear. The method calls shutdown() on every QueueBuffer it holds, but QueueBuffer.shutdown() reads:

    public void shutdown() {
        // send buffer does not require shutdown, only
        // shut down receive buffer
        receiveBuffer.shutdown();
    }

In addition, the documentation for .shutdown() says:

Shuts down this client object, releasing any resources that might be held open. This is an optional method, and callers are not expected to call it, but can if they want to explicitly release any open resources. Once a client has been shut down, it should not be used to make any more requests.

For this application, I need to make sure that no messages are lost while they sit in the buffer. Do I need to handle this manually by using a regular AmazonSQSClient instead of the buffered/async one?

2 answers

As of SDK version 1.11.37, QueueBufferConfig has a configuration parameter for exactly this purpose.

    AmazonSQSBufferedAsyncClient bufClient = new AmazonSQSBufferedAsyncClient(
            realAsyncClient,
            new QueueBufferConfig()
                    .withFlushOnShutdown(true));
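
To make the effect of such a flag concrete, here is a toy model in plain Java. It is only a sketch of the idea, not the SDK's implementation: ToySendBuffer, its methods and the demo class are hypothetical names, and the "no flush" branch models the worst case the question worries about, where buffered messages never leave the process.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a send buffer; hypothetical, NOT the SDK's implementation. */
class ToySendBuffer {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();
    private final boolean flushOnShutdown;

    ToySendBuffer(boolean flushOnShutdown) {
        this.flushOnShutdown = flushOnShutdown;
    }

    /** Queues a message; nothing is delivered until flush() runs. */
    void send(String body) {
        pending.add(body);
    }

    /** Delivers everything that is still buffered. */
    void flush() {
        delivered.addAll(pending);
        pending.clear();
    }

    /** Flushes first only when the flag is set; anything still pending is dropped. */
    void shutdown() {
        if (flushOnShutdown) {
            flush();
        }
        pending.clear();
    }

    List<String> delivered() {
        return delivered;
    }
}

class FlushOnShutdownDemo {
    public static void main(String[] args) {
        ToySendBuffer lossy = new ToySendBuffer(false);
        lossy.send("a");
        lossy.shutdown();

        ToySendBuffer safe = new ToySendBuffer(true);
        safe.send("a");
        safe.shutdown();

        // prints "0 vs 1"
        System.out.println(lossy.delivered().size() + " vs " + safe.delivered().size());
    }
}
```

With the real client, the equivalent of safe.shutdown() would be calling shutdown() on a client built with withFlushOnShutdown(true), for example from your application's own shutdown path.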

There is a method that explicitly performs a flush, but it is not accessible, and in fact I could not find any call to it in Amazon's code. Something seems to be missing.

When you call shutdown on an asynchronous client, it executes the following code:

    public void shutdown() {
        for (QueueBuffer buffer : buffers.values()) {
            buffer.shutdown();
        }
        realSQS.shutdown();
    }

And QueueBuffer#shutdown() looks like this:

    /**
     * Shuts down the queue buffer. Once this method has been called, the
     * queue buffer is not operational and all subsequent calls to it may fail
     */
    public void shutdown() {
        // send buffer does not require shutdown, only
        // shut down receive buffer
        receiveBuffer.shutdown();
    }

So it looks like they intentionally do not call sendBuffer.shutdown(), which is the method that flushes every message in the buffer that has not yet been sent.

Have you actually found a case where you shut down the SQS client and lost messages? They seem to be aware of this, so it should not happen. If you want to be sure, though, you can call the flush method via reflection. It is really nasty, but it will satisfy your needs.

    // Uses Spring's org.springframework.util.ReflectionUtils and java.lang.reflect.Field
    AmazonSQSBufferedAsyncClient asyncSqsClient = <your initialization code of the client>;

    // Read the client's private "buffers" map
    Field buffersField = ReflectionUtils.findField(AmazonSQSBufferedAsyncClient.class, "buffers");
    ReflectionUtils.makeAccessible(buffersField);
    LinkedHashMap<String, Object> buffers =
            (LinkedHashMap<String, Object>) ReflectionUtils.getField(buffersField, asyncSqsClient);

    // Look up the private "sendBuffer" field once; it must be made accessible before it is read
    Class<?> clazz = Class.forName("com.amazonaws.services.sqs.buffered.QueueBuffer");
    Field sendBufferField = ReflectionUtils.findField(clazz, "sendBuffer");
    ReflectionUtils.makeAccessible(sendBufferField);

    for (Object buffer : buffers.values()) {
        SendQueueBuffer sendQueueBuffer =
                (SendQueueBuffer) ReflectionUtils.getField(sendBufferField, buffer);
        sendQueueBuffer.flush(); // finally
    }

Something like this should work, I think. Let me know!
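
If Spring is not on your classpath, the same trick can be done with plain java.lang.reflect. The sketch below shows only the technique: FakeBufferedClient, FakeQueueBuffer and FakeSendBuffer are hypothetical stand-ins that mimic the private "buffers" and "sendBuffer" fields described above, so that the example runs without the AWS SDK. Against the real classes you would look the fields up on the SDK types instead, and I have not verified that part.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the send buffer; counts how often flush() ran. */
class FakeSendBuffer {
    int flushCount = 0;

    void flush() {
        flushCount++;
    }
}

/** Hypothetical stand-in holding a private sendBuffer field. */
class FakeQueueBuffer {
    private final FakeSendBuffer sendBuffer = new FakeSendBuffer();
}

/** Hypothetical stand-in holding a private map of queue buffers. */
class FakeBufferedClient {
    private final Map<String, FakeQueueBuffer> buffers = new LinkedHashMap<>();

    FakeBufferedClient() {
        buffers.put("queue-url", new FakeQueueBuffer());
    }
}

class ReflectiveFlush {
    /** Reads a private field declared on the target's own class, bypassing access checks. */
    static Object readPrivate(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }

    /** Flushes every send buffer held by the client and returns how many were flushed. */
    static int flushAll(Object client) {
        Map<?, ?> buffers = (Map<?, ?>) readPrivate(client, "buffers");
        int flushed = 0;
        for (Object queueBuffer : buffers.values()) {
            FakeSendBuffer sendBuffer = (FakeSendBuffer) readPrivate(queueBuffer, "sendBuffer");
            sendBuffer.flush();
            flushed++;
        }
        return flushed;
    }

    public static void main(String[] args) {
        // prints "1": the stand-in client holds exactly one queue buffer
        System.out.println(flushAll(new FakeBufferedClient()));
    }
}
```

Either way, this depends on private field names that can change between SDK releases, so treat it as a last resort.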


Source: https://habr.com/ru/post/970960/
