RXJS Observed Stretch

I have an Rx.Observable.webSocket object. The server endpoint cannot process messages that receive the same time (<25ms). Now I need a way to stretch the following () calls to my websocket object.

I created another Subject requestSubjectand subscribed to it. Then call the next website inside the subscription.

requestSubject.delay(1000).subscribe((request) => {
  console.log(`SENDING: ${JSON.stringify(request)}`);
  socketServer.next(JSON.stringify(request));
});

With the delay shift, each subsequent call gets the same delay time, then all subsequent calls issue the same time later ... this is not what I want.

I tried delay, throttle, debouncebut it is not suitable.

The following should illustrate my problem.

Stream 1 | ---1-------2-3-4-5---------6----

    after some operation ...

Stream 2 | ---1-------2----3----4----5----6-
+4
source share
2

, , :

//example source stream
const source = Rx.Observable.from([100,500,1500,1501,1502,1503])
  .mergeMap(i => Rx.Observable.of(i).delay(i))
  .share();

stretchEmissions(source, 1000)
  .subscribe(val => console.log(val));

function stretchEmissions(source, spacingDelayMs) {
  return source
    .timestamp()
    .scan((acc, curr) => {
      // calculate delay needed to offset next emission
      let delay = 0;
      if (acc !== null) {
        const timeDelta = curr.timestamp - acc.timestamp;
        delay = timeDelta > spacingDelayMs ? 0 : (spacingDelayMs - timeDelta);
      }
  
      return {
        timestamp: curr.timestamp,
        delay: delay,
        value: curr.value
      };
    }, null)
    .mergeMap(i => Rx.Observable.of(i.value).delay(i.delay), undefined, 1);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
Hide result

, . timestamp() mergeMap concurrency 1, , . Rx- .

+1

rxjs- - , , (. 1- ):

(MUCH , , ):

const click$ = Rx.Observable
  .fromEvent(document.getElementById("btn"), "click")
  .map((click, i) => i);

const spreadDelay = 1000;
let prevEmitTime = 0;

click$
  .concatMap(i => {  // in this case you could also use "flatMap" or "mergeMap" instead of "concatMap"
    const now = Date.now();
    if (now - prevEmitTime > spreadDelay) {
      prevEmitTime = now;
      return Rx.Observable.of(i); // emit immediately
    } else {
      prevEmitTime += spreadDelay;
      return Rx.Observable.of(i).delay(prevEmitTime - now); // emit somewhere in the future
    }
  })
  .subscribe((request) => {
      console.log(`SENDING: ${request}`);
  });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<button id="btn">Click me!</button>
Hide result

RxJS ( , , ):

const click$ = Rx.Observable
  .fromEvent(document.getElementById("btn"), "click")
  .map((click, i) => i);

click$
  // window will create a new substream whenever no click happened for 1001ms (with the spread out 
  .window(click$
    .concatMap(i => Rx.Observable.of(i).delay(1000))
    .debounceTime(1001)
  )
  .mergeMap(win$ => Rx.Observable.merge(
    win$.take(1).merge(), // emitting the "first" click immediately
    win$.skip(1)
      .merge()
      .concatMap(i => Rx.Observable.of(i).delay(1000)) // each emission after the "first" one will be spread out to 1 seconds
  ))
  .subscribe((request) => {
      console.log(`SENDING: ${request}`);
  });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<button id="btn">Click me!</button>
Hide result
+1

Source: https://habr.com/ru/post/1681569/


All Articles