RxJS: Producer-consumer with abort

I have a special producer-consumer problem in RxJS. The producer produces items slowly. The consumer requests items and often has to wait for the producer. This can be achieved by zipping the producer stream and the request stream:

    var produce = getProduceStream();
    var request = getRequestStream();
    var consume = Rx.Observable.zipArray(produce, request).pluck(0);

Sometimes a request gets aborted. A produced item should only be consumed by a request that has not been aborted:

    produce: -------------p1-------------------------p2--------->
    request: --r1--------------r2---------------r3-------------->
    abort:   ------a(r1)------------------a(?)------------------>
    consume: ------------------c(p1, r2)-------------c(p2, r3)-->

The first request r1 would consume the first produced item p1, but r1 is aborted by a(r1) before it can consume p1. p1 is then produced and is consumed by c(p1, r2) on the second request r2. The second abort a(?) is ignored, because no unanswered request is pending at that point. The third request r3 has to wait for the next produced item p2 and is not aborted before p2 is produced. Thus p2 is consumed by c(p2, r3) immediately after it is produced.
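The rules described above can be modelled without RxJS at all, which may help when checking a candidate solution against the diagram. The following is only a sketch: `simulateConsume` and the `events` list are hypothetical names, the list stands in for the three streams merged in time order, and it assumes at most one request is pending at a time.

```javascript
// Plain-JavaScript model of the required behaviour (no RxJS involved).
// `events` is a hypothetical, time-ordered stand-in for the merged streams.
function simulateConsume(events) {
  const products = [];  // produced items that have not been consumed yet
  let pending = null;   // the currently unanswered request, if any (assumption: at most one)
  const consumed = [];

  for (const e of events) {
    if (e.type === 'produce') {
      products.push(e.value);
    } else if (e.type === 'request') {
      pending = e.value;
    } else if (e.type === 'abort') {
      pending = null;   // harmless no-op when nothing is pending
    }

    // A pending request consumes the oldest available product.
    if (pending !== null && products.length > 0) {
      consumed.push([products.shift(), pending]);
      pending = null;
    }
  }
  return consumed;
}

// The timeline from the diagram: r1, a(r1), p1, r2, a(?), r3, p2
const timeline = [
  {type: 'request', value: 'r1'},
  {type: 'abort'},
  {type: 'produce', value: 'p1'},
  {type: 'request', value: 'r2'},
  {type: 'abort'},
  {type: 'request', value: 'r3'},
  {type: 'produce', value: 'p2'},
];

console.log(JSON.stringify(simulateConsume(timeline)));
// prints [["p1","r2"],["p2","r3"]]
```

The result matches the consume line of the diagram: p1 goes to r2, p2 goes to r3, and the stray abort changes nothing.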

How can I achieve this in RxJS?

Edit: I created an example with QUnit tests on JSBin. You can edit the createConsume(produce, request, abort) function to try out and test a solution.

The example contains the function definition from the previously accepted answer.

+6
3 answers

This solution ignores aborts that do not follow an unanswered request:

    const {merge} = Rx.Observable;

    Rx.Observable.prototype.wrapValue = function(wrapper) {
      wrapper = (wrapper || {});
      return this.map(function (value) {
        wrapper.value = value;
        return wrapper;
      });
    };

    function createConsume(produce, request, abort) {
      return merge(
          produce.wrapValue({type: 'produce'}),
          request.wrapValue({type: 'request'}),
          abort.wrapValue({type: 'abort'})
        )
        .scan(
          [false, []],
          ([isRequest, products], e) => {
            // if last time the request was answered
            if (isRequest && products.length) {
              // remove consumed product
              products.shift();
              // mark request as answered
              isRequest = false;
            }
            if (e.type === 'produce') {
              // save product to consume later
              products.push(e.value);
            } else {
              // if evaluated to false, e.type === 'abort'
              isRequest = (e.type === 'request');
            }
            return [isRequest, products];
          }
        )
        .filter( ([isRequest, products]) => (isRequest && products.length) )
        .map( ([isRequest, products]) => products[0] ); // consume
    }

The code, with the latest test, is on JSBin.

0

This (basic idea minus details) passes your JSBin test:

    var consume = request
      .zip(abort.merge(produce), (r,x) => [r,x])
      .filter(([r,x]) => isNotAbort(x))
      .map(([r,p]) => p);

And the JSBin code.
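To see what this pairing does, recall that zip combines by index: the n-th request meets the n-th item of the merged abort/produce stream. The sketch below models only that final pairing with plain arrays, not the timing. `zipConsume`, the `ABORT` marker and this definition of `isNotAbort` are assumptions made for illustration, since the snippet above does not show how aborts are recognised.

```javascript
// Index-wise pairing as performed by zip, modelled with plain arrays.
const ABORT = {type: 'abort'};          // hypothetical abort marker
const isNotAbort = (x) => x !== ABORT;  // hypothetical helper

function zipConsume(requests, abortsAndProducts) {
  // zip emits only as many pairs as the shorter input provides
  const n = Math.min(requests.length, abortsAndProducts.length);
  const consumed = [];
  for (let i = 0; i < n; i++) {
    const x = abortsAndProducts[i];
    if (isNotAbort(x)) {
      consumed.push(x);  // the request paired with a product consumes it
    }
  }
  return consumed;
}

// Every abort follows a pending request: a(r1), p1, p2
console.log(JSON.stringify(zipConsume(['r1', 'r2', 'r3'], [ABORT, 'p1', 'p2'])));
// prints ["p1","p2"]

// The question's timeline with the stray abort: a(r1), p1, a(?), p2
console.log(JSON.stringify(zipConsume(['r1', 'r2', 'r3'], [ABORT, 'p1', ABORT, 'p2'])));
// prints ["p1"]
```

In this model a stray abort takes the slot of the next request (r3 in the question's diagram), so handling that case would presumably be among the details the basic idea leaves out.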

+3

I can't quite wrap my head around how to do this with the existing operators. Here's how to do it with Observable.create():

    return Rx.Observable.create(function (observer) {
      var rsub = new Rx.SingleAssignmentDisposable();
      var asub = new Rx.SingleAssignmentDisposable();
      var psub = new Rx.SingleAssignmentDisposable();
      var sub = new Rx.CompositeDisposable(rsub, asub, psub);
      var rq = [];
      var pq = [];
      var completeCount = 0;
      var complete = function () {
        if (++completeCount === 2) {
          observer.onCompleted();
        }
      };
      var consume = function () {
        if (pq.length && rq.length) {
          var p = pq.shift();
          var r = rq.shift();
          observer.onNext('p' + p);
        }
      };

      rsub.setDisposable(request.subscribe(
        function (r) {
          rq.push(r);
          consume();
        },
        function (e) { observer.onError(e); },
        complete));

      asub.setDisposable(abort.subscribe(
        function (a) {
          rq.shift();
        },
        function (e) { observer.onError(e); }
      ));

      psub.setDisposable(produce.subscribe(
        function (p) {
          pq.push(p);
          consume();
        },
        function (e) { observer.onError(e); },
        complete));

      return sub;
    });

http://jsbin.com/zurepesijo/1/

+2

Source: https://habr.com/ru/post/983057/

