RxJS: takeUntil with multiple actions and different filters?

I have an Observable that I want to keep running until either:

1) the uploadActions.MARK_UPLOAD_AS_COMPLETE action is dispatched with a certain payload

OR

2) the uploadActions.UPLOAD_FAILURE action is dispatched with any payload

This is as far as I have gotten (it does not work):

    return Observable.interval(5000)
      .takeUntil(
        action$
          .ofType(
            uploadActions.UPLOAD_FAILURE,
            uploadActions.MARK_UPLOAD_AS_COMPLETE
          )
          .filter(a => { // <---- this filter only applies to uploadActions.MARK_UPLOAD_AS_COMPLETE
            const completedFileHandle = a.payload;
            return handle === completedFileHandle;
          })
      )
      .mergeMap(action => ... );

Is there a clean way I could achieve this?

1 answer

I would split the two conditions into separate streams and then merge them, like this:

    const action$ = new Rx.Subject();
    const uploadActions = {
      UPLOAD_FAILURE: "UPLOAD_FAILURE",
      MARK_UPLOAD_AS_COMPLETE: "MARK_UPLOAD_AS_COMPLETE"
    };
    const handle = 42;

    window.setTimeout(() => action$.next({
      type: uploadActions.MARK_UPLOAD_AS_COMPLETE,
      payload: handle
    }), 1200);

    Rx.Observable.interval(500)
      .takeUntil(
        Rx.Observable.merge(
          action$.filter(x => x.type === uploadActions.UPLOAD_FAILURE),
          action$.filter(x => x.type === uploadActions.MARK_UPLOAD_AS_COMPLETE)
            .filter(x => x.payload === handle)
        )
      ).subscribe(
        x => { console.log('Next: ', x); },
        e => { console.log('Error: ', e); },
        () => { console.log('Completed'); }
      );

    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>

For the example, I had to use the filter operator instead of ofType, since ofType comes from redux-observable and is not available on a plain Rx.Subject.
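The two merged streams can also be collapsed into a single filter whose predicate accepts either condition. The following is only a minimal sketch of that variation, not part of the original answer: `makeStopPredicate` is a hypothetical helper name, and the sample actions are made up for illustration. It is plain JavaScript, so it runs without RxJS.

```javascript
// Action type constants, mirroring the demo above.
const uploadActions = {
  UPLOAD_FAILURE: "UPLOAD_FAILURE",
  MARK_UPLOAD_AS_COMPLETE: "MARK_UPLOAD_AS_COMPLETE"
};

// Hypothetical helper (an assumption, not from the original answer):
// returns a predicate that is true for any UPLOAD_FAILURE action, or for
// a MARK_UPLOAD_AS_COMPLETE action whose payload equals the given handle.
const makeStopPredicate = handle => action =>
  action.type === uploadActions.UPLOAD_FAILURE ||
  (action.type === uploadActions.MARK_UPLOAD_AS_COMPLETE &&
    action.payload === handle);

// Made-up sample actions to exercise the predicate.
const sampleActions = [
  { type: uploadActions.MARK_UPLOAD_AS_COMPLETE, payload: 7 },
  { type: uploadActions.UPLOAD_FAILURE, payload: "anything" },
  { type: uploadActions.MARK_UPLOAD_AS_COMPLETE, payload: 42 },
  { type: "SOME_OTHER_ACTION", payload: 42 }
];

const stopFlags = sampleActions.map(makeStopPredicate(42));
console.log(stopFlags);
```

With RxJS 5 this predicate would plug in as `.takeUntil(action$.filter(makeStopPredicate(handle)))`. The merge version keeps each condition on its own line, while the single predicate subscribes to `action$` only once; which reads better is a matter of taste.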


Source: https://habr.com/ru/post/1275480/
