Akka streams - resuming schedule with broadcast and mail after a failure

I have a streaming graph with broadcast and mail inside. If something (no matter what it is) fails inside this thread, I would like to leave the problematic element passed to it and resume it. I came up with the following solution:

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val dangerousFlow = Flow[Int].map {
    case 5 => throw new RuntimeException("BOOM!")
    case x => x
  }
  val safeFlow = Flow[Int]
  val bcast = builder.add(Broadcast[Int](2))
  val zip = builder.add(Zip[Int, Int])

  bcast ~> dangerousFlow ~> zip.in0
  bcast ~> safeFlow ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

Source(1 to 9)
  .via(flow)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.foreach(println))

I expect it to print:

(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)

However, it is blocked, only print:

(1,1)
(2,2)
(3,3)
(4,4)

We did some debugging, and it turns out that she applied the β€œresume” strategy to her children, which caused a dangerousFlowresumption after the failure and, therefore, require the element from bcast. bcastwill not select an element until safeFlowit requires another element, which in fact will never happen (because it expects a request from zip).

, ?

+4
1

, . , , 5 dangerousFlow, 5, safeFlow, , zip, , . , broadcast zip, , ?

dangerousFlow:

import scala.util._
val dangerousFlow = Flow[Int].map {
  case 5 => Failure(new RuntimeException("BOOM!"))
  case x => Success(x)
}

dangerousFlow . zip, , collect . :

Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
}

, , , , (5, 5), :

Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
  case (_, i)          => i -> i
}
+3

Source: https://habr.com/ru/post/1688447/


All Articles