Akka Streams: using an HttpResponse in a stream

I would like to use a simple stream to fetch additional data from an HTTP service and enrich my data object with the results. The following illustrates the idea:

val httpClient = Http().superPool[User]()

val cityRequest = Flow[User].map { user=>
  (HttpRequest(uri = Uri(config.getString("cityRequestEndpoint"))), user)
}

val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), user) => user
  case (Success(resp), user) => {
    // << What to do here to get the value >> //
    val responseData = processResponseSomehowToGetAValue?
    val enhancedUser = new EnhancedUser(user.data, responseData)
    enhancedUser
  }
}

val processEnhancedUser = Flow[EnhancedUser].map {
  // e.g.: Asynchronously save user to a database
}

val useEnhancementGraph = userSource
  .via(cityRequest)
  .via(httpClient)
  .via(cityResponse)
  .via(processEnhancedUser)
  .to(Sink.foreach(println))

I am having trouble understanding the mechanics, and the difference between the streaming nature of the flow and materialization / futures within a stream.

The ideas I have found so far did not explain the following to me:

How do I get the value from the response into a new custom object, so that I can process that object in the next steps?

Thanks for the help.

Update:

HTTP- akka, 10 , . , "EnhancedUser" , , , .

Adding .async to the cityResponse flow made it behave correctly. Why?

val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), member) => member
  case (Success(response), member) => {
    Unmarshal(response.entity).to[String] onComplete {
      case Success(s) =>  member.city = Some(s)
      case Failure(ex) => member.city = None
    }
  }
  member
}.async  // <<-- This changed the behavior to be correct, why?

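There are two approaches you can take, depending on the nature of the entity you get back from "cityRequestEndpoint":

Stream-based: the best way to handle this is to always assume that the entity coming from the endpoint can contain N pieces of data, where N is not known in advance. This is usually the pattern to follow because, in the "real world", that is often the case.

First, convert the HttpResponse into a Source of data, keeping the user alongside each piece: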

val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] = 
  (response, user) => response match {
    case Failure(_) => Source single (None -> user)
    case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
  }

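The code above is where we accept that N is unknown: r.entity.dataBytes may be a Source of 0 ByteString values or of arbitrarily many. We don't care!

Next, the values coming from those Sources have to be combined. This is a good use case for Flow.flatMapConcat, which takes a Flow of Sources and flattens it into a Flow of values (similar to flatMap for Iterables):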

val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] = 
  Flow[(Try[HttpResponse], User)].flatMapConcat(convertResponseToByteStrSource.tupled)
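
To make the flatMap comparison concrete, here is a minimal sketch using only standard-library collections. Lists stand in for Sources and Strings for ByteString values; the `User` case class, `toChunks` and the sample data are hypothetical and exist only for illustration.

```scala
object FlatMapAnalogy {
  // Hypothetical stand-in for the question's User type.
  final case class User(data: String)

  // A "response" is either absent (failed request) or a list of chunks.
  // Strings stand in for ByteString values.
  def toChunks(response: Option[List[String]], user: User): List[(Option[String], User)] =
    response match {
      case None         => List(None -> user)
      case Some(chunks) => chunks.map(chunk => Some(chunk) -> user)
    }

  val responses: List[(Option[List[String]], User)] = List(
    (Some(List("Ber", "lin")), User("alice")), // two chunks
    (None, User("bob")),                       // failed request
    (Some(Nil), User("carol"))                 // empty entity
  )

  // flatMap flattens the per-response lists into a single list, much as
  // flatMapConcat flattens the per-response Sources into a single stream.
  val flattened: List[(Option[String], User)] =
    responses.flatMap { case (response, user) => toChunks(response, user) }
}
```

Note that in this sketch a response with zero chunks contributes no elements at all, and a response with two chunks contributes two, so the number of outputs per user follows N.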

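All that is left is to convert the (ByteString, User) tuples into EnhancedUser values. Note: I am assuming that User is a subclass of EnhancedUser, which is inferred from the logic in the question: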

val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser = 
  (byteStr, user) => 
    byteStr
      .map(s => EnhancedUser(user.data, s))
      .getOrElse(user)

val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] = 
  Flow[(Option[ByteString], User)].map(convertByteStringToUser.tupled)
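
The getOrElse(user) fallback only type-checks if a User can be used wherever an EnhancedUser is expected. The question does not show the real classes, so the following is just one hypothetical shape that would satisfy that assumption, with a String standing in for the ByteString payload:

```scala
object UserHierarchy {
  // Hypothetical definitions; the real classes are not shown in the question.
  class EnhancedUser(val data: String, val city: Option[String])

  // A plain User is modelled as an EnhancedUser without city information.
  class User(userData: String) extends EnhancedUser(userData, None)

  // Mirrors convertByteStringToUser: enhance when a payload is present,
  // otherwise fall back to the unchanged user.
  def enhance(payload: Option[String], user: User): EnhancedUser =
    payload
      .map(city => new EnhancedUser(user.data, Some(city)))
      .getOrElse(user)
}
```

If the two classes are unrelated in the real code, the flow's output type would have to be something like Either[User, EnhancedUser] instead.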

These components can now be combined:

val useEnhancementGraph =
  userSource
    .via(cityRequest)
    .via(httpClient)
    .via(cityByteStrFlow)
    .via(cityUserFlow)
    .via(processEnhancedUser)
    .to(Sink foreach println)

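Future-based: you can also solve the problem with Futures, without streaming the entity's bytes. There are a couple of reasons to prefer this approach:

  • You know that the entity contains only 1 ByteString. With several ByteStrings, they would first have to be gathered together before a single EnhancedUser can be built.
  • You would rather work with a Future of the ByteString, for example with Async.await.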
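
If an entity did arrive as several chunks, one way to still end up with a single value per user is to fold the chunks together before building the result. The sketch below shows the idea with standard-library types only; `gather` is a hypothetical helper and Strings stand in for ByteString chunks. In Akka Streams the analogous step would presumably be a fold over entity.dataBytes (for example with runFold), which is an assumption on my part rather than something the answer prescribes.

```scala
object GatherChunks {
  // Concatenates all chunks of one entity into a single value.
  // Strings stand in for ByteString chunks; an empty entity yields "".
  def gather(chunks: List[String]): String =
    chunks.foldLeft("")(_ + _)
}
```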

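For this use case, use Flow.mapAsync instead of Flow.map; it unwraps the Future values being created, so the next stage receives the completed results: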

val parallelism = 10

val timeout : FiniteDuration = ??? //you need to specify the timeout limit

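// Requires an implicit Materializer (for toStrict) and an implicit ExecutionContext (for map) in scope.
val convertResponseToFutureByteStr : ((Try[HttpResponse], User)) => Future[EnhancedUser] = {
  case (Failure(ex), user)   =>
    Future successful user
  case (Success(resp), user) =>
    resp
      .entity
      .toStrict(timeout)  // Future[HttpEntity.Strict]: the whole entity loaded into memory
      .map(strict => new EnhancedUser(user.data, strict.data))
}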

val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
  Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)
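
The key point of the Future-based variant is that the enhanced value is built inside the Future and only handed on once it is complete, rather than being patched into the user from an onComplete callback after the user has already been emitted. Here is a minimal standard-library sketch of that pattern; `lookupCity`, the case classes and the fixed "Berlin" result are hypothetical stand-ins for the HTTP call and the real types.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureEnrichment {
  final case class User(data: String)
  final case class EnhancedUser(data: String, city: Option[String])

  // Hypothetical stand-in for the asynchronous HTTP lookup.
  def lookupCity(user: User): Future[String] = Future("Berlin")

  // Build the enhanced value inside the Future instead of mutating the
  // user from a callback; a failed lookup degrades to city = None.
  def enrich(user: User): Future[EnhancedUser] =
    lookupCity(user)
      .map(city => EnhancedUser(user.data, Some(city)))
      .recover { case _ => EnhancedUser(user.data, None) }

  // Future.traverse returns the results in input order, which is loosely
  // what mapAsync does for stream elements (without the parallelism bound).
  def enrichAll(users: List[User]): Future[List[EnhancedUser]] =
    Future.traverse(users)(enrich)
}
```

A caller would wait for or chain on the result, for example `Await.result(FutureEnrichment.enrichAll(users), 5.seconds)` in a test, so nothing downstream can observe a half-filled user.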

Source: https://habr.com/ru/post/1674116/

