Consumer polling with Akka, SQS and Camel

The project I'm working on requires reading messages from SQS, and I decided to use Akka to distribute the processing of these messages.

Since Camel supports SQS, and Akka has built-in Camel integration through the Consumer class, I figured it would be best to implement an endpoint and read messages that way, although I have not seen many examples of people doing this.

My problem is that I cannot poll the queue fast enough to keep it empty or nearly empty. Initially, I thought I could get one Consumer to receive messages from SQS via Camel at X messages per second, and then simply create more Consumers until messages were processed at the rate I need.

My consumer:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"

  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop() {
    println("Count for actor: " + count)
  }
}

As shown, I set delay=1 and maxMessagesPerPoll=10 to improve the message rate, but I cannot start multiple consumers on the same endpoint.

I read in the docs that "By default endpoints are assumed not to support multiple consumers", and I believe this also holds for SQS endpoints: when I spawn several consumers, only one of them actually receives messages. After the system has run for a minute, that one prints "Count for actor: x", while the others print "Count for actor: 0".

In case it is useful: with the current implementation I can read about 33 messages per second with a single consumer.

Is this the right way to read messages from an SQS queue in Akka? If so, how can I make it scale out so that the receive rate gets closer to 900 messages per second?

2 answers

Sadly Camel does not currently support concurrent message consumption in SQS.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

To solve this problem, I wrote my own actor that polls SQS for batches of messages using aws-java-sdk.

  def receive = {
    case BeginPolling => {
      // re-queue the polling message to ourselves asynchronously
      self ! BeginPolling

      // traverse the response, collecting one delete entry per message
      val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
      // getMessages returns a java.util.List; toList assumes
      // scala.collection.JavaConversions._ is in scope
      val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
      messages.toList.foreach { node =>
        deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
        //log.info("Node body: {}", node.getBody)
        filterSupervisor ! node.getBody
      }

      // delete the whole batch in one request, but only if we received anything
      if (deleteMessageList.size() > 0) {
        val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
        sqs.deleteMessageBatch(deleteMessageBatchRequest)
      }
    }
    case _ => {
      log.warning("Unknown message")
    }
  }

I am not sure this is the best implementation, and it could certainly be improved so that it does not keep issuing requests against an empty queue, but it meets my current need to poll the same queue from several actors.
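One way to avoid hammering an empty queue is SQS long polling, where the receive call waits on the server side until messages arrive or a timeout expires. The following is only a sketch, assuming aws-java-sdk v1 is on the classpath; the object name, the helper name and the queue URL are placeholders that are not part of the answer above.

```scala
import com.amazonaws.services.sqs.model.ReceiveMessageRequest

object LongPolling {
  // Builds a receive request that asks SQS to hold the call open for up to
  // 20 seconds when the queue is empty and to return at most 10 messages per
  // call. Both values are the maximums SQS accepts.
  // `queueUrl` is a placeholder for the real queue URL.
  def longPollRequest(queueUrl: String): ReceiveMessageRequest =
    new ReceiveMessageRequest(queueUrl)
      .withMaxNumberOfMessages(Int.box(10))
      .withWaitTimeSeconds(Int.box(20))
}
```

A request built this way could be passed to sqs.receiveMessage in place of receiveMessageRequest. With an empty queue, each actor would then make roughly one request every 20 seconds instead of looping as fast as it can.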

With this I get about 133 messages per second per actor from SQS.
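As a rough estimate of how many pollers the 900 messages per second target would need, here is a small sketch. It assumes throughput adds up linearly across actors, which is optimistic, and the object and function names are made up for illustration.

```scala
object PollerCount {
  // Number of pollers needed to reach `target` messages per second when each
  // poller sustains `perPoller`, assuming rates simply add up.
  def pollersNeeded(target: Int, perPoller: Int): Int = {
    require(target > 0 && perPoller > 0, "rates must be positive")
    (target + perPoller - 1) / perPoller // integer ceiling division
  }

  def main(args: Array[String]): Unit = {
    println(pollersNeeded(900, 133)) // custom polling actor: prints 7
    println(pollersNeeded(900, 33))  // single Camel consumer rate: prints 28
  }
}
```

So about 7 polling actors would be the starting point at 133 messages per second each, to be confirmed by measurement.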


Camel 2.15 supports concurrentConsumers, although I am not sure how useful that is here: I don't know whether akka-camel works with Camel 2.15, and I don't know whether it makes a difference when there is a single Consumer actor, even if there are several underlying consumers.
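For illustration, this is roughly what the endpoint URI from the question would look like with that option added. It is a sketch only: the helper is hypothetical, the value 5 is arbitrary, and whether akka-camel actually honors the option is untested here.

```scala
object SqsEndpoint {
  // Builds an aws-sqs endpoint URI like the one in the question, with the
  // concurrentConsumers option added. Keys and queue name are placeholders.
  def uri(queue: String, concurrentConsumers: Int): String = {
    val options = Seq(
      "concurrentConsumers" -> concurrentConsumers.toString,
      "maxMessagesPerPoll"  -> "10",
      "accessKey"           -> "myKey",
      "secretKey"           -> "RAW(mySecret)"
    )
    s"aws-sqs://$queue?" + options.map { case (k, v) => s"$k=$v" }.mkString("&")
  }
}
```

In the Consumer from the question this would be used as def endpointUri = SqsEndpoint.uri("my_queue", 5).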


Source: https://habr.com/ru/post/957352/

