Akka-stream + akka-http life cycle

TL; DR: is it better to materialize the stream for each request (i.e. use short-lived streams) or use the materialization of one stream for requests when I have an outgoing HTTP request as part of a stream?

Details: I have a typical service that accepts an HTTP request, scatters it onto several third-party downstream services (not controlled by me) and aggregates the results before sending them. I use akka-http to implement the client and a spray for the server (legacy, will eventually switch to akka-http). Schematically:

request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response

This can be achieved either by materializing the stream per request, or by materializing (parts) of the stream once and dividing it into requests.

Materialization for each request carries material overhead 1 and it is unclear how to use connection pools with it. The problem is described here (many materializations can run out of pool). I can wrap the pool in a long HTTP stream, for example, here and wrap it in the mapAsync"upstream", but I don’t understand the error handling strategy. When one request fails and the thread stops, will it delete the pool? Moreover, it seems to me that I will need to coordinate requests and responses, since they are not returned in order.

// example of stream per request

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
    Flow[HttpRequest]
      .map(req => req -> UUID.randomUUID()) // I don't care about id because it a single request per stream.
      .via(connectionFlow)
      .map { case (response, _) => response }

val result = Range(1 to 5).foreach{ i => {
  Source.single(i)
    .map(HttpRequest(...))
    .via(httpFlow)
    .mapAsync(1) {
       // response handling logic
    }
    .runWith(Sink.last)
})


// example of stream per request with long running http stream

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]

val result = Range(1 to 5).foreach{ i => {
  Source.single(i)
    .map(HttpRequest(...))
    .mapAsync(1)(queueRequest)
    .mapAsync(1) {
       // somehow reconcile request with response?
       // response handling logic
    }
    .runWith(Sink.last)
})

- , , . API , , .

?

, , , "" .

1 , , , akka-http.

+4
1

, Flow . , Connection, ( ).

, :

: UUID   , . T " ", , HttpResponse . Int Range:

val responseSource : Source[(Try[HttpResponse], Int), _] = 
  Source
    .fromIterator( () => Iterator range (0,5) )
    .map(i => HttpRequest(...) -> i)
    .via(connectionFlow)

Int, .

. , " ". . , (Failure(exception), Int) . , Int , .

+1

Source: https://habr.com/ru/post/1686441/


All Articles