I came up with a solution using `GraphStageWithMaterializedValue`. The idea was borrowed from Play's `maxLength` body parser. The key difference from the first attempt in my question (which does not compile) is this: instead of trying to alter the stream itself, I use a materialized value to convey the processing status. I had created a `Flow[ByteString, Either[Result, ByteString], NotUsed]`, but it turned out that what I needed was a `Flow[ByteString, ByteString, Future[Boolean]]`.
So my parser function is as follows:
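```scala
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString
import play.api.libs.streams.Accumulator
import play.api.mvc.{BodyParser, Result}
import play.api.mvc.Results.BadRequest
import scala.concurrent.{ExecutionContext, Future}

// An implicit ExecutionContext is needed for flatMap on the status future
def parser[A](otherParser: BodyParser[A])(implicit ec: ExecutionContext): BodyParser[A] =
  BodyParser { request =>
    // Pass-through flow whose materialized value says whether the body matched the header
    val flow: Flow[ByteString, ByteString, Future[Boolean]] =
      Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
    // The wrapped parser's accumulator, converted to a plain Sink
    val parserSink: Sink[ByteString, Future[Either[Result, A]]] =
      otherParser.apply(request).toSink
    // Combine both materialized values into the final parse result
    Accumulator(flow.toMat(parserSink) {
      (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
        statusFuture.flatMap { success =>
          if (success) resultFuture // body matched: keep the wrapped parser's outcome
          else Future.successful(Left(BadRequest))
        }
    })
  }
```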
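The combiner passed to `toMat` can be tried on its own. The following is a minimal sketch that uses only the Scala standard library. `Result`, `BadRequest` and `Rejected` are hypothetical stand-ins for Play's types (not Play's API), and `Combine.combine` is an illustrative name. The logic matches the function in the parser above.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-ins for Play's Result type and BadRequest; not Play's API.
sealed trait Result
case object BadRequest extends Result
final case class Rejected(reason: String) extends Result

object Combine {
  // Same shape as the function passed to `toMat`: wait for the validator's
  // verdict, then either forward the wrapped parser's result or short-circuit.
  def combine[A](statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]])(
      implicit ec: ExecutionContext): Future[Either[Result, A]] =
    statusFuture.flatMap[Either[Result, A]] { success =>
      if (success) resultFuture // body matched: use the wrapped parser's outcome
      else Future.successful(Left(BadRequest)) // body did not match the header
    }
}
```

Because the status is combined with `flatMap`, a failed status future (for example, after an upstream error) makes the combined future fail rather than produce a `BadRequest`.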
The key line is:
```scala
val flow: Flow[ByteString, ByteString, Future[Boolean]] =
  Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
```
Everything else falls into place once you can create this flow. Unfortunately, `BodyValidator` is rather verbose and feels somewhat boilerplate-heavy, but it is fairly easy to read. `GraphStageWithMaterializedValue` expects you to implement `def shape: S` (here `S` is `FlowShape[ByteString, ByteString]`) to declare the input and output types of the graph. It also expects you to implement `def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M)` (here `M` is `Future[Boolean]`) to define what the graph does. Here's the full `BodyValidator` code (I'll explain it in more detail below):
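```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import akka.util.{ByteString, ByteStringBuilder}
import scala.concurrent.{Future, Promise}

class BodyValidator(expected: Option[String])
    extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {

  val in = Inlet[ByteString]("BodyValidator.in")
  val out = Outlet[ByteString]("BodyValidator.out")

  override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val status = Promise[Boolean]()
    val bodyBuffer = new ByteStringBuilder()

    val logic = new GraphStageLogic(shape) {
      // Forward downstream demand to upstream
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })

      setHandler(in, new InHandler {
        // Record each chunk and pass it through unchanged
        def onPush(): Unit = {
          val chunk = grab(in)
          bodyBuffer.append(chunk)
          push(out, chunk)
        }

        // The whole body has arrived: compare it with the expected header value
        override def onUpstreamFinish(): Unit = {
          val fullBody = bodyBuffer.result()
          status.success(expected.map(ByteString(_)).contains(fullBody))
          completeStage()
        }

        // Propagate upstream errors into the materialized Future
        override def onUpstreamFailure(e: Throwable): Unit = {
          status.failure(e)
          failStage(e)
        }
      })

      // If the stage stops any other way (e.g. downstream cancels), don't leave the Future pending
      override def postStop(): Unit =
        status.tryFailure(new IllegalStateException("BodyValidator stopped before the body was complete"))
    }

    (logic, status.future)
  }
}
```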
First, create an `Inlet` and an `Outlet` to define the input and output ports of the graph stage.
```scala
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
```
Then use them to define the `shape`.
```scala
def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
```
Inside `createLogicAndMaterializedValue` you need to initialize the value you intend to materialize. Here I use a `Promise` that can be completed once the stream has delivered all of its data. I also create a `ByteStringBuilder` to accumulate data across chunks.
```scala
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
```
Then I create a `GraphStageLogic` that defines what this stage does at each processing step. Two handlers are installed: an `InHandler` for data arriving from upstream, and an `OutHandler` for demand coming from downstream. There is nothing really interesting about the `OutHandler` (it just calls `pull(in)` whenever downstream pulls), so I will skip it here, except to say that it is required boilerplate to avoid an `IllegalStateException`.
Three methods are overridden in the `InHandler`: `onPush`, `onUpstreamFinish` and `onUpstreamFailure`. `onPush` is called when a new element is available from upstream. In it, I grab the next chunk, append it to `bodyBuffer` and push it downstream.
```scala
def onPush(): Unit = {
  val chunk = grab(in)
  bodyBuffer.append(chunk)
  push(out, chunk)
}
```
`onUpstreamFinish` is called when upstream completes normally. This is where the business logic of comparing the body with the header happens.
```scala
override def onUpstreamFinish(): Unit = {
  val fullBody = bodyBuffer.result()
  // True only if the header is present and equals the complete body
  status.success(expected.map(ByteString(_)).contains(fullBody))
  completeStage()
}
```
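The comparison can be checked without Akka. The following is a minimal sketch under these assumptions: a `ByteArrayOutputStream` stands in for `ByteStringBuilder`, raw byte arrays stand in for the chunks, and `BodyCheck.matches` is a hypothetical helper that is not part of Akka or Play.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.Arrays

object BodyCheck {
  def matches(expected: Option[String], chunks: Seq[Array[Byte]]): Boolean = {
    val bodyBuffer = new ByteArrayOutputStream() // stand-in for ByteStringBuilder
    chunks.foreach(chunk => bodyBuffer.write(chunk)) // what onPush does per chunk
    val fullBody = bodyBuffer.toByteArray // what onUpstreamFinish reads
    // Akka's ByteString(s) encodes s as UTF-8, so encode the header the same way.
    // A missing header (None) never matches, as with Option.contains in the stage.
    expected.exists(e => Arrays.equals(e.getBytes(StandardCharsets.UTF_8), fullBody))
  }
}
```

Because the check runs on bytes, a multi-byte character that is split across two chunks still matches once the body is complete.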
`onUpstreamFailure` is implemented so that, when something goes wrong upstream, the materialized future fails as well.
```scala
override def onUpstreamFailure(e: Throwable): Unit = {
  status.failure(e)
  failStage(e)
}
```
Finally, I return the created `GraphStageLogic` and `status.future` as a tuple.
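The following toy, single-threaded model shows the stage's lifecycle in plain Scala. It is not Akka. `ToyValidator`, `materialized` and the byte-array chunks are hypothetical stand-ins, and a real `GraphStageLogic` is driven by the stream interpreter rather than by direct calls. The model does show why a `Promise` suits the materialized value: the `Future` can be handed out before any data flows, and whichever callback ends the stream completes it later.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.Arrays
import scala.concurrent.{Future, Promise}

// Hypothetical single-threaded model of BodyValidator's callbacks (plain Scala, not Akka).
final class ToyValidator(expected: Option[String]) {
  private val status = Promise[Boolean]()
  private val bodyBuffer = new ByteArrayOutputStream()

  // Available immediately, like the second element of (logic, status.future)
  val materialized: Future[Boolean] = status.future

  // onPush: remember the chunk, then pass it through unchanged
  def onPush(chunk: Array[Byte]): Array[Byte] = {
    bodyBuffer.write(chunk)
    chunk
  }

  // onUpstreamFinish: the full body is known, so resolve the Promise
  def onUpstreamFinish(): Unit = {
    val fullBody = bodyBuffer.toByteArray
    status.success(expected.exists(e => Arrays.equals(e.getBytes(StandardCharsets.UTF_8), fullBody)))
  }

  // onUpstreamFailure: the error becomes the Future's failure
  def onUpstreamFailure(e: Throwable): Unit = status.failure(e)
}
```

Every way the stream can end should complete the promise. If any termination path leaves it uncompleted, whatever waits on the materialized future waits forever.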