Synchronous response stream from a request stream with RxJS

I am new to RxJS and wondered if anyone could help me.

I want to create a synchronous response stream (preferably with corresponding requests) from the request stream (payload data).

Basically, I want the requests to be sent one after another, each one waiting for the response to the previous one.

I tried this, but it sends everything at once (jsbin):

    var requestStream, responseStream;

    requestStream = Rx.Observable.from(['a', 'b', 'c', 'd', 'e']);

    responseStream = requestStream.flatMap(
      sendRequest,
      (val, response) => { return {val, response}; }
    );

    responseStream.subscribe(
      item => { console.log(item); },
      err => { console.error(err); },
      () => { console.log('Done'); }
    );

    // Simulates a request that answers after one second
    function sendRequest(val) {
      return new Promise((resolve, reject) => {
        setTimeout(() => { resolve('result for ' + val); }, 1000);
      });
    }

The version below works to some extent, but it does not use a stream for the request data (jsbin).

    var data, responseStream;

    data = ['a', 'b', 'c', 'd', 'e'];

    responseStream = Rx.Observable.create(observer => {
      var sendNext = function () {
        // Check the length, so that falsy values do not end the stream early
        if (data.length === 0) {
          observer.onCompleted();
          return;
        }
        var val = data.shift();
        sendRequest(val).then(
          response => {
            observer.onNext({val, response});
            sendNext();
          },
          err => { observer.onError(err); }
        );
      };
      sendNext();
    });

    responseStream.subscribe(
      item => { console.log(item); },
      err => { console.error(err); },
      () => { console.log('Done'); }
    );

    // Simulates a request that answers after a random delay
    function sendRequest(val) {
      return new Promise((resolve, reject) => {
        setTimeout(() => { resolve('response for ' + val); }, Math.random() * 2500 + 500);
      });
    }

Thanks!

EDIT:

To clarify, I wanted to achieve this:

"Send A; when you receive the response for A, send B; when you receive the response for B, send C; and so on."

Using concatMap and defer, as suggested by user3743222, it looks like this (jsbin):

    responseStream = requestStream.concatMap(
      (val) => {
        return Rx.Observable.defer(() => {
          return sendRequest(val);
        });
      },
      (val, response) => { return {val, response}; }
    );
1 answer

Try replacing flatMap with concatMap in the first code example and let me know if the resulting behavior matches what you are looking for.

    responseStream = requestStream.concatMap( // I replaced `flatMap`
      sendRequest,
      (val, response) => { return {val, response}; }
    );

Basically, concatMap has a signature similar to flatMap's; the difference in behavior is that it waits for the current inner observable to complete before moving on to the next one. So here:

  • A value from requestStream is pushed to the concatMap operator.
  • The concatMap operator generates a sendRequest observable; each value emitted by that observable is passed, together with the source value, through the selector function as (val, response), and the resulting object is passed downstream.
  • When that sendRequest observable completes, the next requestStream value is processed.
  • In short, your requests will be processed one by one.
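The steps above can be modeled in plain JavaScript, without RxJS, to make the ordering visible. This is only a sketch of the behavior, not how concatMap is implemented: processSequentially and log are hypothetical names introduced for illustration, and sendRequest mirrors the question's helper with a shorter delay.

```javascript
// Mirrors the question's helper; the 10 ms delay is an arbitrary choice.
function sendRequest(val) {
  return new Promise((resolve) => {
    setTimeout(() => resolve('result for ' + val), 10);
  });
}

// Hypothetical helper: handles one value at a time and only moves on
// once the current request has been answered.
async function processSequentially(values, project, resultSelector, log) {
  const results = [];
  for (const val of values) {
    log.push('send ' + val);
    const response = await project(val); // wait for this response first
    log.push('got ' + val);
    results.push(resultSelector(val, response));
  }
  return results;
}

const log = [];
const done = processSequentially(
  ['a', 'b', 'c'],
  sendRequest,
  (val, response) => ({ val, response }),
  log
);

done.then((results) => {
  console.log(log.join(', '));
  console.log(results);
});
```

Each "send" entry in the log appears only after the previous "got" entry, which is the one-by-one ordering described in the list.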

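Alternatively, you might want to use defer to postpone the call to sendRequest. A promise starts its work as soon as it is created, so wrapping the call in defer delays the request until concatMap subscribes to that inner observable.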

    responseStream = requestStream.concatMap( // I replaced `flatMap`
      function (x) {
        return Rx.Observable.defer(function () {
          return sendRequest(x);
        });
      },
      (val, response) => { return {val, response}; }
    );
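The reason deferring matters can be seen without RxJS. In the sketch below, a plain function wrapper (a thunk) stands in for Rx.Observable.defer; that substitution is an assumption made for illustration, as are the names started, eager and deferred. Here sendRequest records the moment a request is actually issued.

```javascript
const started = [];

// Stand-in for the question's helper: records when a request is issued.
function sendRequest(val) {
  started.push(val);
  return Promise.resolve('result for ' + val);
}

// Eager: calling sendRequest issues every request right away.
const eager = ['a', 'b'].map((val) => sendRequest(val));
const afterEager = started.slice();

// Deferred: wrapping the call in a function issues nothing yet.
const deferred = ['c', 'd'].map((val) => () => sendRequest(val));
const afterDeferred = started.slice();

// The request for 'c' is issued only when its wrapper is invoked.
deferred[0]();
const afterFirstCall = started.slice();

console.log(afterEager);     // [ 'a', 'b' ]
console.log(afterDeferred);  // [ 'a', 'b' ]
console.log(afterFirstCall); // [ 'a', 'b', 'c' ]
```

With defer, the wrapper is invoked on subscription, so a request goes out only when its turn comes.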

Source: https://habr.com/ru/post/1242238/

