Great question. The important point is that action$ is a hot / multicast stream of all actions as they are sent (this is the topic). Since it is hot, we can combine it several times, and they will all listen to the same stream of actions.
// uses switchMap so if another PAGINATION_CLICKED comes in // before FETCH_SUCCESS we start over action$ .ofType(PAGINATION_CLICKED) .switchMap(() => action$.ofType(FETCH_SUCCESS) .take(1) // <-------------------- very important! .map(() => analyticsAction()) .takeUntil(action$.ofType(FETCH_ERROR)) );
Therefore, every time we get PAGINATION_CLICKED , we start listening to this inner Observable chain, which listens to a single FETCH_SUCCESS . It is important to have this .take(1) , because otherwise we will continue to listen to more than one FETCH_SUCCESS , which may cause strange errors, and even if this is not the best practice, just take what you need.
We use takeUntil to cancel FETCH_SUCCESS waiting if we get FETCH_ERROR first.
As a bonus, if you decide you want to make some analytic material based on a mistake, not only get started, you can use race to really race between two streams. The first who emits wins; another unsubscribed.
action$ .ofType(PAGINATION_CLICKED) .switchMap(() => Observable.race( action$.ofType(FETCH_SUCCESS) .take(1) .map(() => analyticsAction()), action$.ofType(FETCH_ERROR) .take(1) .map(() => someOtherAnalyticsAction()) ) );
Itβs the same here, but using race as an instance operator instead of a static one. This is a stylistic preference that you can choose. They both do the same. Use what you understand more.
action$ .ofType(PAGINATION_CLICKED) .switchMap(() => action$.ofType(FETCH_SUCCESS) .map(() => analyticsAction()) .race( action$.ofType(FETCH_ERROR) .map(() => someOtherAnalyticsAction()) ) .take(1) );
source share