BodyParser composition in Play 2.5

For a function with this signature:

def parser[A](otherParser: BodyParser[A]): BodyParser[A] 

How can I write this function so that the request body is validated before it is passed to otherParser?

For simplicity, let's say that I want to check that a header ("Some-Header", say) has a value that exactly matches the body. So, if I have this action:

 def post() = Action(parser(parse.tolerantText)) { request => Ok(request.body) }

When I make a request like curl -H "Some-Header: hello" -d "hello" http://localhost:9000/post, it should return "hello" in the response body with a status of 200. If my request is curl -H "Some-Header: hello" -d "hi" http://localhost:9000/post, it should return 400 with no body.

Here is what I have tried.

This attempt does not compile because otherParser(request).through(flow) expects flow to output a ByteString. The idea was that the flow could tell the accumulator whether to continue processing via its Either output. I'm not sure how to let the accumulator know the status of the previous step.

    def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
      val flow: Flow[ByteString, Either[Result, ByteString], NotUsed] = Flow[ByteString].map { bytes =>
        if (request.headers.get("Some-Header").contains(bytes.utf8String)) {
          Right(bytes)
        } else {
          Left(BadRequest)
        }
      }

      val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)

      // This fails to compile because flow needs to output a ByteString
      acc.through(flow)
    }

I also tried using a filter. This compiles, and the response body that is written is correct. However, it always returns a response status of 200 Ok .

    def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
      val flow: Flow[ByteString, ByteString, akka.NotUsed] = Flow[ByteString].filter { bytes =>
        request.headers.get("Some-Header").contains(bytes.utf8String)
      }

      val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)

      acc.through(flow)
    }
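A likely explanation for the constant 200, sketched with plain collections instead of Akka Streams: the filter only drops chunks, so the wrapped parser still sees a valid (possibly empty) body and has no reason to reject it. Everything in this sketch is a hypothetical stand-in (FilterSketch, tolerantParse, filtered, and an Int in place of Result); it is not Play's API.

```scala
object FilterSketch {
  // Hypothetical stand-in for a tolerant text parser: it accepts whatever
  // chunks reach it, including none at all, and never rejects.
  def tolerantParse(chunks: List[String]): Either[Int, String] =
    Right(chunks.mkString)

  // Mirrors the filter attempt: chunks that differ from the header value
  // are silently dropped before the parser sees them.
  def filtered(header: Option[String], chunks: List[String]): Either[Int, String] =
    tolerantParse(chunks.filter(chunk => header.contains(chunk)))
}
```

In this sketch a mismatching body yields Right("") rather than a Left, which would correspond to an empty 200 response. It also shows that a per-chunk comparison rejects a matching body that happens to arrive split across several chunks, e.g. List("hel", "lo").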
1 answer

I came up with a solution using GraphStageWithMaterializedValue. The concept is borrowed from Play's maxLength body parser. The key difference from the first attempt in my question (which does not compile) is that instead of trying to change the elements of the stream, I should use a materialized value to convey information about the processing status. Where I had created a Flow[ByteString, Either[Result, ByteString], NotUsed], it turned out that I needed a Flow[ByteString, ByteString, Future[Boolean]].

So my parser function is as follows:

    def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
      val flow: Flow[ByteString, ByteString, Future[Boolean]] =
        Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))

      val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink

      Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
        statusFuture.flatMap { success =>
          if (success) {
            resultFuture.map {
              case Left(result) => Left(result)
              case Right(a) => Right(a)
            }
          } else {
            Future.successful(Left(BadRequest))
          }
        }
      })
    }
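The way the two materialized futures are combined can be tried in isolation with only the standard library. The following is a minimal sketch under stated assumptions: Rejection is a hypothetical stand-in for Play's Result, and CombineSketch, combine and run are illustrative names, not part of Play or Akka.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CombineSketch {
  // Hypothetical stand-in for Play's Result, so the sketch needs no Play dependency.
  final case class Rejection(status: Int)

  // If validation failed, discard the parser's outcome and reject with 400;
  // otherwise pass the parser's outcome through unchanged.
  def combine[A](
      statusFuture: Future[Boolean],
      resultFuture: Future[Either[Rejection, A]]
  ): Future[Either[Rejection, A]] =
    statusFuture.flatMap { success =>
      if (success) resultFuture
      else Future.successful(Left(Rejection(400)))
    }

  // Small blocking helper for trying the combinator with already-known values.
  def run[A](status: Boolean, parsed: Either[Rejection, A]): Either[Rejection, A] =
    Await.result(combine(Future.successful(status), Future.successful(parsed)), 5.seconds)
}
```

The sketch returns resultFuture directly in the success branch, which should be equivalent to mapping Left to Left and Right to Right. Note that flatMap on a Future needs an implicit ExecutionContext in scope; the sketch uses the global one purely for illustration.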

Key line:

    val flow: Flow[ByteString, ByteString, Future[Boolean]] =
      Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))

The rest of the types fall into place once you can create this flow. Unfortunately, BodyValidator is rather verbose and feels somewhat boilerplate-heavy. Still, it is mostly easy to read. GraphStageWithMaterializedValue expects you to implement def shape: S (S is FlowShape[ByteString, ByteString] here) to specify the input type and output type of this graph. It also expects you to implement def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) (M is Future[Boolean] here) to define what the graph should do. Here is the full BodyValidator code (I explain it in more detail below):

    class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {
      val in = Inlet[ByteString]("BodyValidator.in")
      val out = Outlet[ByteString]("BodyValidator.out")

      override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

      override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
        val status = Promise[Boolean]()
        val bodyBuffer = new ByteStringBuilder()

        val logic = new GraphStageLogic(shape) {
          setHandler(out, new OutHandler {
            override def onPull(): Unit = pull(in)
          })

          setHandler(in, new InHandler {
            def onPush(): Unit = {
              val chunk = grab(in)
              bodyBuffer.append(chunk)
              push(out, chunk)
            }

            override def onUpstreamFinish(): Unit = {
              val fullBody = bodyBuffer.result()
              status.success(expected.map(ByteString(_)).contains(fullBody))
              completeStage()
            }

            override def onUpstreamFailure(e: Throwable): Unit = {
              status.failure(e)
              failStage(e)
            }
          })
        }

        (logic, status.future)
      }
    }

First you create an Inlet and an Outlet to define the inputs and outputs of your graph.

    val in = Inlet[ByteString]("BodyValidator.in")
    val out = Outlet[ByteString]("BodyValidator.out")

Then you use them to determine shape .

 def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out) 

Inside createLogicAndMaterializedValue you need to initialize the value that you intend to materialize. Here I used a promise that can be completed once I have the full data from the stream. I also create a ByteStringBuilder to keep track of the data between iterations.

    val status = Promise[Boolean]()
    val bodyBuffer = new ByteStringBuilder()

Then I create a GraphStageLogic to define what this graph does at each point of processing. Two handlers are installed. One is an InHandler for dealing with data as it arrives from the upstream source. The other is an OutHandler for dealing with data to send downstream. There is nothing really interesting in the OutHandler, so I will skip it here, beyond saying that it is necessary boilerplate to avoid an IllegalStateException. Three methods are overridden in the InHandler: onPush, onUpstreamFinish and onUpstreamFailure. onPush is called when new data is ready from upstream. In this method, I simply grab the next chunk of data, write it to bodyBuffer and push the data downstream.

    def onPush(): Unit = {
      val chunk = grab(in)
      bodyBuffer.append(chunk)
      push(out, chunk)
    }

onUpstreamFinish is called when the upstream finishes (unsurprisingly). This is where the business logic of comparing the body with the header happens.

    override def onUpstreamFinish(): Unit = {
      val fullBody = bodyBuffer.result()
      status.success(expected.map(ByteString(_)).contains(fullBody))
      completeStage()
    }
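The buffer-then-compare idea can be sketched without Akka, using plain byte arrays. This is only an analogue under stated assumptions: ValidatorSketch and bodyMatches are hypothetical names, a ByteArrayOutputStream plays the role of the ByteStringBuilder, and the header value is encoded as UTF-8, which is what ByteString(String) does.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

object ValidatorSketch {
  // Appends every chunk to a buffer (as onPush does) and compares the
  // complete body with the expected header value only once all chunks
  // have been seen (as onUpstreamFinish does).
  def bodyMatches(expected: Option[String], chunks: Seq[Array[Byte]]): Boolean = {
    val buffer = new ByteArrayOutputStream()
    chunks.foreach(chunk => buffer.write(chunk, 0, chunk.length))
    val fullBody = buffer.toByteArray
    // A missing header (None) never matches, mirroring Option.contains.
    expected.exists(value => Arrays.equals(value.getBytes(UTF_8), fullBody))
  }
}
```

Because the comparison happens on the whole buffered body, it does not matter how the body was split into chunks, and a request without the header is treated as a mismatch.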

onUpstreamFailure is implemented so that when something goes wrong, I can mark the materialized future as failed as well.

    override def onUpstreamFailure(e: Throwable): Unit = {
      status.failure(e)
      failStage(e)
    }

Finally, I return the created GraphStageLogic and status.future as a tuple.


Source: https://habr.com/ru/post/1263930/

