I am looking for a way to easily use akka-stream streams.
I am considering a thread that I intend to reuse as a function, so I would like to keep its signature, for example:
Flow[Input, Output, NotUsed]
Now, when I use this thread, I would like to be able to "call" this thread and save the result to the side for further processing.
So, I want to start with Flow emiting [Input]
, apply my stream and go to Flow emitting [(Input, Output)]
.
Example:
val s: Source[Int, NotUsed] = Source(1 to 10) val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString) val via: Source[(Int, String), NotUsed] = ???
Now this is impossible in a straightforward way, because combining the stream with .via()
will give me a stream emitting only [Output]
val via: Source[String, NotUsed] = s.via(stringIfEven)
An alternative is that my reusable stream emits [(Input, Output)]
, but this requires that each stream enter its input through all the steps and make my code bad.
So, I came up with a combinator like this:
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = { Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val broadcast = b.add(Broadcast[In](2)) val zip = b.add(Zip[In, Out]) broadcast.out(0) ~> zip.in0 broadcast.out(1) ~> flow ~> zip.in1 FlowShape(broadcast.in, zip.out) })
}
which translates the input into the stream, as well as in a parallel line โ at the "Zip" stage, where I connect the values โโinto a tuple. Then it can be elegantly applied:
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
Everything is fine, but when this thread performs the filter operation, this compiler gets stuck and stops processing further events.
I assume this is due to the "Zip" behavior, which requires that all the substreams do the same - in my case, one branch passes the given object directly, so the other substream cannot ignore this element. filter (), and since this happens, the thread stops because Zip is waiting for a click.
Is there a better way to achieve stream composition? Is there anything I can do in my tupledFlow to get the desired behavior when the "stream" ignores the elements using the "filter"?