I built an aka chart that defines the flow. My goal is to reformat my future answer and save it in a file. The stream can be described below:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
val merger = builder.add(Merge[Future[Map[String, String]]](6))
val fileSink = FileIO.toPath(outputPath, options)
val ignoreSink = Sink.ignore
val in = Source(seeds)
in ~> balancer.in
for (i <- Range(0,6)) {
balancer.out(i) ~>
wikiFlow.async ~>
Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
merger
}
merger.out ~>
Flow[Future[Map[String, String]]].map((d) => {
}) ~>
fileSink
ClosedShape
})
I can hack this workflow to write my future map to a file via foreach, but I am afraid that this may lead to problems with w190> with FileIO, and this is simply not the case. What is the right way to handle futures with our akka stream?
source
share