It depends on what exactly you want to achieve, but let's say you want to create a TruthyObservable that behaves the same as the default Observable.create(...) but only passes even numbers:
import { Observable, Observer, Subscriber, Subject, Subscription } from 'rxjs';
import 'rxjs/add/operator/filter';
/**
 * An Observable that is constructed like a plain `Observable`, but routes every
 * value emitted by the user-supplied subscribe function through a `filter()`
 * that lets only even numbers reach the real subscriber.
 *
 * NOTE(review): despite the "Truthy" name, the filter applied here is an
 * even-number check, and it is applied regardless of the type argument `T`.
 */
class TruthyObservable<T> extends Observable<T> {
  /**
   * @param subscribe Optional producer function, same shape as the one accepted
   *   by the base `Observable` constructor. When provided, it is wrapped so that
   *   the subscriber it receives is a filtering proxy instead of the real one.
   */
  constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => any) {
    // Build the wrapper in a separate binding instead of reassigning the parameter.
    let wrapped: typeof subscribe;
    if (subscribe) {
      const producer = subscribe;
      // `this` is only touched when the arrow function runs (i.e. on subscription),
      // which is after `super()` has completed.
      wrapped = (downstream: Subscriber<any>) =>
        producer.call(this, this.appendOperators(downstream));
    }
    super(wrapped);
  }

  /**
   * Inserts a Subject + `filter()` stage in front of the given subscriber.
   *
   * @param obs The subscriber that should ultimately receive the filtered values.
   * @returns A new Subscriber that forwards into the intermediate Subject; this
   *   is what the user's producer function gets to call `next()` on.
   */
  private appendOperators(obs: Subscriber<any>): Subscriber<any> {
    // Typed as number so the predicate below type-checks under strict settings.
    const subject = new Subject<number>();
    subject
      .filter((val) => val % 2 === 0) // keep even values only
      .subscribe(obs);
    return new Subscriber<any>(subject);
  }
}
// Demo: feed a mix of odd and even numbers into the observable and log
// whatever reaches the subscriber (the expected output is the even values only).
let o = new TruthyObservable<number>((obs: Observer<number>) => {
  for (const sample of [3, 6, 7, 8]) {
    obs.next(sample);
  }
});
o.subscribe((val) => {
  console.log(val);
});
This prints to the console:
6
8
See demo version: https://jsbin.com/recuto/3/edit?js,console
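How it works: the callback you pass to the constructor is stored by Observable as _subscribe(), and it is called every time someone subscribes (this is a cold Observable). Here the original _subscribe() is wrapped in another function that becomes the new _subscribe, so on each subscription the wrapper runs first and only then calls the original. Before calling the original _subscribe, the wrapper creates a Subject, chains filter() onto it and subscribes the real Observer to the result; all of this happens in appendOperators(). Finally, the original Observer is replaced by one that feeds the Subject, via obs = this.appendOperators(obs).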
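As a result, when the callback calls, for example, obs.next(3); it is actually pushing the value into the Subject, where it is filtered and, only if it passes, forwarded to the original Observer.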