Why does an Akka Streams flow swallow my exceptions?

I have a flow that depends on API responses. When the responses do not match what I expect, an exception is thrown. This strategy works well with Spray and when testing methods directly with specs2.

However, when I use flows whose stages throw exceptions, the flow simply stops.

This is my flow:

 Source(() => file)
      .via(csvToSeq)
      .via(getFromElastic)
      .via(futureExtrtactor)
      .via(findLocaionOfId)
      .foreach(v => v.map(v => println("foreached", v)))
      .onComplete(_ => system.shutdown())

My strategy for dealing with futures is to use map.

So:

 val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
      jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
        js.asJsObject("Couldn't convert").getFields("externalId").map({
          case JsString(str) => {
              (i + 1, i == 0, js)
            }
            else (i, false, js)
          }
          case _ => (i, false, x)
        })
      })
      }
    }))
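
One thing that may be relevant to the map-over-futures approach above: an exception thrown inside Future.map is not rethrown to the caller. It is captured in the resulting failed Future, and any later map callback on that future (such as the println in the foreach stage) simply never runs. The following is a minimal sketch using only the Scala standard library, not Akka Streams itself. The names parse and outcome are hypothetical helpers introduced for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object FutureMapSwallowsExceptions {
  // Hypothetical stand-in for a transformation that throws on an unexpected response.
  def parse(s: String): Int =
    if (s.nonEmpty && s.forall(_.isDigit)) s.toInt
    else throw new IllegalArgumentException(s"Unexpected response: $s")

  // Blocks until the future completes and returns its outcome as a Try.
  def outcome[A](f: Future[A]): Try[A] = Try(Await.result(f, 5.seconds))

  def main(args: Array[String]): Unit = {
    val ok  = Future.successful("42").map(parse)
    val bad = Future.successful("oops").map(parse) // parse throws inside map

    // This callback is skipped for the failed future, so nothing is printed for it.
    bad.map(v => println(s"foreached $v"))

    // The failure only becomes visible when the future's outcome is inspected.
    outcome(ok) match {
      case Success(v) => println(s"ok completed with $v")
      case Failure(e) => println(s"ok failed: ${e.getMessage}")
    }
    outcome(bad) match {
      case Success(v) => println(s"bad completed with $v")
      case Failure(e) => println(s"bad failed: ${e.getMessage}")
    }
  }
}
```

If this is what happens in the flow, attaching a failure handler to each future (for example onComplete or recover) before printing should at least make the exception visible.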

Here is a potential source of exceptions in an entirely different place:

val encoded_url = URLEncoder.encode(url, "UTF-8")
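
To make a failure at this call visible instead of letting it escape from a stage, the call could be wrapped in scala.util.Try. This is only a sketch under assumptions: the object name SafeEncode and the helper encode are hypothetical, and the question does not show how url is produced. URLEncoder.encode throws UnsupportedEncodingException for an unknown charset name and NullPointerException for a null input.

```scala
import java.net.URLEncoder
import scala.util.{Failure, Success, Try}

object SafeEncode {
  // Returns the failure as a value instead of throwing it.
  def encode(url: String, charset: String = "UTF-8"): Try[String] =
    Try(URLEncoder.encode(url, charset))

  def main(args: Array[String]): Unit = {
    // A normal input, an unknown charset name, and a null input.
    val results = Seq(
      encode("a b&c"),
      encode("a b", "no-such-charset"),
      encode(null)
    )
    results.foreach {
      case Success(encoded) => println(s"encoded: $encoded")
      case Failure(e)       => println(s"failed: ${e.getClass.getName}")
    }
  }
}
```

Returning a Try keeps the decision about how to handle the error with the caller, which is easier to reason about than an exception thrown from the middle of a pipeline.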

I seem to be missing something, but I can't see what. Thanks for any pointers.

+4
2 answers

, Akka. Akka - "-", , , , .

// 1.0-M2 ( ).

+3

, , mapAsync.

:

val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].mapAsync({...})

, , , .

+3

Source: https://habr.com/ru/post/1570477/

