Can groupBy substreams depend on the keys from which they were created?

I have a stream with data related to users. I also have a state for each user that I can get asynchronously from the database.

I want to split a stream with one sub-stream per user and load the state for each user when materializing the sub-stream so that the elements of the sub-stream can be processed relative to this state.

If I do not want to combine the sub-flows downstream, I can do something with groupByand Sink.lazyInit:

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue, 
  getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),
    ??? // this is never called, since the subflow is created when an element comes
  )
)

However, this does not work if treatUserit becomes Flow, because for Sink.lazyInitthere is no equivalent.

groupBy , , , groupBy, , , Sink.lazyInit, -, Flow.

, ?

+6
1

Akka, , - # 20129: Sink.dynamic Flow.dynamic.

PR # 20579 LazySink stuffs.

LazyFlow :

lazyFlow .

, , Akka ( PR Akka).

+2

Source: https://habr.com/ru/post/1015844/


All Articles