TL; DR: is it better to materialize the stream for each request (i.e. use short-lived streams) or use the materialization of one stream for requests when I have an outgoing HTTP request as part of a stream?
Details: I have a typical service that accepts an HTTP request, scatters it onto several third-party downstream services (not controlled by me) and aggregates the results before sending them. I use akka-http to implement the client and a spray for the server (legacy, will eventually switch to akka-http). Schematically:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
This can be achieved either by materializing the stream per request, or by materializing (parts) of the stream once and dividing it into requests.
Materialization for each request carries material overhead 1 and it is unclear how to use connection pools with it. The problem is described here (many materializations can run out of pool). I can wrap the pool in a long HTTP stream, for example, here and wrap it in the mapAsync
"upstream", but I don’t understand the error handling strategy. When one request fails and the thread stops, will it delete the pool? Moreover, it seems to me that I will need to coordinate requests and responses, since they are not returned in order.
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID())
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
}
.runWith(Sink.last)
})
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
}
.runWith(Sink.last)
})
- , , . API , , .
?
, , , "" .
1 , , , akka-http.