How do you feel about futures in Akka Flow?

I built an aka chart that defines the flow. My goal is to reformat my future answer and save it in a file. The stream can be described below:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
      val merger = builder.add(Merge[Future[Map[String, String]]](6))
      val fileSink = FileIO.toPath(outputPath, options)
      val ignoreSink = Sink.ignore
      val in = Source(seeds)
      in ~> balancer.in
      for (i <- Range(0,6)) {
        balancer.out(i) ~>
          wikiFlow.async ~>
          // This maps to a Future[Map[String, String]]
          Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
          merger
      }

      merger.out ~>
      // When we merge we need to map our Map to a file
      Flow[Future[Map[String, String]]].map((d) => {
        // What is the proper way of serializing future map
        // so I can work with it like a normal stream into fileSink?

        // I could manually do ->
        // d.foreach(someWriteToFileProcess(_))
        // with ignoreSink, but this defeats the nice
        // akka flow
      }) ~>
      fileSink

      ClosedShape
    })

I can hack this workflow to write my future map to a file via foreach, but I am afraid that this may lead to problems with w190> with FileIO, and this is simply not the case. What is the right way to handle futures with our akka stream?

+4
source share
1 answer

The easiest way to create Flowone that involves asynchronous computing is to use mapAsync.

... , Flow, Int String, mapper: Int => Future[String] parallelism 5.

val mapper: Int => Future[String] = (i: Int) => Future(i.toString)

val yourFlow = Flow[Int].mapAsync[String](5)(mapper)

, .

,

val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val intSource = Source(1 to 10)

  val printSink = Sink.foreach[String](s => println(s))

  val yourMapper: Int => Future[String] = (i: Int) => Future(i.toString)

  val yourFlow = Flow[Int].mapAsync[String](2)(yourMapper)

  intSource ~> yourFlow ~> printSink

  ClosedShape
}
+9

Source: https://habr.com/ru/post/1668896/


All Articles