How to limit an Akka Stream to execute and send one message only once per second?

I have an Akka stream, and I want the stream to send messages downstream about every second.

I tried two ways to solve this problem. The first was to make the producer at the start of the stream send messages only once per second, whenever a Continue message arrives at this actor.

    // When a Continue message is received in an ActorPublisher
    // do work then...
    if (totalDemand > 0) {
      import scala.concurrent.duration._
      import context.dispatcher // ExecutionContext required by the scheduler
      context.system.scheduler.scheduleOnce(1 second, self, Continue)
    }

This works for a short while, and then a flood of Continue messages appears in the ActorPublisher actor. I assume (a guess, I am not sure) that they come from downstream via back-pressure requests, since the downstream can consume quickly but the upstream is not producing at a fast rate. So this method failed.

The other approach I tried was back-pressure control: I used MaxInFlightRequestStrategy in the ActorSubscriber at the end of the stream to limit the number of messages to 1 per second. This works, but incoming messages arrive roughly three at a time rather than one at a time. It seems that back-pressure control does not immediately change the rate at which messages arrive, or that messages were already queued in the stream and waiting to be processed.

So the question is: how can I have an Akka Stream that processes only one message per second?


I found that MaxInFlightRequestStrategy is a valid way to do this, but the batch size has to be set to 1; its default batch size is 5, which is what caused the problem. It is also an over-complicated way to solve the problem, now that I have seen the answer provided here.

1 answer

You can either put your elements through a throttling flow, which will back-pressure a fast source, or use a combination of tick and zip.

The first solution looks like this:

    import akka.stream.ThrottleMode
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import scala.concurrent.duration._
    import scala.util.Random

    // An implicit materializer (or ActorSystem) must be in scope to run the stream.
    val veryFastSource =
      Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

    val throttlingFlow = Flow[Long].throttle(
      // how many elements you allow
      elements = 1,
      // in what unit of time
      per = 1.second,
      maximumBurst = 0,
      // you can also set this to Enforcing, but then your
      // stream will fail if the number of elements per second is exceeded
      mode = ThrottleMode.Shaping
    )

    veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))

The second solution would be:

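    val veryFastSource =
      Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

    // emits one tick per second, starting after an initial delay of one second
    val tickingSource = Source.tick(1.second, 1.second, 0)

    // zip pairs each element with a tick, so elements pass only as fast as ticks arrive
    veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))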
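
To check the resulting rate, a minimal self-contained sketch along the lines of the first solution might look like the following. It is not part of the original answer: the object name OnePerSecondDemo and the helper arrivalTimes are hypothetical, a maximumBurst of 1 is assumed here, and ActorMaterializer() is assumed to be available (it is deprecated in newer Akka versions, where an implicit ActorSystem alone is enough).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, for illustration only.
object OnePerSecondDemo {

  // Sends `count` elements through a throttle of 1 element per second and
  // returns the wall-clock time (in ms) at which each one passed the throttle.
  def arrivalTimes(count: Int): Seq[Long] = {
    implicit val system: ActorSystem = ActorSystem("one-per-second-demo")
    // Assumption: explicit materializer, as needed by older Akka versions.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to count)
        .throttle(1, 1.second, 1, ThrottleMode.Shaping)
        .map(_ => System.currentTimeMillis())
        .runWith(Sink.seq)
      // Block until the bounded stream completes (with some slack).
      Await.result(result, (count + 5).seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val times = arrivalTimes(4)
    // Differences between consecutive arrival times.
    val gaps = times.zip(times.tail).map { case (a, b) => b - a }
    println(s"gaps between elements (ms): $gaps")
  }
}
```

The printed gaps should each be close to one second; the exact values depend on scheduler granularity, so treat them as approximate.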

Source: https://habr.com/ru/post/1245751/

