Preventing multiple executions of an observable

I have a screen that needs to display constantly changing data from an api. On my observable api call, I use repeatWhen() to implement polling. In addition, when I detect certain changes in the values of the incoming data, I need to make an additional api call. My current implementation works as follows:

    let queueStatusObservable = this.enumService.getOptions('WorkQueueStatus');
    let queueItemObservable = this.enumService.getOptions('WorkItemStatus');

    // retrieve status every 3000ms
    let summaryObservable = this.service
        .getSummary(this.id)
        .repeatWhen(completed => completed.delay(3000));

    this.summaryModel$ = Observable.combineLatest(queueStatusObservable, queueItemObservable, summaryObservable)
        .map(results => {
            let workQueueStatusList = results[0];
            let workItemStatusList = results[1];
            let summary = results[2].status > -1 ? results[2] : null; // convert empty {} object to null

            let model = {
                workQueueStatusList: workQueueStatusList,
                workItemStatusList: workItemStatusList,
                summary: summary
            };

            return model;
        });

    // fetch validationmessages every time the count changes
    this.validationMessages$ = Observable.merge(
        summaryObservable.first((value, index) => value.statusCount['Invalid'] > 0),
        summaryObservable.pairwise().filter((value, index) => value[0].statusCount['Invalid'] != value[1].statusCount['Invalid'])
    ).flatMap(i => this.service.getMessages());

    // fetch errors every time the count changes
    this.errorMessages$ = Observable.merge(
        summaryObservable.first((value, index) => value.statusCount['Error'] > 0),
        summaryObservable.pairwise().filter((value, index) => value[0].statusCount['Error'] != value[1].statusCount['Error'])
    ).flatMap(i => this.service.getErrors());

There are no subscribe() calls on the observables, because the subscriptions happen in my Angular templates via the async pipe, for example:

    <div *ngIf="summaryModel$|async; let summaryModel">

I expected to get one api call every 3 seconds, with all operators working on summaryObservable simply being triggered by the response to that api call.

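However, that is not what happens. In Chrome's network tab, I see 4 api calls every 3 seconds. Am I misunderstanding rxjs, or is this a bug in rxjs?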

Update: I managed to fix it like this:

    let summaryObservable = this.service
        .getSummary(this.id)
        .repeatWhen(completed => completed.delay(3000))
        .shareReplay();

However, I am not sure whether this is the right/best approach, or whether shareReplay is the correct tool here.


summaryObservable is subscribed to 5 times. 2 of those subscriptions go through first. The other 3 stay subscribed.
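To illustrate why the number of subscriptions matters, here is a minimal sketch of a cold source, written without RxJS. The names coldSource, apiCalls and summary are hypothetical stand-ins (they are not part of the question's code or of the RxJS API); the point is only that a cold source re-runs its producer for every subscriber, so three consumers mean three "requests".

```typescript
// Minimal stand-in for a cold observable (NOT the RxJS implementation):
// the producer function runs again for every subscribe() call.
type Observer<T> = (value: T) => void;

interface Source<T> {
  subscribe(observer: Observer<T>): void;
}

function coldSource<T>(producer: (emit: Observer<T>) => void): Source<T> {
  // Each subscriber triggers a fresh execution of the producer.
  return { subscribe: observer => producer(observer) };
}

let apiCalls = 0;

// Hypothetical stand-in for this.service.getSummary(this.id).
const summary = coldSource<{ status: number }>(emit => {
  apiCalls++; // counts how often the "request" is actually made
  emit({ status: 1 });
});

// Three independent consumers, similar to combineLatest
// plus the two pairwise chains in the question.
summary.subscribe(() => {});
summary.subscribe(() => {});
summary.subscribe(() => {});

console.log(apiCalls); // 3
```

With polling on top, each of these executions would repeat on its own timer, which matches seeing several requests per interval instead of one.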

To avoid this, use the publish operator so that all consumers share a single subscription to the source.

Something like this:

    let summaryObservable = this.service
      .getSummary(this.id)
      .repeatWhen(completed => completed.delay(3000))
      .publish(); // creates a ConnectableObservable

    // ...
    // all the rest of your code goes here
    // ...

    // Finally "connect" the observable to start the polling
    // that will be shared by all the other usages
    this.summaryConnection = summaryObservable.connect();

Then, in ngOnDestroy, unsubscribe from the connection so that the polling stops:

    ngOnDestroy() {
        this.summaryConnection.unsubscribe(); // .dispose() if using an older version of RxJS
    }
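The idea behind publish and connect can be sketched without RxJS as follows. This is an illustration under stated assumptions, not the library's implementation: publishSketch, Listener, Stream, coldSummary and sharedApiCalls are hypothetical names, and teardown (unsubscribe) is left out for brevity. Consumers only register themselves; the source is subscribed to once, when connect() is called, and each value is fanned out to all of them.

```typescript
type Listener<T> = (value: T) => void;

interface Stream<T> {
  subscribe(listener: Listener<T>): void;
}

interface ConnectableStream<T> extends Stream<T> {
  connect(): void;
}

// Illustrative multicast helper, NOT the RxJS implementation of publish().
function publishSketch<T>(source: Stream<T>): ConnectableStream<T> {
  const listeners: Listener<T>[] = [];
  return {
    // Consumers only register here; the source is not executed yet.
    subscribe(listener) {
      listeners.push(listener);
    },
    // One subscription to the source, fanned out to every registered consumer.
    connect() {
      source.subscribe(value => listeners.forEach(l => l(value)));
    },
  };
}

let sharedApiCalls = 0;

// Hypothetical stand-in for the summary request: executes once per subscribe().
const coldSummary: Stream<number> = {
  subscribe(listener) {
    sharedApiCalls++;
    listener(42);
  },
};

const shared = publishSketch(coldSummary);
const received: number[] = [];

shared.subscribe(v => received.push(v));
shared.subscribe(v => received.push(v));
shared.subscribe(v => received.push(v));
shared.connect(); // the single underlying execution starts here

console.log(sharedApiCalls, received.length); // 1 3
```

Calling connect() last matters in this sketch: a consumer registered after the value was emitted would miss it, which is also why the answer connects only after the rest of the code has been set up.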

Source: https://habr.com/ru/post/1685083/

