You can use Broadcast to split the stream, then you can use filter or collect for each of the streams to filter the necessary data.
val split = builder.add(Broadcast[Int](2)) Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink -> filterCondB -> F4 -> F5 -> Merge
In addition, there is the Partition stage, which processes the number of output ports and the card function from the value to the port number f: T => Int .
val portMapper(value: T): Int = value match { case CondA => 0 case CondB => 1 } val split = builder.add(Partition[T](2, portMapper)) Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink split -> F4 -> F5 -> Merge
Perhaps there is an easier way.
source share