How to control an Akka Stream flow based on another stream

Let's say that I have two sources:

    val ticks = Source(1 to 10)
    val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable])

I would like to create a Graph[...] processing step in Akka Streams that, based on the current value of the ticks stream, consumes as much as possible of the values stream. For example, when a value matches the current tick, I want to emit all the matching elements from the second source; otherwise I just keep the tick. This would produce output such as:

    (1, None)
    (2, None)
    (3, Some(Seq(3)))
    (4, Some(Seq(4, 4)))
    (5, None)
    (6, None)
    (7, Some(Seq(7)))
    (8, Some(Seq(8,8,8,8)))
    (9, Some(Seq(9)))
    (10, None)

How would you implement this behavior?

1 answer

I would recommend you take a look at the Akka Stream documentation on this subject: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

According to the site, you can implement GraphStage as follows:

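    import akka.stream.{FlowShape, Inlet, Outlet}
    import akka.stream.stage.GraphStage
    import scala.collection.immutable

    final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
      // One input port for single elements, one output port for grouped runs
      val in = Inlet[E]("AccumulateWhileUnchanged.in")
      val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

      override def shape = FlowShape(in, out)

      // createLogic(...) must still be overridden before this class compiles
    }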
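The skeleton above only declares the ports. A minimal sketch of how the missing `createLogic` could look is given here, following the pattern from the linked blog post but comparing elements directly with `==` (a simplification assumed for this example). It groups consecutive equal elements; it does not yet pair them with ticks.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
  val in: Inlet[E] = Inlet[E]("AccumulateWhileUnchanged.in")
  val out: Outlet[immutable.Seq[E]] = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape: FlowShape[E, immutable.Seq[E]] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Last element seen, and the run of equal elements collected so far
      private var current: Option[E] = None
      private val buffer = Vector.newBuilder[E]

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPush(): Unit = {
          val next = grab(in)
          if (current.isEmpty || current.contains(next)) {
            // Same value as before: keep accumulating and ask for more
            buffer += next
            pull(in)
          } else {
            // Value changed: emit the finished run and start a new one
            val run = buffer.result()
            buffer.clear()
            buffer += next
            push(out, run)
          }
          current = Some(next)
        }

        // Downstream demand is forwarded upstream
        override def onPull(): Unit = pull(in)

        override def onUpstreamFinish(): Unit = {
          // Flush whatever is still buffered before completing
          val run = buffer.result()
          if (run.nonEmpty) emit(out, run)
          completeStage()
        }
      })
    }
}
```

With the `values` source from the question, `values.via(new AccumulateWhileUnchanged[Int])` should emit the runs 3, then 4 4, then 7, then 8 8 8 8, then 9, each as one `immutable.Seq`.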

There is also a blog post on this subject: http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/
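Note that AccumulateWhileUnchanged only groups equal neighbours; the runs still have to be lined up against the ticks. As a rough illustration of that pairing rule, here is a plain-collections sketch (not an Akka stage). The name `alignToTicks` is hypothetical, and the code assumes both inputs are sorted ascending and that values smaller than the current tick can be dropped, which the question does not specify.

```scala
object TickAlign {
  /** Pairs each tick with the run of values equal to it, or None if there is no such run.
    * Assumption: ticks and values are both sorted in ascending order. */
  def alignToTicks(ticks: Seq[Int], values: Seq[Int]): Vector[(Int, Option[Seq[Int]])] = {
    val start = (values.toList, Vector.empty[(Int, Option[Seq[Int]])])
    val (_, result) = ticks.foldLeft(start) { case ((rest, acc), tick) =>
      // Skip values that fell behind the current tick (assumed behaviour)
      val pending = rest.dropWhile(_ < tick)
      // Consume as many values as match the tick, leave the rest for later ticks
      val (matched, remaining) = pending.span(_ == tick)
      val entry: (Int, Option[Seq[Int]]) =
        (tick, if (matched.isEmpty) None else Some(matched))
      (remaining, acc :+ entry)
    }
    result
  }
}
```

Calling `TickAlign.alignToTicks(1 to 10, Seq(3,4,4,7,8,8,8,8,9))` reproduces the pairs listed in the question. A streaming version would need the same per-tick state inside a stage with two inlets.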

Hope this helps :)


Source: https://habr.com/ru/post/1012592/

