Using RxJS 5.0.0-rc.1, I'm trying to make my Observer and Observable communicate in a way similar to how generators/iterators exchange data through yield and .next(). The goal is to get hold of what the callback passed to .subscribe returns and to change/update the subsequent values in my observable stream based on it.
I'm not sure whether this is possible at all. However, I found out that you can catch exceptions thrown in .subscribe callbacks. The following snippet prints "Boom!":
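import { Observable } from 'rxjs';

var source = Observable.create((observer) => {
  try {
    // The subscriber's callback runs synchronously inside next(),
    // so an error thrown there surfaces here.
    observer.next(42);
  } catch (e) {
    console.log(e.message);
  }
  observer.complete();
});
source.subscribe(() => {
  throw new Error('Boom!');
});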
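For comparison, this is the kind of generator-style exchange I mean, in plain JavaScript. It is only a minimal sketch: the names counter and it are made up for illustration and have nothing to do with RxJS.

```javascript
// A generator that receives feedback from its consumer through .next(value).
function* counter() {
  let current = 0;
  while (true) {
    // `yield` hands `current` to the caller and evaluates to whatever
    // the caller passes to the following .next(...) call.
    const step = yield current;
    current += step === undefined ? 1 : step;
  }
}

const it = counter();
const first = it.next().value;    // starts the generator, yields 0
const second = it.next(10).value; // sends 10 back in, yields 10
const third = it.next().value;    // no feedback, default step of 1, yields 11
console.log(first, second, third);
```

The consumer influences the next value the producer emits, which is what I would like the .subscribe callback to be able to do.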
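So if an exception can travel from the subscriber back to the Observable, can a return value do the same? Is there a way for the Observable to get what the subscriber's callback returns?
One workaround I can think of is to pass a callback along with the value, something like this: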
var source = Observable.create((observer) => {
  // This will print "{ success: true }"
  observer.next({ value: 42, reply: console.log });
  observer.complete();
});
source.subscribe(({ value, reply }) => {
  console.log('Got', value);
  return reply({ success: true });
});
Is there a better way?
Update 2
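To be more specific about what I'm trying to do: I'm building an API (on top of AMQP-RPC), and I'd like to use RxJS for it.
There are two sides: a Publisher and a Consumer. The Consumer receives messages from the Publisher, handles them, and sends a reply back.
The API would look something like this: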
Consumer().consume('some.pattern')
  .subscribe(function(msg) {
    console.log(msg.foo);
    return { ok: true };
  });
Publisher().publish('some.pattern', { foo: 42 });
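This should print 42.
The logic that replies to the Publisher lives in the Consumer, but the actual reply value comes from the .subscribe() callback. Which brings me back to my original question: how can the Consumer get hold of that value?
Consumer#consume() looks roughly like this: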
function waitOnMessage(observer) {
  return function(msg) {
    observer.next(msg);
  };
}
return Observable.create((observer) => {
  queue.consume(waitOnMessage(observer));
});
Is there any way to get the value returned by the subscriber at this point?
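To make the goal concrete, here is a rough sketch of the behaviour I'm after, written without RxJS. Everything in it is an assumption made up for illustration: FakeQueue is an in-memory stand-in for the real queue (not an AMQP client), and this consume() returns a hand-rolled object with a subscribe() method rather than a real Observable.

```javascript
// Hypothetical in-memory stand-in for the real queue; not an AMQP client.
class FakeQueue {
  constructor() {
    this.handlers = [];
  }
  // Register a handler that is called with (body, reply) for each message.
  consume(handler) {
    this.handlers.push(handler);
  }
  // Deliver a message and resolve with the first reply any handler sends.
  publish(body) {
    return new Promise((resolve) => {
      this.handlers.forEach((handler) => handler(body, resolve));
    });
  }
}

// Hypothetical consume(): exposes a subscribe() whose callback may return
// a value; that return value is forwarded to the publisher as the reply.
function consume(queue) {
  return {
    subscribe(next) {
      queue.consume((body, reply) => {
        const result = next(body);
        if (result !== undefined) {
          reply(result);
        }
      });
    },
  };
}

const queue = new FakeQueue();
const received = [];

consume(queue).subscribe((msg) => {
  received.push(msg.foo);
  return { ok: true };
});

const pending = queue.publish({ foo: 42 });
pending.then((reply) => console.log(received, reply));
```

The key point is that the wrapper calls the subscriber's callback itself, so it can see the return value. What I can't figure out is how to get the same effect with a real Observable.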