I have a streaming graph with a broadcast and a zip inside. If something (no matter what) fails inside this flow, I would like to drop the problematic element that was passed to it and resume. I came up with the following solution:
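import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, FlowShape, Supervision}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

// Assumption: Akka 2.6+, where an implicit ActorSystem provides the materializer.
implicit val system: ActorSystem = ActorSystem("example")

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Fails on the element 5, passes everything else through unchanged.
  val dangerousFlow = Flow[Int].map {
    case 5 => throw new RuntimeException("BOOM!")
    case x => x
  }
  val safeFlow = Flow[Int]

  val bcast = builder.add(Broadcast[Int](2))
  val zip = builder.add(Zip[Int, Int]())

  bcast ~> dangerousFlow ~> zip.in0
  bcast ~> safeFlow ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

Source(1 to 9)
  .via(flow)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.foreach(println))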
I expect it to print:
(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)
However, it deadlocks, printing only:
(1,1)
(2,2)
(3,3)
(4,4)
I did some debugging, and it turns out that the "restart" strategy was applied to the child stages, which caused dangerousFlow to restart after the failure and therefore request a new element from bcast. But bcast will not emit an element until safeFlow also requests one, which never happens, because safeFlow is itself waiting for demand from zip.
Is there a way to resume the graph regardless of what went wrong inside one of its stages?
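One possible direction, given the deadlock described above, is to keep both branches in lockstep: if the risky stage always emits exactly one element per input, zip never waits on a branch that has silently dropped something. The sketch below is only an illustration under that assumption, not a general answer. It wraps the risky work in `scala.util.Try` and filters out the failed pairs after the zip. The names `GuardedZipSketch`, `guardedFlow` and `run` are hypothetical, and the code assumes Akka 2.6+ (an implicit `ActorSystem` acting as materializer).

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.Future
import scala.util.{Success, Try}

// Hypothetical sketch: same graph shape as in the question, but the risky
// branch captures its failure as a value instead of throwing.
object GuardedZipSketch {

  val guardedFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Emits Failure(...) for 5 and Success(x) otherwise, so this branch
    // produces exactly one element per input and stays aligned with safeFlow.
    val dangerousFlow = Flow[Int].map { x =>
      Try(if (x == 5) throw new RuntimeException("BOOM!") else x)
    }
    val safeFlow = Flow[Int]

    val bcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(Zip[Try[Int], Int]())

    bcast ~> dangerousFlow ~> zip.in0
    bcast ~> safeFlow ~> zip.in1

    FlowShape(bcast.in, zip.out)
  })

  // Failed pairs are dropped only after zipping, when both sides are aligned.
  def run()(implicit system: ActorSystem): Future[Seq[(Int, Int)]] =
    Source(1 to 9)
      .via(guardedFlow)
      .collect { case (Success(a), b) => (a, b) }
      .runWith(Sink.seq)
}
```

With an implicit `ActorSystem` in scope, `GuardedZipSketch.run()` should complete with the pairs for 1 to 4 and 6 to 9, i.e. the element 5 is skipped on both branches. The trade-off is that this only covers failures thrown inside the wrapped function; it does not make arbitrary stages resumable.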