An operator that skips the next emission from the source whenever another Observed

I have a use case when I need an Observable to skip its next radiation whenever another Observable notifier appears.

source:    |---X---X---X---X---X---X---X---X---X---X--|>
notifier:  |-------------X---------X----------X-------|>
result:    |---X---X---X-------X---X-------X-------X--|>

Basically, I need an operator with a name skipNextWhenthat takes the notifier of the observable and passes the next radiation from the source.

I tried using an implementation using an operator pausable( reimplemented with switchMap), but could not get it to work.

pausable.ts

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        pausable: typeof pausable;
    }
}

function pausable<T>(notifier: Observable<boolean>): Observable<T> {
    return notifier.startWith(false).switchMap((paused) => {
        if (paused) {
            return Observable.never();
        } else {
            const source = new Subject();
            this.subscribe(source);
            return source;
        }
    });
}

Observable.prototype.pausable = pausable;

skipNextWhen.ts

import { Observable } from 'rxjs/Observable';
import './pausable';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        skipNextWhen: typeof skipNextWhen;
    }
}

function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
    const notifier = Observable.merge(this.map(() => false), 
                                      other.map(() => true));
    return this.pausable(notifier);
}

Observable.prototype.skipNextWhen = skipNextWhen;

, ? , , , Observable , - Observable .

+4
1

:

  • .filter(), .do() .

    mayne , , "Rx":

    function skipNextWhen(other) {
        let skipNext = false;
    
        return this.merge(other.do(() => skipNext = true).filter(() => false))
            .filter(val => {
                const doSkip = skipNext;
                skipNext = false;
                return !doSkip;
            });
    }
    

    merge() skipNext, other .

  • .scan():

    - .

    function skipNextWhen(other) {
        const SKIP = 'skip';
    
        return this.merge(other.mapTo(SKIP))
            .scan((acc, val) => {
                if (acc === SKIP) {
                    return null;
                } else if (val === SKIP) {
                    return SKIP;
                } else {
                    return val;
                }
            }, [])
            .filter(val => Boolean(val) && val !== SKIP);
    }
    

    , SKIP, , acc scan(), filter().

    , SKIP, null, .

:

Observable.prototype.skipNextWhen = skipNextWhen;

const source = Observable.range(1, 10)
    .concatMap(val => Observable.of(val).delay(100));

source
    .skipNextWhen(Observable.interval(350))
    .subscribe(console.log);

:

1
2
3
5
6
8
9
10

, . . , , other, .

+3

Source: https://habr.com/ru/post/1680642/


All Articles