How to create an RxJS buffer that groups elements in NodeJS, but which does not rely on a forever-running interval?

I am collecting events from an application using Rx.Observable.fromEvent in NodeJS. They are sent to another server using request ( https://www.npmjs.com/package/request ). To avoid high network load, I need to buffer these events and enforce a given timeout between sent requests.

Problem

Using bufferWithTime(200) keeps the node process running, and I cannot know when the application has finished so that I could close the stream.

Is it possible to use Rx buffers to say:

  • When item 1 is pushed, start a timer
  • When items 2 and 3 arrive before the timer expires, push them into an array [1, 2, 3] (the buffer)
  • When the timer expires, send the array [1, 2, 3] down the pipe.
  • If item 4 arrives after the timer expires, start a new timer and repeat the process.

If no items are pushed, no timer is started, which allows the process to exit.
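To make the requested behaviour concrete, here is a minimal sketch in plain JavaScript, without Rx. The names createLazyBuffer, onFlush and schedule are hypothetical and do not come from the question or from any library; schedule defaults to setTimeout and is injectable only so the timing can be driven by hand. The point it illustrates is that a timer exists only while at least one item is pending, so an idle process has nothing keeping it alive.

```javascript
// Hypothetical helper: batches items, starting a timer only when the
// first item of a batch arrives. No timer is pending while idle.
function createLazyBuffer(windowMs, onFlush, schedule) {
  schedule = schedule || setTimeout; // injectable for manual control (assumption)
  var pending = [];
  var timerRunning = false;

  function flush() {
    var batch = pending;
    pending = [];
    timerRunning = false; // the next item will start a fresh timer
    onFlush(batch);
  }

  return function push(item) {
    pending.push(item);
    if (!timerRunning) {
      timerRunning = true;
      schedule(flush, windowMs);
    }
  };
}
```

With the question's setup this could be wired as eventEmitter.on('log', createLazyBuffer(200, sendBatch)), where sendBatch is a placeholder for whatever issues the request.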

My initial approach:

Rx.Observable
     .fromEvent(eventEmitter, 'log')
     .bufferWithTime(200) // this is the issue
     .map(addEventsToRequestOption)
     .map(request)
     .flatMap(function (response) { return Promise.resolve(response); }) // Promise.resolve needs its receiver
     .subscribe(log('Response received'))
2 answers

A proposed implementation using the delay operator:

function emits(who){
  return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}

var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");

var delayedSource$ = source.delay(1200);

var buffered$ = source
     .buffer(function () { return delayedSource$; })
     .map(function (clickBuffer) { return clickBuffer.length; });

buffered$.subscribe(emits("buffer"));

jsbin here: http://jsbin.com/wilurivehu/edit?html,js,console,output


You will probably need to split the stream and use the second part to trigger the first.

var source = Rx.Observable.fromEvent(eventEmitter, 'log');
var closer = source.flatMapFirst(Rx.Observable.timer(2000));

source
     .buffer(closer)
     .map(addEventsToRequestOption)
     .flatMap(function(x) { return Promise.resolve(request(x)); })
     //I assume this log method returns a function?
     .subscribe(log('Response received'));

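source.flatMapFirst(Rx.Observable.timer(2000)) is the important line. It creates an Observable that fires 2000 ms after an event arrives: the first event kicks off the timer. flatMapFirst ignores subsequent events for as long as that timer is running. When the timer finally fires, it triggers the buffer to emit and a new buffer starts.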
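As a rough model of the grouping that source.buffer(closer) is aiming for, the following sketch computes the batches from a list of event timestamps. It uses no Rx and no real timers. The function name groupByLazyWindow is hypothetical, and treating an event that lands exactly when the timer fires as the start of a new window is an assumption of the sketch, not something the answer states.

```javascript
// Models the grouping only: the first event starts a timer, events that
// arrive while it runs share the same buffer, and the next event after
// the timer has fired starts a new timer and a new buffer.
// timestamps: ascending event times in ms; windowMs: timer length in ms.
function groupByLazyWindow(timestamps, windowMs) {
  var groups = [];
  var current = null;
  var windowEnd = -Infinity;

  timestamps.forEach(function (t) {
    if (current === null || t >= windowEnd) {
      // No timer is running (assumption: t === windowEnd counts as expired).
      current = [];
      groups.push(current);
      windowEnd = t + windowMs;
    }
    current.push(t);
  });

  return groups;
}

console.log(JSON.stringify(groupByLazyWindow([0, 500, 1900, 2500, 9000], 2000)));
// prints [[0,500,1900],[2500],[9000]]
```

Note that no group is ever empty: between 4500 ms and 9000 ms in this example nothing is scheduled at all, which is the property the question asks for.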

See the docs on buffer with a boundary Observable.


Source: https://habr.com/ru/post/1613568/

