How to group incoming events from an infinite stream?

I have an endless stream of events:

(timestamp, session_uid, traffic) 

For example:

    ...
    (1448089943, session-1, 10)
    (1448089944, session-1, 20)
    (1448089945, session-2, 50)
    (1448089946, session-1, 30)
    (1448089947, session-2, 10)
    (1448089948, session-3, 10)
    ...

I want to group these events by session_uid and calculate the amount of traffic for each session.

I wrote an akka-streams flow that works fine with groupBy on a finite stream (my code is based on an example from the cookbook). But it does not work with an infinite stream, because the per-group result only becomes available after the entire incoming stream has been processed.

I think I should implement grouping with a timeout: if no event with a given session_uid arrives for more than 5 minutes after the last one, I should emit the grouped events for that session_uid. But how can I implement this using only akka-streams?

+5
3 answers

I came up with a somewhat gnarly solution, but I think it does the job.

The essential idea is to use the keepAlive method of Source as a timer that triggers completion of a group.

For this, we first need to abstract the data a bit. The timer has to emit either a trigger or a value from the original source, so both need a common type:

    sealed trait Data
    object TimerTrigger extends Data
    case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data

Then we convert our source of tuples to a source of Values. We still use groupBy for the grouping, as in your finite-stream case:

    import akka.stream.scaladsl.Source

    val originalSource : Source[(Long, String, Int), Unit] = ???

    type IDGroup = (String, Source[Value, Unit]) // uid -> Source of Values for uid

    val groupedDataSource : Source[IDGroup, Unit] =
      originalSource.map(t => Value(t._1, t._2, t._3))
                    .groupBy(_.session_uid)

The tricky part is handling the groups, which are just tuples of type (String, Source[Value, Unit]). We need a timer to tell us when the timeout has elapsed, so we need another abstraction to distinguish a sum that is still being computed from one that is finished because of a timeout:

    sealed trait Sum { val sum : Int }
    case class StillComputing(sum : Int) extends Sum
    case class ComputedSum(sum : Int) extends Sum

    val zeroSum : Sum = StillComputing(0)

Now we can process the source of each group. keepAlive injects a TimerTrigger if the Value source produces nothing for the duration of timeOut. Each Data element coming out of keepAlive is then pattern-matched as either a TimerTrigger or a new Value from the original source:

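    import akka.stream.scaladsl.Sink
    import scala.concurrent.Future
    import scala.concurrent.duration.FiniteDuration

    // Reducer for scan: a timer trigger finalizes the sum, a value extends it.
    // scan expects a two-argument function, hence (Sum, Data) => Sum.
    val evaluateSum : (Sum, Data) => Sum = {
      case (runningSum, TimerTrigger) => ComputedSum(runningSum.sum)
      case (runningSum, v : Value)    => StillComputing(runningSum.sum + v.traffic)
    }

    type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid

    // Note: runWith needs an implicit materializer in scope.
    def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
      idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
                              .scan(zeroSum)(evaluateSum)
                              .collect { case c : ComputedSum => c.sum }
                              .runWith(Sink.head)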

collect is applied with a partial function that matches only the final sum, so the sink receives an element only after the timer has fired.
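To see how the scan and collect steps interact, the same reducer can be run over an ordinary List with scanLeft, with no Akka involved. This is only a sketch: the object name ScanDemo is made up for illustration, the sample events are borrowed from the question, and the TimerTrigger is placed by hand at the point where keepAlive would have injected it.

```scala
object ScanDemo {
  sealed trait Data
  case object TimerTrigger extends Data
  final case class Value(tstamp: Long, session_uid: String, traffic: Int) extends Data

  sealed trait Sum { def sum: Int }
  final case class StillComputing(sum: Int) extends Sum
  final case class ComputedSum(sum: Int) extends Sum

  val zeroSum: Sum = StillComputing(0)

  // Same logic as the reducer in the answer: a trigger finalizes, a value adds.
  def evaluateSum(running: Sum, data: Data): Sum = data match {
    case TimerTrigger => ComputedSum(running.sum)
    case v: Value     => StillComputing(running.sum + v.traffic)
  }

  // Events of one session, followed by a hand-placed trigger (assumption:
  // in the real stream keepAlive would inject it after the idle timeout).
  val events: List[Data] = List(
    Value(1448089943L, "session-1", 10),
    Value(1448089944L, "session-1", 20),
    Value(1448089946L, "session-1", 30),
    TimerTrigger
  )

  // scanLeft yields every intermediate state, starting with zeroSum.
  val states: List[Sum] = events.scanLeft(zeroSum)(evaluateSum)

  // collectFirst plays the role of collect followed by Sink.head.
  val result: Option[Int] = states.collectFirst { case c: ComputedSum => c.sum }
}
```

Only the last state is a ComputedSum, which is why nothing reaches the sink until the trigger arrives.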

Then we apply this handler to each group that is emitted:

    import scala.concurrent.duration.{FiniteDuration, MINUTES}

    val timeOut = FiniteDuration(5, MINUTES)

    val sumSource : Source[SumResult, Unit] =
      groupedDataSource map handleGroup(timeOut)

Now we have a Source of (String, Future[Int]), which pairs each session_uid with a Future of the traffic sum for that id.

As I said, it is convoluted, but it meets the requirements. Also, I'm not quite sure what happens if a uid has already been grouped and timed out, and then a new value arrives with the same uid.
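One way to pin down the desired behaviour for a uid that comes back after its timeout is to write the rule out as a plain fold, independent of Akka. The sketch below is an assumption about what the semantics could be, not a description of what groupBy actually does: events are assumed to arrive in timestamp order, a session is closed once it has been idle for longer than the timeout, and a returning uid simply starts a fresh session. All names (IdleSessions, Event, Open, aggregate) are hypothetical.

```scala
object IdleSessions {
  final case class Event(tstamp: Long, sessionUid: String, traffic: Int)
  // State of a session that has not timed out yet.
  final case class Open(lastSeen: Long, sum: Int)

  // Returns (closed sessions in closing order, sessions still open at the end).
  def aggregate(events: Seq[Event], timeoutSec: Long): (Vector[(String, Int)], Map[String, Open]) =
    events.foldLeft((Vector.empty[(String, Int)], Map.empty[String, Open])) {
      case ((closed, open), e) =>
        // Close every session that has been idle for longer than the timeout.
        val (expired, alive) =
          open.partition { case (_, o) => e.tstamp - o.lastSeen > timeoutSec }
        val nowClosed =
          closed ++ expired.toVector.sortBy(_._1).map { case (uid, o) => uid -> o.sum }
        // A uid that was just closed is absent from `alive`, so it restarts at 0.
        val previous = alive.get(e.sessionUid).map(_.sum).getOrElse(0)
        (nowClosed, alive.updated(e.sessionUid, Open(e.tstamp, previous + e.traffic)))
    }
}
```

With a 300 second timeout, two events for the same uid at 0 and 10 followed by one at 400 would produce one closed session containing the first two events and one open session containing only the third.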

+3

This looks like a use case for Source.groupedWithin:

 def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat] 

"Chunk up this stream into groups of elements received within a time window, or limited by the given number of elements, whatever happens first."

Here is a link to the documentation.
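groupedWithin only produces batches of raw events, so each batch still has to be reduced to per-session sums. A minimal sketch of that step in plain Scala follows; the names BatchSums and sumBySession are hypothetical, and the tuple layout is the one from the question.

```scala
object BatchSums {
  // (timestamp, session_uid, traffic), as in the question.
  type Event = (Long, String, Int)

  // Sum the traffic of one batch per session_uid.
  def sumBySession(batch: Seq[Event]): Map[String, Int] =
    batch.groupBy(_._2).map { case (uid, events) => uid -> events.map(_._3).sum }
}
```

It could then be applied to each batch, roughly as source.groupedWithin(1000, 5.minutes).map(BatchSums.sumBySession), where the batch size 1000 is an arbitrary choice. Note that the window is fixed rather than per session, so a session that spans two windows yields two partial sums.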

+1

Maybe you can just implement it with an actor:

    import java.util.concurrent.TimeUnit
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout

    case class SessionCount(name: String)

    class Hello private() extends Actor {
      // session_uid -> accumulated traffic
      var sessionMap = Map[String, Int]()

      override def receive: Receive = {
        // Add the traffic of each event (timestamp, session_uid, traffic) to its session.
        case (_, session: String, traffic: Int) =>
          sessionMap = sessionMap + (session -> (sessionMap.getOrElse(session, 0) + traffic))
        case SessionCount(name) =>
          sender() ! sessionMap.getOrElse(name, 0)
      }
    }

    object Hello {
      private val actor = ActorSystem.apply().actorOf(Props(new Hello))
      private implicit val timeOver = Timeout(10, TimeUnit.SECONDS)

      type Value = (Long, String, Int)

      def add(value: Value) = actor ! value
      def count(name: String) = (actor ? SessionCount(name)).mapTo[Int]
    }

-1

Source: https://habr.com/ru/post/1236488/

