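RxJS supports refCounting out of the box, through the refCount operator.
refCount keeps track of how many subscribers are currently subscribed. When a subscriber leaves the count goes down, and once it reaches 0, the subscription to the source is torn down.
In other words, the source is subscribed to lazily (on demand). Nothing runs until the first subscriber arrives; at that moment the source is subscribed to, once, on behalf of everyone (refCount goes from 0 to 1). Subscribers that arrive later do not start the source again, they simply share the existing subscription. When the last of them unsubscribes and refCount is back at 0, the source is unsubscribed as well.
Example: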
const {
  Observable
} = Rx;

/**
 * Source used by the demo. For every subscription it emits 0, 1, 2, ...
 * every 700 ms and completes 5500 ms after being subscribed to.
 * Each emission is logged by the `do` side effect so it is visible when the
 * source itself (as opposed to a page observer) produces a value.
 */
const sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    const interval = setInterval(() => {
      observer.next(count++);
    }, 700);
    const completionTimer = setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);

    // Teardown. Both timers are released here so that an early unsubscribe
    // does not leave the completion timeout pending.
    return () => {
      clearInterval(interval);
      clearTimeout(completionTimer);
      console.log('######## Source observable unsubscribed');
    };
  })
  .do((x) => console.log(`#### Source emits: ${x}`));

// Ref-counted, multicast view of the source. All page observers below
// subscribe to this observable rather than to `sourceObservable` directly.
const subject = sourceObservable
  .share();

/**
 * Builds an observer that logs every value and the completion signal,
 * prefixed with the given name.
 *
 * @param {string} name - label used in the log output
 * @returns {{next: function(*): void, complete: function(): void}}
 */
const makePageObserver = (name) => ({
  next: (x) => {
    console.log(`${name} gets: ${x}`);
  },
  complete: () => {
    console.log(`${name}: complete`);
  },
});

// Despite their names these hold the values returned by `subscribe()`,
// which are what `unsubscribe()` is later called on.
let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;

// t = 1 s: first subscriber.
setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe(makePageObserver('pageOneObserver'));
}, 1000);

// t = 4 s: second subscriber joins while the first is still subscribed.
setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe(makePageObserver('pageTwoObserver'));
}, 4000);

// t = 7 s: this is after the source's 5500 ms completion timer
// (started at the first subscription) has fired.
setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);

setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);

// t = 13 s: a late subscriber arrives after everyone else has left.
setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe(makePageObserver('pageThreeObserver'));
}, 13000);

// t = 16 s: the late subscriber leaves 3 s after subscribing, i.e. before
// a 5500 ms completion timer started at t = 13 s could fire.
setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>
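Several of these operators are shorthand for multicast with a particular kind of subject. Compare the following pairs: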
// The statements below are meant to be read in pairs: a shorthand operator
// followed by its spelled-out form.
//
// Pair 1.
// NOTE(review): in RxJS 5.x share() is presumably implemented with a subject
// *factory* rather than a single Subject instance, so the two may differ
// once the source has completed - confirm against the library version used.
sourceObservable.share();
// is the same as
sourceObservable.publish().refCount();
// Pair 2: publish() multicasts through a plain Subject.
sourceObservable.publish().refCount();
// is the same as
sourceObservable.multicast(new Rx.Subject()).refCount();
// Pair 3: publishReplay() multicasts through a ReplaySubject.
// NOTE(review): publishReplay() is called without a buffer size here while
// the spelled-out form uses ReplaySubject(1); the pair only matches exactly
// if publishReplay(1) was intended - confirm.
sourceObservable.publishReplay().refCount();
// is the same as
sourceObservable.multicast(new Rx.ReplaySubject(1)).refCount();
// Pair 4: publishBehavior() multicasts through a BehaviorSubject.
// NOTE(review): publishBehavior() is called without an initial value here
// while the spelled-out form seeds the BehaviorSubject with 0 - confirm
// whether publishBehavior(0) was intended.
sourceObservable.publishBehavior().refCount();
// is the same as
sourceObservable.multicast(new Rx.BehaviorSubject(0)).refCount();
// Pair 5: publishLast() multicasts through an AsyncSubject.
sourceObservable.publishLast().refCount();
// is the same as
sourceObservable.multicast(new Rx.AsyncSubject()).refCount();
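sourceObservable.share(); also has a subject factory built in, which means that when the source completes at some point, a later subscriber gets a new subject instance and, with it, a new subscription to sourceObservable. With the other operators a completed subject stays completed, unless you supply a subject factory of your own.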
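If you need something other than Rx.Subject(), and still want the observable to be subscribable again after it has finished (whether it completed, errored, or was unsubscribed), pass a subject factory to multicast, like this: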
const {
  Observable
} = Rx;

/**
 * Source used by the demo. For every subscription it emits 0, 1, 2, ...
 * every 700 ms and completes 5500 ms after being subscribed to.
 * Each emission is logged by the `do` side effect so it is visible when the
 * source itself (as opposed to a page observer) produces a value.
 */
const sourceObservable = Observable
  .create((observer) => {
    let count = 0;
    const interval = setInterval(() => {
      observer.next(count++);
    }, 700);
    const completionTimer = setTimeout(() => {
      clearInterval(interval);
      observer.complete();
    }, 5500);

    // Teardown. Both timers are released here so that an early unsubscribe
    // does not leave the completion timeout pending.
    return () => {
      clearInterval(interval);
      clearTimeout(completionTimer);
      console.log('######## Source observable unsubscribed');
    };
  })
  .do((x) => console.log(`#### Source emits: ${x}`));

// Factory handed to multicast(): returns a fresh ReplaySubject with a
// buffer size of 1 every time it is called, instead of reusing one instance.
const subjectFactory = () => new Rx.ReplaySubject(1);

// Ref-counted, multicast view of the source. All page observers below
// subscribe to this observable rather than to `sourceObservable` directly.
const subject = sourceObservable
  .multicast(subjectFactory)
  .refCount();

/**
 * Builds an observer that logs every value and the completion signal,
 * prefixed with the given name.
 *
 * @param {string} name - label used in the log output
 * @returns {{next: function(*): void, complete: function(): void}}
 */
const makePageObserver = (name) => ({
  next: (x) => {
    console.log(`${name} gets: ${x}`);
  },
  complete: () => {
    console.log(`${name}: complete`);
  },
});

// Despite their names these hold the values returned by `subscribe()`,
// which are what `unsubscribe()` is later called on.
let pageOneObserver;
let pageTwoObserver;
let pageThreeObserver;

// t = 1 s: first subscriber.
setTimeout(() => {
  console.log('pageOneObserver will subscribe');
  pageOneObserver = subject.subscribe(makePageObserver('pageOneObserver'));
}, 1000);

// t = 4 s: second subscriber joins while the first is still subscribed.
setTimeout(() => {
  console.log('pageTwoObserver will subscribe');
  pageTwoObserver = subject.subscribe(makePageObserver('pageTwoObserver'));
}, 4000);

// t = 7 s: this is after the source's 5500 ms completion timer
// (started at the first subscription) has fired.
setTimeout(() => {
  console.log('pageOneObserver will unsubscribe');
  pageOneObserver.unsubscribe();
}, 7000);

setTimeout(() => {
  console.log('pageTwoObserver will unsubscribe');
  pageTwoObserver.unsubscribe();
}, 10000);

// t = 13 s: a late subscriber arrives after everyone else has left.
setTimeout(() => {
  console.log('pageThreeObserver will subscribe');
  pageThreeObserver = subject.subscribe(makePageObserver('pageThreeObserver'));
}, 13000);

// t = 16 s: the late subscriber leaves 3 s after subscribing, i.e. before
// a 5500 ms completion timer started at t = 13 s could fire.
setTimeout(() => {
  console.log('pageThreeObserver will unsubscribe');
  pageThreeObserver.unsubscribe();
}, 16000);
<script src="https://unpkg.com/rxjs@5.1.1/bundles/Rx.min.js"></script>
Hide result-, - .