As @frederic-a correctly pointed out, SourceQueue is not a thread-safe solution.
A better fit is probably MergeHub (see docs for more details). It effectively lets you run the graph in two steps:
- run the graph from your hub to your sink (this materializes a Sink).
- distribute the Sink materialized in step 1 to your users.
Sinks are designed to be shared, so handing the same one to several users is completely safe.
This solution is also safe with respect to backpressure, because of how MergeHub behaves:
if the consumer cannot keep up, all producers are backpressured.
Sample code below:
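    import akka.NotUsed
    import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
    import akka.stream.scaladsl.{Keep, MergeHub, Sink}
    import scala.concurrent.Promise
    import scala.util.{Failure, Success}

    // poolClientFlow is the connection pool flow from the question.
    val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
      MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          // Complete each caller's promise with the outcome of its request.
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()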
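The second step, handing the materialized Sink to the callers, could look roughly like the sketch below. It is not taken from the question: to keep it self-contained, `fakePoolFlow` is a hypothetical stand-in for `poolClientFlow` that works on plain strings, and `send` is an illustrative helper name. It also assumes Akka 2.6 or later, where an implicit `ActorSystem` is enough to materialize streams. With the real `reqSink`, the element type would be `(HttpRequest, Promise[HttpResponse])` instead.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, MergeHub, Sink, Source}

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

object MergeHubSketch {
  // Assumption: Akka 2.6+, where the implicit system also supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("mergehub-sketch")

  // Hypothetical stand-in for poolClientFlow: upper-cases the "request"
  // and passes the caller's promise through untouched.
  val fakePoolFlow: Flow[(String, Promise[String]), (Try[String], Promise[String]), NotUsed] =
    Flow[(String, Promise[String])].map { case (req, p) => (Try(req.toUpperCase), p) }

  // Step 1: run hub -> flow -> sink once; running it materializes the shared Sink.
  val reqSink: Sink[(String, Promise[String]), NotUsed] =
    MergeHub.source[(String, Promise[String])](perProducerBufferSize = 16)
      .via(fakePoolFlow)
      .toMat(Sink.foreach[(Try[String], Promise[String])] {
        case (Success(resp), p) => p.success(resp)
        case (Failure(e), p)    => p.failure(e)
      })(Keep.left)
      .run()

  // Step 2: every caller attaches its own small source to the shared sink.
  // This can be called from any thread.
  def send(req: String): Future[String] = {
    val p = Promise[String]()
    Source.single(req -> p).runWith(reqSink)
    p.future
  }
}
```

Each call to `send` materializes a new producer on the hub, so no external synchronization is needed; the caller only holds on to the `Future` of its own promise.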