Is connection pooling in akka-http using the Source.queue implementation thread safe?

Referring to the following implementation mentioned in:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
    val queue =
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

Is it thread safe to offer HTTP requests to the queue from multiple threads? If not, what is the best way to implement such a requirement? Perhaps using a dedicated actor?

2 answers

No, it is not thread safe. According to the API doc: "SourceQueue that current source is materialized to is for single thread usage only."

A dedicated actor will work fine, but if you can, using Source.actorRef instead of Source.queue will be easier.

In general, the downside of Source.actorRef is its lack of backpressure, but since you use OverflowStrategy.dropNew, you clearly do not expect backpressure anyway. So you can get the same behavior using Source.actorRef.
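A minimal sketch of what the Source.actorRef variant might look like, reusing the pool flow from the question. The names `ActorRefPoolExample`, `BufferSize`, `requestActor`, and `request` are hypothetical, and the two-argument `Source.actorRef(bufferSize, overflowStrategy)` signature is the one from the Akka versions contemporary with akka-http 10.0.x (it is deprecated in later releases). Sending a message to an ActorRef is safe from any thread.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

object ActorRefPoolExample {
  implicit val system: ActorSystem = ActorSystem("pool-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical buffer size; tune it for your workload.
  val BufferSize = 10

  val poolClientFlow =
    Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")

  // The stream materializes to an ActorRef instead of a SourceQueue.
  val requestActor: ActorRef =
    Source.actorRef[(HttpRequest, Promise[HttpResponse])](BufferSize, OverflowStrategy.dropNew)
      .via(poolClientFlow)
      .toMat(Sink.foreach {
        case (Success(resp), p) => p.success(resp)
        case (Failure(e), p)    => p.failure(e)
      })(Keep.left)
      .run()

  // Can be called concurrently from any thread.
  def request(req: HttpRequest): Future[HttpResponse] = {
    val promise = Promise[HttpResponse]()
    requestActor ! (req -> promise)
    promise.future
  }
}
```

One difference to keep in mind: with Source.queue, `offer` returns a result that tells you when an element was dropped, whereas an element dropped by Source.actorRef is discarded silently, so its promise is never completed. Callers may want to put a timeout on the returned future.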


As @frederic-a correctly pointed out, SourceQueue is not a thread-safe solution.

Perhaps a suitable solution would be to use MergeHub (see the docs for more details). This effectively allows you to run the graph in two steps:

  • Run the graph from your hub to your sink (this materializes to a Sink).
  • Distribute the Sink materialized in step 1 to your users. Such a Sink is designed to be shared, so this is perfectly safe.

This solution is also backpressure safe, as per the documented MergeHub behavior:

"If the consumer cannot keep up, then all of the producers are backpressured."

Sample code below:

    val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
      MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

    // on the user threads
    val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
    source.runWith(reqSink)
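For callers that only need to send one request at a time, the `???` above could be filled with a single-element source. The helper below is a sketch under that assumption; `MergeHubRequestHelper` and `singleRequest` are hypothetical names, and the shared sink is passed in as a parameter so that the snippet stands on its own.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Future, Promise}

object MergeHubRequestHelper {
  // Attaches a one-element producer to the shared MergeHub sink.
  // Each call materializes its own small stream, so it is safe to
  // call from several threads at once.
  def singleRequest(
      req: HttpRequest,
      reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]
  )(implicit mat: Materializer): Future[HttpResponse] = {
    val promise = Promise[HttpResponse]()
    Source.single(req -> promise).runWith(reqSink)
    promise.future
  }
}
```

Materializing a stream per request has some overhead, so a user that produces many requests would probably be better off attaching one long-lived source to the sink instead.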

Source: https://habr.com/ru/post/1265902/

