Victor Clan suggested the following in a comment on a related Gist, and it turned out to be a great solution:
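import akka.stream.ClosedShape
import akka.stream.scaladsl._
import scala.concurrent.Future

// BadData, Data, ProcessorFlow and LeftFlow are defined elsewhere in the project;
// Xor is presumably cats.data.Xor.
def consume(
    errorHandler: BadData => Unit, fn: Data => Unit, a: String
): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
  // Passing the sink to create() makes its Future[Done] the materialized value of the graph,
  // so the builder must be typed with that value rather than Unit.
  GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Future[akka.Done]] => sink =>
    import GraphDSL.Implicits._

    val source = b.add(Source.single(a))
    val broadcast = b.add(Broadcast[String](2))
    val merge = b.add(Zip[String, String]())
    val process = new ProcessorFlow(fn)
    // Keep only the failed (Left) results and unwrap them to BadData.
    val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
    val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
      (input: Xor[BadData, Data]) =>
        input.swap.getOrElse((new Throwable, ("", "")))
    ))

    source ~> broadcast.in
    broadcast.out(0) ~> Flow[String].map(_.reverse) ~> merge.in0
    broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
    merge.out ~> process ~> failed ~> errors ~> sink
    ClosedShape
  }
)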
This lets me run the RunnableGraph and call Await.result on the Future it materializes, which is what I needed for testing purposes. Thanks again to Victor for this solution!
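For illustration, here is a minimal sketch of how such a graph can be awaited in a test. It does not use the real consume function: collect, ConsumeExample and the String elements are hypothetical stand-ins, and the five-second timeout is an arbitrary choice. It also assumes an Akka version where ActorMaterializer is available (in Akka 2.6+ it is deprecated and an implicit ActorSystem is enough).

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ConsumeExample {
  // Hypothetical stand-in for consume: the sink is passed to GraphDSL.create,
  // so its Future[Done] becomes the materialized value of the whole graph.
  def collect(handler: String => Unit, inputs: List[String]): RunnableGraph[Future[Done]] =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.foreach[String](handler)) { implicit b => sink =>
        import GraphDSL.Implicits._
        Source(inputs) ~> sink
        ClosedShape
      }
    )

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("consume-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val seen = ListBuffer.empty[String]
    try {
      // run() returns the sink's Future[Done]; block on it so the assertion
      // only runs after every element has reached the handler.
      val done: Future[Done] = collect(s => seen += s, List("a", "b")).run()
      Await.result(done, 5.seconds)
      assert(seen.toList == List("a", "b"))
    } finally {
      // Shut the actor system down so the JVM can exit.
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

Blocking with Await.result is acceptable in a test like this, but in production code it is usually better to compose on the Future instead.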