In RxJS, I have a special producer-consumer problem. The producer slowly produces items. The consumer requests items and often has to wait for the producer. This can be achieved by zipping the producer stream and the request stream:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
Sometimes a request is aborted. A produced item should only be consumed by a request that has not been aborted:
produce:
The first request r1 would consume the first produced element p1, but r1 is aborted by a(r1) before it can consume p1. p1 is then produced and consumed as c(p1, r2) at the second request r2. The second abort a(?) is ignored because no unanswered request is pending at that point. The third request r3 has to wait for the next produced element p2 and is not aborted before p2 is produced. Thus, p2 is consumed as c(p2, r3) immediately after it is produced.
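To make the expected behavior concrete, here is a minimal plain-JavaScript sketch of the semantics described above. It is a reference model only, not RxJS and not a solution. The function name `simulateConsume` and the `{ type, value }` event shape are hypothetical and chosen for illustration. It also assumes that an abort cancels the oldest unanswered request, which the description does not state explicitly.

```javascript
// Reference model of the expected semantics (plain JavaScript, no RxJS).
// simulateConsume and the event shape are hypothetical, for illustration only.
function simulateConsume(events) {
  var items = [];    // produced items that have not been consumed yet
  var pending = [];  // requests still waiting for an item
  var consumed = []; // resulting [item, request] pairs, in order

  events.forEach(function (e) {
    if (e.type === 'produce') {
      if (pending.length > 0) {
        // A request is already waiting: consume immediately.
        consumed.push([e.value, pending.shift()]);
      } else {
        items.push(e.value);
      }
    } else if (e.type === 'request') {
      if (items.length > 0) {
        // An item is already available: consume immediately.
        consumed.push([items.shift(), e.value]);
      } else {
        pending.push(e.value);
      }
    } else if (e.type === 'abort') {
      // Assumption: an abort cancels the oldest unanswered request.
      // With nothing pending, shift() is a no-op and the abort is ignored.
      pending.shift();
    }
  });
  return consumed;
}

// Event order taken from the description: r1, a(r1), p1, r2, a(?), r3, p2.
var timeline = [
  { type: 'request', value: 'r1' },
  { type: 'abort' },
  { type: 'produce', value: 'p1' },
  { type: 'request', value: 'r2' },
  { type: 'abort' },
  { type: 'request', value: 'r3' },
  { type: 'produce', value: 'p2' }
];
var result = simulateConsume(timeline);
console.log(JSON.stringify(result)); // prints [["p1","r2"],["p2","r3"]]
```

The same per-event logic could presumably be applied as a reducer over a merged stream of produce, request, and abort events, but that wiring is not shown here.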
How can I achieve this in RxJS?
Edit: I created an example with a QUnit test on jsbin. You can edit the createConsume(produce, request, abort) function to try out and test a solution.
The example contains the function definition from the previously accepted answer.