RxJS: how to combine several nested observables with a buffer

Warning: RxJS newb here.

Here is my task:

  • When triggered by the observable onUnlink$...
  • Immediately start recording values ​​from the observed onAdd$maximum for 1 second (I will call this section onAddBuffer$).
  • Request a database (creating an doc$observable) to retrieve the model that we will use to match one of the valuesonAdd$
  • If one of the values ​​of the observed value onAddBuffer$corresponds to the value doc$, do not highlight
  • If none of the values ​​from the observed value onAddBuffer$matches the value doc$, or if the observed element onAddBuffer$never emits, emits the valuedoc$

This was my best guess:

// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap( unlinkValue => {

  const doc$ = Rx.Observable.fromPromise( db.File.findOne({ unlinkValue }) )

  const onAddBuffer$ = onAdd$
    .buffer( doc$ ) // capture events while fetching from db -- not sure about this
    .takeUntil( Rx.Observable.timer(1000) );

  // if there is a match, emit nothing. otherwise wait 1 second and emit doc
  return doc$.switchMap( doc =>
    Rx.Observable.race( 
      onAddBuffer$.single( added => doc.attr === added.attr ).mapTo( Rx.Observable.empty() ),
      Rx.Observable.timer( 1000 ).mapTo( doc )
    )
  );
});

docsToRemove$.subscribe( doc => {
  // should only ever be invoked (with doc -- the doc$ value) 1 second
  // after `onUnlink$` emits, when there are no matching `onAdd$`
  // values within that 1 second window.
})

EmptyObservable. , , single , undefined, , , , ? find.

single filter, .

FYI: - 1 unlink add, , rename. unlink, , .

+4
1

, :

onUnlink$.concatMap(unlinkValue => {
  const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
  const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
  const onAddBuffer$ = onAdd$.buffer(bufferDuration$);

  return Observable.forkJoin(onAddBuffer$, doc$)
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});

single() , , , , Observable ( , ).

race() . Observables , race() . , , . https://github.com/ReactiveX/rxjs/issues/2641.
, , .

, .mapTo(Rx.Observable.empty()) Observable. , filter(() => false) ignoreElements().

+3
source

Source: https://habr.com/ru/post/1690972/


All Articles