Akka Streams splits a stream by type

I have the following simple class hierarchy:

sealed trait Message case class Foo(bar: Int) extends Message case class Baz(qux: String) extends Message 

And I have Flow[Message, Message, NotUsed] (from a protocol based on Websocket with a codec already installed).

I want to demultiplex this Flow[Message] into separate threads for the Foo and Baz types, since they are processed in completely different ways.

What is the easiest way to do this? It should be obvious, but I'm missing something ...

+6
source share
1 answer

One way is to create a RunnableGraph that includes threads for each type of message.

 val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => val in = Source(...) // Some message source val out = Sink.ignore val foo = builder.add(Flow[Message].map (x => x match { case f@Foo (_) => f })) val baz = builder.add(Flow[Message].map (x => x match { case b@Baz (_) => b })) val partition = builder.add(Partition[Message](2, { case Foo(_) => 0 case Baz(_) => 1 })) partition ~> foo ~> // other Flow[Foo] here ~> out partition ~> baz ~> // other Flow[Baz] here ~> out ClosedShape } g.run() 
+5
source

Source: https://habr.com/ru/post/1012051/


All Articles