Get return value from Observable subscription

Using RxJS 5.0.0-rc.1, I'm trying to make my Observer and Observable communicate in a way similar to how generators/iterators exchange data with yield and .next(). The goal is to get hold of what the callback passed to .subscribe returns and change/update the following values in my observable stream depending on it.

I'm not sure whether this is possible at all. However, I found out that you can catch exceptions thrown in .subscribe callbacks. The following snippet prints "Boom!":
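
For reference, the generator behaviour being imitated looks roughly like this. It is a minimal sketch in plain JavaScript; the names counter and step are made up for illustration. Whatever the consumer passes to .next() becomes the result of the paused yield expression inside the generator:

```javascript
// A generator that lets its consumer steer the next value.
function* counter(start) {
  let current = start;
  while (true) {
    // `step` is whatever the consumer passed to the following .next() call.
    const step = yield current;
    current += step === undefined ? 1 : step;
  }
}

const it = counter(42);
const first = it.next().value;    // runs up to the first yield
const second = it.next(10).value; // 10 becomes the result of that yield
const third = it.next().value;    // no argument, so the default step of 1 applies

console.log(first, second, third); // 42 52 53
```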

var source = Observable.create((observer) => {
  try {
    observer.next(42);
  } catch (e) {
    // This will catch the Error
    // thrown on the subscriber
    console.log(e.message);
  }
  observer.complete();
});

source.subscribe(() => {
  throw new Error('Boom!');
});

, , , , ? Observable ? , . , "" ?

.


, , - . - :

var source = Observable.create((observer) => {
  // This will print "{ success: true }"
  observer.next({ value: 42, reply: console.log });
  observer.complete();
});

source.subscribe(({ value, reply }) => {
  console.log('Got', value);
  return reply({ success: true });
});

?


2

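To give more context: I am building an API (for AMQP-RPC) on top of RxJS.

There are two parts: a Publisher, which sends messages, and a Consumer, which receives them. The Consumer can reply to the Publisher, which can in turn receive that reply.

The API looks like this: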

Consumer().consume('some.pattern')
  .subscribe(function(msg) {
    // Do something with `msg`
    console.log(msg.foo);
    return { ok: true };
  });

Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer

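This prints 42.

The Publisher should receive the Consumer's reply, which is the value returned from the .subscribe() callback. The question is: how?

Consumer#consume() looks like this: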

/**
 * Returns an async handler that gets invoked every time
 * a new message matching the pattern of this consumer
 * arrives.
 */
function waitOnMessage(observer) {
  return function(msg) {
    observer.next(msg);
    // Conceptually, I'd like the returned
    // object from `.subscribe()` to be available
    // in this scope, somehow.
    // That would allow me to go like: 
    // `sendToQueue(pubQueue, response);`
  }
}

return Observable.create((observer) => {
  queue.consume(waitOnMessage(observer));
});

?


. , ( ) ( ).

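Generators and iterators are pull-based: the consumer asks an Iterable for the next value. An RxJS Observable is push-based, a.k.a. publish/subscribe: the producer pushes values to the consumer (the Observer). Calling subscribe returns a Disposable (a Subscription in RxJS 5), which only lets you cancel the subscription, and whatever the observer callback returns is ignored. So what you ask for is not possible with the existing API.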
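
To make the push model concrete, here is a small sketch. It uses a hand-written stand-in (createSource is a made-up name, not an RxJS function) that only mimics the shape of subscribe: the producer calls the callback, drops whatever it returns, and hands back an object for cancelling.

```javascript
// Hand-written stand-in for a push-based source. This is NOT RxJS code;
// it only mimics the shape of subscribe() for illustration.
function createSource(values) {
  return {
    subscribe(onNext) {
      for (const value of values) {
        // The producer pushes each value; the callback's return value is dropped.
        onNext(value);
      }
      // The caller gets back a handle for cancelling, not data.
      // It is a no-op here because this source has already finished.
      return { unsubscribe() {} };
    }
  };
}

const seen = [];
const subscription = createSource([1, 2, 3]).subscribe((value) => {
  seen.push(value);
  return { ok: true }; // nobody receives this
});

console.log(seen);                            // [ 1, 2, 3 ]
console.log(typeof subscription.unsubscribe); // function
console.log(subscription.ok);                 // undefined
```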

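You can, however, build such an API on top of rxjs, for instance with a subject acting as a back channel (untested sketch; initialValue, generateValue and someValue are placeholders):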

// Subjects are classes, so they must be created with `new`
var backChannel = new Rx.Subject();
backChannel.startWith(initialValue).concatMap(generateValue)
  .subscribe(function observer(value) {
    // Do whatever
    // pass a value through the backChannel
    backChannel.next(someValue);
  });
// generateValue is a function which takes a value from the back channel 
// and returns a promise with the next value to be consumed by the observer.

Wrapped up as a factory, this could look like:

// `yieldFn` is used as the parameter name because `yield` is a reserved
// word in strict mode.
function twoWayObsFactory (yieldFn, initialValue) {
  var backChannel = new Rx.BehaviorSubject(initialValue);
  var next = backChannel.next.bind(backChannel);
  return {
    subscribe : function (observer) {
      var subscription = backChannel.concatMap(yieldFn)
        .subscribe(function (x) {
          observer(next, x);
        });
      return {
        // RxJS 5 subscriptions expose unsubscribe() rather than dispose()
        dispose : function () { subscription.unsubscribe(); backChannel.complete(); }
      };
    }
  };
}

// Note that the observer now takes an additional parameter in its signature,
// for instance
// observer = function (next, yieldedValue) {
//              doSomething(yieldedValue);
//              next(anotherValue);
//            }
// Note also that `next` is synchronous, so you should avoid sequences
// of back-and-forth communication that are too long. If your `yieldFn` function
// is synchronous, you might run into stack overflow errors.
// All the same, the `next` function call should be the last line, so that the
// order of execution in your program is the same regardless of whether
// `yieldFn` is synchronous or asynchronous.

What you describe is closer to async iteration, which is being proposed for JavaScript and can already be tried with Babel (see https://github.com/tc39/proposal-async-iteration).
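
As a rough illustration of what async iteration allows, here is a minimal sketch using an async generator, which current Node.js versions run without Babel. The names messages and run and the message shapes are made up for the example. Each value passed to .next() arrives inside the generator as the result of the previous yield, which is the two-way exchange asked about:

```javascript
// Async generator: each `yield` hands a message to the consumer and
// resumes with whatever the consumer passes to the next .next() call.
async function* messages() {
  const replies = [];
  replies.push(yield { foo: 42 });
  replies.push(yield { foo: 43 });
  return replies;
}

async function run() {
  const it = messages();
  const first = await it.next();              // runs up to the first yield
  const second = await it.next({ ok: true }); // reply to the first message
  const last = await it.next({ ok: false });  // reply to the second message
  return { first: first.value, second: second.value, replies: last.value };
}

const result = run();
result.then((r) => console.log(r));
```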

Edit:

Depending on your use case, the expand operator may also be worth a look; see its documentation and the existing examples on SO.


Source: https://habr.com/ru/post/1658572/

