Rx distinctUntilChanged: allow repetition after a configurable time between events

Consider for a moment the following code:

Rx.Observable.merge(
  Rx.Observable.just(1),
  Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
  .subscribe(x => console.log(x))

We expect 1 to be logged only once. However, what if we want to allow a value to repeat when its last emission was at least a configurable amount of time ago? In other words, I want to get both events in the log.

For example, it would be great to have something like the following:

Rx.Observable.merge(
  Rx.Observable.just(1),
  Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
  .subscribe(x => console.log(x))

Here distinctUntilChanged() would take a timeout after which a repeated element is allowed through. However, this does not exist, and I was wondering if anyone knows an elegant way to achieve this using high-level operators, without resorting to a filter that has to keep state.
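
To pin down the behaviour I am after, here is a minimal sketch on plain timestamped data. It is not an Rx solution, and distinctUntilChangedWithin is a hypothetical name, not an RxJS operator. It assumes the timeout is measured from the last value that was actually let through.

```javascript
// Hypothetical helper, not part of RxJS: models the desired semantics on
// plain data. Each event is { value, time }, with time in milliseconds.
function distinctUntilChangedWithin(events, timeoutMs) {
  const out = [];
  let last = null; // last event that was let through
  for (const ev of events) {
    const changed = last === null || ev.value !== last.value;
    // Assumption: the timeout counts from the last emitted event.
    const expired = last !== null && ev.time - last.time >= timeoutMs;
    if (changed || expired) {
      out.push(ev);
      last = ev;
    }
  }
  return out;
}

// The example from the question: the same value again after 1000 ms.
const passed = distinctUntilChangedWithin(
  [{ value: 1, time: 0 }, { value: 1, time: 1000 }],
  1000
);
console.log(passed.map(ev => ev.value)); // both events pass
```

With the second event at 500 ms instead of 1000 ms, only the first one would get through.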

+6
2

One way to do this is with windowTime:

Observable
  .merge(
    Observable.of(1),
    Observable.of(1).delay(250), // Ignored
    Observable.of(1).delay(700), // Ignored
    Observable.of(1).delay(2000),
    Observable.of(1).delay(2200), // Ignored
    Observable.of(2).delay(2300)
  )
  // Converts the stream into a stream of streams each 1000 milliseconds long
  .windowTime(1000)
  // Flatten each of the streams and emit only the latest (there should only be
  // one active at a time anyway).
  // We apply distinctUntilChanged to each window before flattening.
  .switchMap(source => source.distinctUntilChanged())  
  .timeInterval()
  .subscribe(
    value => console.log(value),
    error => console.log('error: ' + error),
    () => console.log('complete')
  );

(Thanks to @martin.)
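
One thing to keep in mind with this approach: as far as I know, windowTime(1000) cuts the timeline into fixed 1000 ms windows counted from subscription, not from the last emitted value. The sketch below models that on plain timestamped data, without RxJS. The name dedupePerFixedWindow is made up for illustration, and the windows are assumed to be aligned to multiples of the window size.

```javascript
// Plain-data model of windowTime(size) followed by distinctUntilChanged()
// inside each window. Assumption: windows start at time 0 and are aligned
// to multiples of `size`. Each event is { value, time } in milliseconds.
function dedupePerFixedWindow(events, size) {
  const out = [];
  let windowIndex = -1; // index of the window the previous event fell into
  let lastValue;        // previous value seen
  for (const ev of events) {
    const index = Math.floor(ev.time / size);
    // The first event of a window always passes; later ones only on change.
    if (index !== windowIndex || ev.value !== lastValue) {
      out.push(ev);
    }
    windowIndex = index;
    lastValue = ev.value;
  }
  return out;
}

// Same timings as in the snippet above.
const sample = [
  { value: 1, time: 0 },
  { value: 1, time: 250 },
  { value: 1, time: 700 },
  { value: 1, time: 2000 },
  { value: 1, time: 2200 },
  { value: 2, time: 2300 }
];
const fixedResult = dedupePerFixedWindow(sample, 1000);
console.log(fixedResult.map(ev => ev.time)); // times 0, 2000 and 2300 pass

// Two equal values only 200 ms apart, but on both sides of a boundary.
const acrossBoundary = dedupePerFixedWindow(
  [{ value: 1, time: 900 }, { value: 1, time: 1100 }],
  1000
);
console.log(acrossBoundary.length); // 2, both pass
```

So a repeat can slip through sooner than 1000 ms after the previous emission if a window boundary happens to fall between the two.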

+8

Here is another approach (note, it uses RxJS 5):

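// Assumes RxJS 5 with all operators patched onto Observable
import { Observable, BehaviorSubject } from 'rxjs/Rx';

let timedDistinctUntil = Observable.defer(() => {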
    let innerObs = null;
    let innerSubject = null;
    let delaySub = null;

    function tearDown() {
        // guard against a source that completes before emitting anything
        if (delaySub) delaySub.unsubscribe();
        if (innerSubject) innerSubject.complete();
    }

    return Observable
        .merge(
            Observable.of(1),
            Observable.of(1).delay(250),  // ignored
            Observable.of(1).delay(700),  // ignored
            Observable.of(1).delay(2000),
            Observable.of(1).delay(2200), // ignored
            Observable.of(2).delay(2300)
        )
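        // when the source completes, stop the timer and complete the inner Subject
        .do(undefined, undefined, () => tearDown())
        .map(value => {
            // an inner Observable is still valid: forward the value to it
            if (innerObs) {
                innerSubject.next(value);
                return null;
            }

            // otherwise start a new inner stream seeded with this value
            innerSubject = new BehaviorSubject(value);

            // invalidate the inner Observable after 1000 ms
            delaySub = Observable.of(null).delay(1000).subscribe(() => {
                innerObs = null;
            });

            innerObs = innerSubject.distinctUntilChanged();
            return innerObs;
        })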
        // filter out all skipped Observable emissions
        .filter(observable => observable)
        .switch();
});

timedDistinctUntil
    .timestamp()
    .subscribe(
        value => console.log(value),
        error => console.log('error: ' + error),
        () => console.log('complete')
    );

Live demo: https://jsbin.com/sivuxo/5/edit?js,console

Everything is wrapped in Observable.defer() because the logic needs a few state variables.

Here is how it works:

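  • merge() is the source of the items.

  • do() is used to catch the moment the source completes, so that the inner timer can be stopped and the inner Subject completed.

  • map() is the most complicated part. If a valid inner Observable currently exists, it pushes the item into the inner Subject and returns null instead of an Observable (the Observable was created less than 1000 ms ago = innerObs != null). If not, it creates a new one from a BehaviorSubject chained with .distinctUntilChanged(). After 1 s it sets innerObs = null, which means that when the next value arrives, it returns a new Observable with a fresh .distinctUntilChanged().

  • filter() drops the null values. This means a new Observable is emitted at most once per second.

  • At this point we have a higher-order Observable (an Observable emitting Observables). switch() always subscribes only to the most recent Observable emitted by the source. Inner Observables are emitted at most once per second (thanks to filter()), each of them can emit as many values as it wants, and all of those values go through distinctUntilChanged(), so duplicates are ignored.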

The demo prints:

Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete

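As you can see, the value 1 is emitted twice, about 2 s apart. The value 2, however, passes only about 100 ms later because distinctUntilChanged() sees a different value.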
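
To double-check the reasoning, here is a rough plain-JavaScript model of the same logic on timestamped data, with no RxJS and no real timers. The name dedupePerTriggeredWindow is made up for this sketch, and it assumes that a value arriving exactly when the 1000 ms timer fires already starts a new inner stream.

```javascript
// Plain-data model of the map()/filter()/switch() chain: the first value
// opens a window of `windowMs`; inside it only changed values pass.
// Each event is { value, time }, with time in milliseconds.
function dedupePerTriggeredWindow(events, windowMs) {
  const out = [];
  let windowStart = null; // time the current inner stream was created
  let lastValue;          // previous value pushed into the inner stream
  for (const ev of events) {
    const windowOpen =
      windowStart !== null && ev.time - windowStart < windowMs;
    if (!windowOpen) {
      // No valid inner stream: start a new one seeded with this value.
      windowStart = ev.time;
      out.push(ev);
    } else if (ev.value !== lastValue) {
      // Inside the window only a changed value gets through.
      out.push(ev);
    }
    lastValue = ev.value;
  }
  return out;
}

// Same timings as the merge() call in the answer.
const demoEvents = [
  { value: 1, time: 0 },
  { value: 1, time: 250 },
  { value: 1, time: 700 },
  { value: 1, time: 2000 },
  { value: 1, time: 2200 },
  { value: 2, time: 2300 }
];
const triggered = dedupePerTriggeredWindow(demoEvents, 1000);
console.log(triggered.map(ev => ev.value)); // values 1, 1 and 2 pass
```

Note that the window is counted from the value that opened it, not from the last value that got through, so the behaviour is close to what the question asks for but not necessarily identical.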


+2

Source: https://habr.com/ru/post/1014152/

