I am looking for a way to easily use akka-stream streams.
I am considering a thread that I intend to reuse as a function, so I would like to keep its signature, for example:
Flow[Input, Output, NotUsed]
Now, when I use this thread, I would like to be able to "call" this thread and save the result to the side for further processing.
So, I want to start with Flow emiting [Input] , apply my stream and go to Flow emitting [(Input, Output)] .
Example:
 val s: Source[Int, NotUsed] = Source(1 to 10) val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString) val via: Source[(Int, String), NotUsed] = ??? 
Now this is impossible in a straightforward way, because combining the stream with .via() will give me a stream emitting only [Output]
 val via: Source[String, NotUsed] = s.via(stringIfEven) 
An alternative is that my reusable stream emits [(Input, Output)] , but this requires that each stream enter its input through all the steps and make my code bad.
So, I came up with a combinator like this:
 def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = { Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val broadcast = b.add(Broadcast[In](2)) val zip = b.add(Zip[In, Out]) broadcast.out(0) ~> zip.in0 broadcast.out(1) ~> flow ~> zip.in1 FlowShape(broadcast.in, zip.out) }) 
}
which translates the input into the stream, as well as in a parallel line โ at the "Zip" stage, where I connect the values โโinto a tuple. Then it can be elegantly applied:
 val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven)) 
Everything is fine, but when this thread performs the filter operation, this compiler gets stuck and stops processing further events.
I assume this is due to the "Zip" behavior, which requires that all the substreams do the same - in my case, one branch passes the given object directly, so the other substream cannot ignore this element. filter (), and since this happens, the thread stops because Zip is waiting for a click.
Is there a better way to achieve stream composition? Is there anything I can do in my tupledFlow to get the desired behavior when the "stream" ignores the elements using the "filter"?