I have a stream with data related to users. I also have a state for each user that I can get asynchronously from the database.
I want to split the stream into one sub-stream per user and load the state for each user when the sub-stream is materialized, so that the elements of the sub-stream can be processed relative to this state.
If I do not need to merge the sub-streams downstream, I can do something with groupBy and Sink.lazyInit:
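import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import scala.concurrent.Future

// An implicit ExecutionContext must be in scope for the .map on the Future below.
def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element, NotUsed] = Flow[Element]
  .groupBy(Int.MaxValue, getUserId)
  .to(
    Sink.lazyInit(
      // Build the per-user sink from the first element of the sub-stream.
      elt => getState(getUserId(elt)).map(treatUser),
      // Fallback: never called, since the sub-stream is only created when an element arrives.
      () => ???
    )
  )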
However, this does not work if treatUser returns a Flow instead of a Sink, because there is no Flow equivalent of Sink.lazyInit.
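Since groupBy only materializes a sub-stream when its first element arrives, it seems that groupBy should make it possible to build a Flow from that element, but I have not found a clean way to adapt groupBy itself to do this. Likewise, Sink.lazyInit does not seem easy to translate to the Flow case.
Any ideas on how to solve this?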
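To make the Flow case concrete, here is a minimal sketch of one direction that might work without touching groupBy itself: take the first element of each sub-stream with prefixAndTail(1), load the state from it, and then run the first element plus the rest of the sub-stream through the per-user Flow. The concrete types (UserId as String, Element, UserState, Result) and the stub bodies of getState and treatUser are hypothetical stand-ins, not the real ones. The sketch uses Source.fromFuture, which exists in Akka 2.5 and is deprecated in 2.6 in favour of Source.future.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Future

object PerUserFlowSketch {
  // Hypothetical stand-ins for the types and functions from the question.
  type UserId = String
  final case class Element(userId: UserId, value: Int)
  final case class UserState(offset: Int)
  type Result = Int

  // Stub: the real version would query the database asynchronously.
  def getState(userId: UserId): Future[UserState] =
    Future.successful(UserState(offset = userId.length))

  def getUserId(element: Element): UserId = element.userId

  // Flow variant of treatUser: processes elements relative to the user's state.
  def treatUser(state: UserState): Flow[Element, Result, NotUsed] =
    Flow[Element].map(_.value + state.offset)

  val treatByUser: Flow[Element, Result, NotUsed] =
    Flow[Element]
      .groupBy(Int.MaxValue, getUserId)
      // Emits exactly one pair per sub-stream: (first element, rest of the sub-stream).
      .prefixAndTail(1)
      .flatMapConcat { case (prefix, tail) =>
        // A groupBy sub-stream only exists because an element arrived,
        // so the prefix is expected to be non-empty here.
        val first = prefix.head
        Source
          .fromFuture(getState(getUserId(first)))
          .flatMapConcat { state =>
            // Replay the first element, then the tail, through the per-user flow.
            Source.single(first).concat(tail).via(treatUser(state))
          }
      }
      .mergeSubstreams
}
```

Two caveats apply to this sketch. The tail source returned by prefixAndTail has to be materialized within the stream subscription timeout (akka.stream.materializer.subscription-timeout, 5 seconds by default), so a slow getState could cancel the sub-stream. The order of results across users after mergeSubstreams is also not deterministic.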