An elegant way to reuse akka-stream flows

I am looking for a way to easily reuse akka-stream flows.

I treat the Flow I intend to reuse as a function, so I would like to keep its signature, for example:

Flow[Input, Output, NotUsed]

Now, when I use this flow, I would like to be able to "call" it and keep the result aside for further processing.

So I want to start with a Flow emitting [Input], apply my flow, and proceed with a Flow emitting [(Input, Output)].

Example:

 val s: Source[Int, NotUsed] = Source(1 to 10)
 val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
 val via: Source[(Int, String), NotUsed] = ???

Now this is not possible in a straightforward way, because combining the flow with .via() gives me a stream that emits only [Output]:

 val via: Source[String, NotUsed] = s.via(stringIfEven) 

An alternative is to make my reusable flow emit [(Input, Output)], but this requires every flow to pass its input through all of its stages, which makes my code look bad.

So, I came up with a combinator like this:

 def tupledFlow[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] = {
   Flow.fromGraph(GraphDSL.create() { implicit b =>
     import GraphDSL.Implicits._

     val broadcast = b.add(Broadcast[In](2))
     val zip = b.add(Zip[In, Out])

     broadcast.out(0) ~> zip.in0
     broadcast.out(1) ~> flow ~> zip.in1

     FlowShape(broadcast.in, zip.out)
   })
 }

which broadcasts the input to the flow and also sends it along a parallel line directly to the "Zip" stage, where I join both values into a tuple. It can then be elegantly applied:

 val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven)) 

Everything is fine, but when the given flow performs a "filter" operation, this combinator gets stuck and stops processing further events.

I assume this is due to the "Zip" behaviour, which requires every branch to emit an element for each input. In my case one branch passes the given object through directly, so the other branch cannot drop that element with filter(). When it does, the flow stops because Zip is still waiting for a push on that input.

Is there a better way to achieve flow composition? Is there anything I can do in my tupledFlow to get the desired behaviour when "flow" drops elements with "filter"?

2 answers

Two possible approaches - with debatable elegance - are:

1) Avoid filtering stages by turning your filter into a Flow[Int, Option[String], NotUsed]. This way you can apply your zipping wrapper around the whole graph, as in your original plan. However, the code looks more cluttered, and there is added overhead from passing Nones around.

 val stringIfEvenOrNone = Flow[Int].map {
   case x if x % 2 == 0 => Some(x.toString)
   case _ => None
 }

 val tupled: Source[(Int, String), NotUsed] =
   s.via(tupledFlow(stringIfEvenOrNone)).collect {
     case (num, Some(str)) => (num, str)
   }

2) Separate the filtering and transforming stages, and apply the filtering ones before your zipping wrapper. This is probably a more lightweight and better compromise.

 val filterEven = Flow[Int].filter(_ % 2 == 0)
 // Named toStringFlow rather than toString so it cannot clash with Any.toString when defined as a class or object member
 val toStringFlow = Flow[Int].map(_.toString)

 val tupled: Source[(Int, String), NotUsed] =
   s.via(filterEven).via(tupledFlow(toStringFlow))

EDIT

3) Posting another solution here for clarity, as discussed in the comments.

This flow wrapper emits each element from a given flow paired with the original input element that generated it. It works for any kind of inner flow (emitting 0, 1 or more elements for each input).

 def tupledFlow[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] =
   Flow[In].flatMapConcat(in => Source.single(in).via(flow).map(out => in -> out))
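
A minimal sketch of how this wrapper could be exercised end to end with the stringIfEven flow from the question. It assumes Akka 2.6 or later, where an implicit ActorSystem also supplies the materializer; the object name TupledFlowDemo, the run() helper and the 5-second timeout are illustrative choices, not part of the original code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object TupledFlowDemo {
  // Same wrapper as above: every input element gets its own single-element
  // sub-stream, so an inner flow that drops the element simply yields nothing.
  def tupledFlow[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] =
    Flow[In].flatMapConcat(in => Source.single(in).via(flow).map(out => in -> out))

  // The filtering flow from the question.
  val stringIfEven: Flow[Int, String, NotUsed] =
    Flow[Int].filter(_ % 2 == 0).map(_.toString)

  // Hypothetical helper: runs the stream to completion and returns the pairs.
  def run(): Seq[(Int, String)] = {
    // Assumption: Akka 2.6+, so the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("tupled-flow-demo")
    try {
      Await.result(
        Source(1 to 10).via(tupledFlow(stringIfEven)).runWith(Sink.seq),
        5.seconds
      )
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Under these assumptions the stream should complete with the pairs for the even inputs only, 2 through 10, instead of stalling on the first filtered element. Note that the inner flow is materialized once per element, so any state it keeps between elements does not carry over from one input to the next.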

I came up with a tupledFlow implementation that works when the wrapped Flow uses filter() or mapAsync(), and when the wrapped Flow emits 0, 1 or N elements for each input:

 def tupledFlow[In, Out](flow: Flow[In, Out, _])(implicit materializer: Materializer, executionContext: ExecutionContext): Flow[In, (In, Out), NotUsed] = {
   val v: Flow[In, Seq[(In, Out)], NotUsed] = Flow[In].mapAsync(4) { in: In =>
     val outFuture: Future[Seq[Out]] = Source.single(in).via(flow).runWith(Sink.seq)
     val bothFuture: Future[Seq[(In, Out)]] = outFuture.map(seqOfOut => seqOfOut.map((in, _)))
     bothFuture
   }
   val onlyDefined: Flow[In, (In, Out), NotUsed] = v.mapConcat[(In, Out)](seq => seq.to[scala.collection.immutable.Iterable])
   onlyDefined
 }

The only drawback I see here is that I instantiate and materialize a flow for a single element, just to get the notion of "calling a flow as a function".

I have not run any performance tests. However, since the heavy lifting is done in the wrapped Flow, which is executed in a Future, I believe this will be fine.

This implementation passes all tests from https://gist.github.com/kretes/8d5f2925de55b2a274148b69f79e55ac#file-tupledflowspec-scala


Source: https://habr.com/ru/post/1013579/

