Here is one possible solution (note that it uses RxJS 5):
/**
 * Demo stream that applies distinctUntilChanged() inside 1000 ms windows.
 *
 * The first value after a window has expired opens a new window: it is
 * wrapped in a BehaviorSubject piped through distinctUntilChanged(), and that
 * Observable is emitted downstream. Values arriving while the window is open
 * are pushed into the same subject, so consecutive duplicates are dropped.
 * switch() flattens the stream of window Observables, always following the
 * most recent one.
 *
 * All state lives inside the defer() factory so that it is not shared at
 * module level.
 */
let timedDistinctUntil = Observable.defer(() => {
  // Non-null while a window is open; doubles as the "window is open" flag.
  let innerObs = null;
  // Subject that receives every value belonging to the current window.
  let innerSubject = null;
  // Subscription of the 1000 ms timer that closes the current window.
  let delaySub = null;

  // Cancels the pending window timer and completes the current inner subject
  // so that switch() can complete once the source has completed.
  // Both references stay null until the first value arrives, hence the guards:
  // a source completing without emitting must not throw here.
  // NOTE(review): this only runs on source completion, not on error or on an
  // early unsubscribe, so the timer may outlive the chain by up to 1 s.
  function tearDown() {
    if (delaySub !== null) {
      delaySub.unsubscribe();
    }
    if (innerSubject !== null) {
      innerSubject.complete();
    }
  }

  return Observable
    .merge(
      Observable.of(1),
      Observable.of(1).delay(250), // ignored: duplicate inside the first window
      Observable.of(1).delay(700), // ignored: duplicate inside the first window
      Observable.of(1).delay(2000), // first window expired, opens a new one
      Observable.of(1).delay(2200), // ignored: duplicate inside the second window
      Observable.of(2).delay(2300) // passes: differs from the previous value
    )
    // Only the complete callback is used: clean up when the source finishes.
    .do(undefined, undefined, () => tearDown())
    .map(value => {
      if (innerObs) {
        // A window is open: feed the value to it and emit nothing new.
        innerSubject.next(value);
        return null;
      }

      // No open window: start one, seeded with the current value.
      innerSubject = new BehaviorSubject(value);
      delaySub = Observable.of(null).delay(1000).subscribe(() => {
        // Window expired; the next value will open a new one.
        innerObs = null;
      });
      innerObs = innerSubject.distinctUntilChanged();
      return innerObs;
    })
    // filter out all skipped Observable emissions
    .filter(observable => observable)
    // Follow only the most recent window Observable.
    .switch();
});
// Run the demo: tag every emission with its arrival time and log all
// notifications (values, error, completion) to the console.
timedDistinctUntil.timestamp().subscribe({
  next: stamped => console.log(stamped),
  error: err => console.log('error: ' + err),
  complete: () => console.log('complete'),
});
Live demo: https://jsbin.com/sivuxo/5/edit?js,console
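Note that the whole logic is wrapped in Observable.defer() because it needs a few state variables.
A few notes on how it all works: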
merge() is the source of the items.
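do() is used to catch the moment the source completes, so that the internal timer can be cancelled and a proper complete notification sent.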
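map() is the most interesting part. If a valid Observable already exists (it was created less than 1000 ms ago, i.e. innerObs != null), it re-emits the received value into the inner subject and returns null. Otherwise it creates a new BehaviorSubject, through which all following items will be re-emitted, and returns it chained with .distinctUntilChanged(). It also schedules a 1 s delay that sets innerObs = null, which means that when the next value arrives after that, a new Observable with a fresh .distinctUntilChanged() is returned.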
filter() drops all the null values returned by map(). This means a new Observable is emitted at most once per second.
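Now we are dealing with so-called higher-order Observables (Observables that emit Observables). switch() always subscribes only to the most recent Observable and unsubscribes from the previous one, which is exactly what we want. In our case a new Observable is emitted at most once per second (thanks to the filter() above), and this inner Observable can emit as many values as it likes; all of them go through distinctUntilChanged(), so duplicates are ignored.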
The output of this demo looks like this:
Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete
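As you can see, the value 1 is emitted twice, about 2 s apart. The value 2, however, passes through without any problem only about 100 ms later, thanks to distinctUntilChanged().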
I know this isn't simple, but I hope it makes sense :)