Akka-http chunked response concatenation

I use akka-http to make a request to an HTTP service, which sends back a response. Here's what the corresponding bit of code looks like:

    val httpRequest: HttpRequest = //build the request
    val request = Http().singleRequest(httpRequest)
    request.flatMap { response =>
      response.entity.dataBytes.runForeach { chunk =>
        println("-----")
        println(chunk.utf8String)
      }
    }

and the output from the command line looks something like this:

    -----
    {"data":
    -----
    "some text"}
    -----
    {"data":
    -----
    "this is a longer
    -----
    text"}
    -----
    {"data": "txt"}
    -----
    ...

Each logical unit of the data - in this case a JSON object - ends with the end-of-line sequence \r\n , but the problem is that a JSON object does not always fit into a single chunk of the HTTP response, as the example above clearly shows.

My question is: how can I combine the incoming chunks into complete JSON values so that the resulting container type still remains either Source[Out, M1] or Flow[In, Out, M2]? I would like to stay within the akka-stream idiom.

UPDATE: It should also be noted that the response stream is unbounded, so the aggregation has to be done on the fly.

+5
3 answers

Found a solution:

    val httpRequest: HttpRequest = //build the request
    val request = Http().singleRequest(httpRequest)
    request.flatMap { response =>
      response.entity.dataBytes
        .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
        .filter(_.contains("\r\n"))
        .runForeach { json =>
          println("-----")
          println(json)
        }
    }
+3
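As a follow-up to the snippet above: since the question asks for the result to remain a Source[Out, M1] or Flow[In, Out, M2], the same scan/filter combination can be packaged as a reusable Flow. A minimal sketch; the concatJson name is illustrative, and the \r\n delimiter is taken from the question:

    import akka.NotUsed
    import akka.stream.scaladsl.Flow
    import akka.util.ByteString

    // Accumulates ByteString chunks until the accumulated string contains the
    // "\r\n" terminator, emits it, then starts over with the next chunk.
    val concatJson: Flow[ByteString, String, NotUsed] =
      Flow[ByteString]
        .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
        .filter(_.contains("\r\n"))

    // Usage: response.entity.dataBytes.via(concatJson)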

The Akka Streams documentation contains a cookbook entry on this very problem: "Parsing lines from a stream of ByteStrings". Their solution is rather verbose, but it also copes with the situation where a single chunk contains several lines. This seems more reliable, since chunk sizes can vary enough that one chunk carries multiple JSON messages.

0
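In more recent Akka versions the same line-framing logic is available out of the box as Framing.delimiter, so the verbose cookbook stage can usually be replaced by a one-liner. A minimal sketch, assuming the \r\n delimiter from the question and an illustrative 8 KiB frame limit; the next answer uses the same stage in the context of a full response pipeline:

    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, Framing}
    import akka.util.ByteString

    // Re-frames the incoming chunks on "\r\n" boundaries, so each emitted
    // element is exactly one complete JSON message, however the HTTP chunks
    // happened to be split.
    val jsonFrames: Flow[ByteString, String, NotUsed] =
      Flow[ByteString]
        .via(Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 8192))
        .map(_.utf8String)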
    response.entity.dataBytes
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096))
      .mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data =>
        if (response.status == OK) {
          val event: Future[Event] = Unmarshal(data).to[Event]
          event.foreach(x => log.debug("Received event: {}.", x))
          event.map(Right(_))
        } else {
          Future.successful(data.utf8String)
            .map(Left(_))
        }
      }

The only requirement is that you know the maximum size of a single record. If you start with something too small, the default behaviour is to fail the stream when a record exceeds the limit. You can configure it to truncate instead of failing, but a truncated piece of JSON is of no use.

0
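For reference, a minimal sketch of the truncation behaviour mentioned above, assuming it refers to the allowTruncation parameter of Framing.delimiter (when enabled, a final frame that ends without a closing delimiter is emitted instead of failing the stream):

    import akka.stream.scaladsl.Framing
    import akka.util.ByteString

    // With allowTruncation = true, a trailing frame that is not terminated by
    // the delimiter is emitted as-is instead of failing the stream.
    val lenientFraming = Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 8096,
      allowTruncation = true
    )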

Source: https://habr.com/ru/post/1234260/

