Using RxJS switchMap to unsubscribe only to streams with the same request URL / payload (reduction observable epics)

I have an interface where the user can initiate calls to the same endpoint, but with different parameters (in this case, UUID). So far, I have enjoyed the switchMap behavior switchMap canceling my in-flight HTTP requests whenever I submit a new redux action of the same type, in which case I still want this behavior, but only if a new UUID action is requested (part action object) matches the one that is already running. I am not entirely sure about the right approach to this.

For example, after sending several actions at once, I hope that all of them with unique identifiers will be completed, but those that repeat the existing and not yet completed identifier will cancel the previous request and take its place.

Example:

 store.dispatch({ type: "GET_SOME_DATA", uuid: "1" }) store.dispatch({ type: "GET_SOME_DATA", uuid: "2" }) store.dispatch({ type: "GET_SOME_DATA", uuid: "2" }) store.dispatch({ type: "GET_SOME_DATA", uuid: "3" }) store.dispatch({ type: "GET_SOME_DATA", uuid: "2" }) // Get results back for '1', then '3', then '2' assuming equal response times. // Only the duplicate uuid calls were cancelled, even though all have the same 'type' 

I tried using .distinctUntilChanged((a, b) => a.uuid === b.uuid) to filter the input stream in .switchMap to see what happens, but it just limits what actions reach switchMap and undo behavior all but the last GET_SOME_DATA API call associated with the actions is still.

 const getDataEpic = (action$) => action$.ofType(GET_SOME_DATA) .switchMap(({ uuid }) => // would be great if the switchMap would only cancel existing streams with same uuid ajax.getJSON(`/api/datastuff/${uuid}`) .map((data) => successAction(uuid, data.values)) .catch((err) => Observable.of( errorAction(uuid), setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'), )) 

I am currently using mergeMap , but I am worried that this can cause problems, for example, which I encountered in real time, when an older request can be resolved after the very last one, as a result of which my redux store is updated with old data, since mergeMap does not cancel Observable streams such as switchMap .... is there a way to look at current RjJS Ajax requests and undo those that have a new action url, or a better solution that I am clearly missing?

Hooray!

Edit: I wonder if changing the value of switchMap to mergeMap , then binding takeUntil and takeUntil on other GET_SOME_DATA actions would be the right approach, or if it just leads to the immediate cancellation of all requests? for instance

 const getDataEpic = (action$) => action$.ofType(GET_SOME_DATA) .mergeMap(({ uuid }) => ajax.getJSON(`/api/datastuff/${uuid}`) .takeUntil( action$.ofType(GET_SOME_DATA).filter(laterAction => laterAction.uuid === uuid) ) .map((data) => successAction(uuid, data.values)) .catch((err) => Observable.of( errorAction(uuid), setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'), )) 

Edit2: Apparently adding takeUntil works! I'm not sure if this is 100% higher and higher in terms of being suitable, but I would like some feedback. I would also like to support the manual cancellation option, will the method discussed here be the proper version?

Edit3: I think this is my latest working version. Removed the destructuring of the Redux action in the mergeMap file just in case someone newer steps on it to reduce the observables:

 const getDataEpic = (action$) => action$.ofType(GET_SOME_DATA) .mergeMap((action) => ajax.getJSON(`/api/datastuff/${action.uuid}`) .takeUntil(Observable.merge( action$.ofType(MANUALLY_CANCEL_GETTING_DATA) .filter((cancelAction) => cancelAction.uuid === action.uuid), action$.ofType(GET_SOME_DATA) .filter((laterAction) => laterAction.uuid === action.uuid), )) .map((data) => successAction(action.uuid, data.values)) .catch((err) => Observable.of( errorAction(action.uuid), setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'), )) 

And the observed behavior of the network from a quick click on everything in sight. Only non-duplicate requests for identifiers have passed!

enter image description here

+5
source share
1 answer

You can also use the groupBy operator to process threads using the same uuid and apply the useful switchMap behavior for each uuid action thread:

 action$.ofType(GET_SOME_DATA) .groupBy( ({ uuid }) => uuid, // group all the actions by uuid x => x, group$ => group$.switchMap(_ => Observable.timer(5000)) // close existing streams if no event of a grouped action is emitted 5 seconds in a row (prevents memory leaks) ) .mergeMap(actionsGroupedByUuid$ => actionsGroupedByUuid$.switchMap(({ uuid }) => ajax.getJSON(`/api/datastuff/${uuid}`) .map((data) => successAction(uuid, data.values)) .catch((err) => Observable.of( errorAction(uuid), setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'), )) ) ); 
+2
source

Source: https://habr.com/ru/post/1275478/


All Articles