I have an endless stream of elements that I want to group by id, aggregate each group for, say, 2 seconds, and then send the aggregated groups downstream. The code below doesn't work, but it may explain better what I want:
Source
.tick(0 second, 50 millis, () => if (Random.nextBoolean) (1, s"A") else (2, s"B"))
.map { f => f() }
.groupBy(10, _._1)
// how to aggregate grouped elements here for two seconds?
.scan(Seq[String]()) { (x, y) => x ++ Seq(y._2) }
.to(Sink.foreach(println))
And the desired result should look like this:
Seq(A, A, A, A, A)
Seq(B, B, B)
Seq(A, A)
Seq(B, B, B, B, B)
// and so on
How can I achieve this with Akka Streams?
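
One direction that might fit, sketched under assumptions: Akka Streams has a built-in `groupedWithin` operator that collects elements until either a count limit or a time limit is reached. Applied to each substream produced by `groupBy`, it would batch per id in 2-second windows. The sketch assumes Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer); the object name and the `Int.MaxValue` count limit are illustrative choices, not taken from the code above.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical object name, used only for this sketch.
object GroupedWithinSketch extends App {
  // Assumption: Akka 2.6+, so the system also acts as the materializer.
  implicit val system: ActorSystem = ActorSystem("grouped-within-sketch")

  Source
    .tick(0.seconds, 50.millis, ())                // emit a tick every 50 ms
    .map(_ => if (Random.nextBoolean()) (1, "A") else (2, "B"))
    .groupBy(10, _._1)                             // one substream per id, at most 10
    .groupedWithin(Int.MaxValue, 2.seconds)        // batch each substream for up to 2 s
    .map(batch => batch.map(_._2))                 // keep only the payloads
    .mergeSubstreams                               // back to a single stream of batches
    .runWith(Sink.foreach(println))                // runs until the system is terminated
}
```

With `Int.MaxValue` as the count limit, only the 2-second timer closes a batch; a smaller limit would cap memory use per window. `groupedWithin` does not emit empty batches, so an id that receives nothing during a window produces no output for that window.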