The project I'm working on requires reading messages from SQS, and I decided to use Akka to distribute the processing of these messages.
Since SQS is Camel support, and there is functionality built-in for use in Akka in the Consumer class, I realized that it would be better to implement the endpoint and read messages this way, although I have not seen many examples of people doing this.
My problem is that I cannot interrogate the queue fast enough so that my queue is empty or almost empty. Initially, I thought that I could get Consumer to receive messages via Camel from SQS at X / s speed. From there, I could simply create more Consumers to get the speed with which I need processed messages.
My consumer:
import akka.camel.{CamelMessage, Consumer} import akka.actor.{ActorRef, ActorPath} class MyConsumer() extends Consumer { def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" var count = 0 def receive = { case msg: CamelMessage => { count += 1 } case _ => { println("Got something else") } } override def postStop(){ println("Count for actor: " + count) } }
As shown, I set delay=1 as well as &maxMessagesPerPoll=10 to improve message speed, but I cannot call multiple consumers with the same endpoint.
I read in the docs that By default endpoints are assumed not to support multiple consumers. , and I believe that this is also true for SQS endpoints, as spawning of several consumers will give me only one consumer, which after starting the system for a minute displays the message Count for actor: x instead of the others that display Count for actor: 0 .
If at all useful; I can read about 33 messages per second with this current implementation for a single consumer.
Is it right to read messages from the SQS queue in Akka? If so, how can I do this, can I make it scale outward to increase the speed of receiving messages closer to 900 messages per second?