I came up with a somewhat gnarly solution, but I think it does the job.
The essential idea is to use the keepAlive method of the source as a timer that will cause termination.
For this, we first need to abstract the data a bit. The stream has to carry either a timer trigger or a tuple value from the original Source, so:
    sealed trait Data

    object TimerTrigger extends Data
    case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
Then convert the source of tuples into a source of Values. We still use groupBy to build the groupings, as in your case with the final stream:
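    val originalSource : Source[(Long, String, Int), Unit] = ???

    // uid -> Source of Values for that uid
    type IDGroup = (String, Source[Value, Unit])

    // Assumes the older groupBy that emits (key, Source) pairs
    val groupedDataSource : Source[IDGroup, Unit] =
      originalSource.map(t => Value(t._1, t._2, t._3))
                    .groupBy(_.session_uid)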
The tricky part is handling the groupings, which are just tuples: (String, Source[Value, Unit]). The timer has to tell us when the time has elapsed, so we need another abstraction to distinguish a sum that is still being computed from one that was completed because of a timeout:
    sealed trait Sum {
      val sum : Int
    }
    case class StillComputing(val sum : Int) extends Sum
    case class ComputedSum(val sum : Int) extends Sum

    val zeroSum : Sum = StillComputing(0)
Now we can consume the Source of each group. keepAlive emits a TimerTrigger if the Source of Values produces nothing for the duration of timeOut. Each element coming out of keepAlive is then pattern matched as either a TimerTrigger or a new Value from the original Source:
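    val evaluateSum : ((Sum , Data)) => Sum = {
      case (runningSum, data) =>
        data match {
          // The timer fired: freeze the running sum as the final result
          case TimerTrigger => ComputedSum(runningSum.sum)
          // A new value arrived: keep accumulating traffic
          case v : Value    => StillComputing(runningSum.sum + v.traffic)
        }
    }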
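To make the state transitions concrete, here is a minimal sketch that replays the same logic over a plain List instead of a stream, using only the standard library. The `events` list and the `ScanSketch` and `step` names are hypothetical and exist only for illustration; `step` takes two arguments rather than a tuple so it can be passed to `scanLeft` directly.

```scala
object ScanSketch {
  sealed trait Data
  case object TimerTrigger extends Data
  final case class Value(tstamp: Long, session_uid: String, traffic: Int) extends Data

  sealed trait Sum { def sum: Int }
  final case class StillComputing(sum: Int) extends Sum
  final case class ComputedSum(sum: Int) extends Sum

  val zeroSum: Sum = StillComputing(0)

  // Same transitions as evaluateSum, written as a two-argument function
  def step(running: Sum, data: Data): Sum = data match {
    case TimerTrigger => ComputedSum(running.sum)
    case v: Value     => StillComputing(running.sum + v.traffic)
  }

  // Hypothetical input: two values for one uid, then the timer fires
  val events: List[Data] =
    List(Value(1L, "a", 10), Value(2L, "a", 32), TimerTrigger)

  // Every intermediate state, starting from zeroSum
  val states: List[Sum] = events.scanLeft(zeroSum)(step)

  // Only a ComputedSum counts as a result; StillComputing states are skipped
  val total: Option[Int] = states.collectFirst { case ComputedSum(s) => s }
}
```

Here `states` ends with `ComputedSum(42)` and `total` is `Some(42)`. Without the trailing `TimerTrigger`, `total` would be `None`, which mirrors the fact that nothing reaches the sink until the timer fires.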
collect is applied with a partial function that matches only a ComputedSum, so the Sink is reached only after the timer has fired.
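The per-group handler can be sketched roughly as follows. This is a reconstruction under assumptions, not necessarily the exact original: the names `handleGroup` and `SumResult` are taken from the snippet that uses them, but their bodies, the `HandleGroupSketch` wrapper, the generic materialized-value type `M`, and the implicit `Materializer` parameter are guesses, and the Akka Streams API differs between versions. `evaluateSum` is written here as a two-argument method because `scan` expects a two-argument function; the tupled value shown earlier would need `Function.untupled`.

```scala
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

object HandleGroupSketch {
  sealed trait Data
  case object TimerTrigger extends Data
  final case class Value(tstamp: Long, session_uid: String, traffic: Int) extends Data

  sealed trait Sum { def sum: Int }
  final case class StillComputing(sum: Int) extends Sum
  final case class ComputedSum(sum: Int) extends Sum

  val zeroSum: Sum = StillComputing(0)

  def evaluateSum(running: Sum, data: Data): Sum = data match {
    case TimerTrigger => ComputedSum(running.sum)
    case v: Value     => StillComputing(running.sum + v.traffic)
  }

  // uid -> Future of the traffic sum for that uid (assumed definition)
  type SumResult = (String, Future[Int])

  def handleGroup[M](timeOut: FiniteDuration)(idGroup: (String, Source[Value, M]))
                    (implicit mat: Materializer): SumResult = {
    val (uid, values) = idGroup
    val total: Future[Int] = values
      .keepAlive[Data](timeOut, () => TimerTrigger) // inject a trigger after timeOut of silence
      .scan(zeroSum)(evaluateSum)                   // running sum, frozen once the trigger arrives
      .collect { case ComputedSum(s) => s }         // let only finished sums through
      .runWith(Sink.head)                           // first finished sum completes the Future
    uid -> total
  }
}
```

One consequence of this shape: keepAlive injects elements only while the group's Source is idle but still open. If that Source completes before the timeout, no ComputedSum is ever emitted and the Future from Sink.head fails instead of yielding a sum.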
Then we apply this handler to each grouping that comes out of groupBy:
    import scala.concurrent.duration._

    val timeOut = FiniteDuration(5, MINUTES)

    val sumSource : Source[SumResult, Unit] =
      groupedDataSource map handleGroup(timeOut)
Now we have a Source of (String, Future[Int]) pairs: the session_uid and a Future of the traffic sum for that id.
As I said, it is convoluted, but it meets the requirements. Also, I'm not quite sure what happens if a uid has already been grouped and timed out, and then a new value arrives with the same uid.