Alternative streams based on akka stream condition

Have a stream with custom threads, and at some point I want to split the stream and have two alternative data processing that will merge again later.

eg.

-> F3 -> F6 Src -> F1 -> F2 > Merge -> Sink -> F4 -> F5 

F2 must have a condition, if the data contains format A , then it must go to stream F3 , otherwise go to F4 .

As far as I see, each thread can have only one port in each direction (or two if bidi) - so how can I support such a thread?

+5
source share
1 answer

You can use Broadcast to split the stream, then you can use filter or collect for each of the streams to filter the necessary data.

 val split = builder.add(Broadcast[Int](2)) Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink -> filterCondB -> F4 -> F5 -> Merge 

In addition, there is the Partition stage, which processes the number of output ports and the card function from the value to the port number f: T => Int .

 val portMapper(value: T): Int = value match { case CondA => 0 case CondB => 1 } val split = builder.add(Partition[T](2, portMapper)) Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink split -> F4 -> F5 -> Merge 

Perhaps there is an easier way.

+12
source

Source: https://habr.com/ru/post/1247319/


All Articles