How to build an Rx poller that waits for some interval AFTER the previous ajax request completes?

I have been working on several approaches to this. Basically, I don't want a poller that fires an ajax request every 30 seconds from the start of polling - I want a poller that fires the next request 30 seconds AFTER the previous request has returned. In addition, I want to work in some strategy for exponential backoff on failures.

Here is what I have so far (Rx4):

// Poller observable: emits the `data` field of every successful action()
// result. The next request is scheduled only after the previous one has
// settled. After a success the wait is `initInterval`; after a failure the
// wait doubles, capped at `maxInterval`.
// Reads/writes the outer `interval`, and reads `initInterval`, `maxInterval`
// and `action` from the enclosing scope.
rx.Observable.create(function(observer) {
    var timerId = null;       // pending setTimeout handle, if any
    var inFlight = null;      // subscription to the request currently running
    var disposed = false;     // set once the consumer unsubscribes

    var scheduleNext = function() {
      // never re-arm the timer after the consumer has gone away
      if (disposed) { return; }
      timerId = setTimeout(nextPoll, interval);
    };

    var nextPoll = function() {
      timerId = null;
      // the action function invoked below is what i'm passing in
      // to my poller service as any function which returns a promise
      // and must be invoked each loop due to fromPromise caching
      inFlight = rx.Observable.fromPromise(action())
        .map(function (x){ return x.data; })
        .subscribe(function(d) {
            // pass the response data up to the parent observable
            observer.onNext(d);

            // reset interval in case previous call was an error
            interval = initInterval;
            scheduleNext();
        }, function(e) {
          // exponential backoff: double the wait, but never exceed maxInterval;
          // the error itself is intentionally not forwarded so polling continues
          interval = Math.min(interval * 2, maxInterval);
          scheduleNext();
        });
    };
    nextPoll();

    // teardown: without this, disposing the subscription would leave the
    // timer/request loop running forever
    return function() {
      disposed = true;
      if (timerId !== null) {
        clearTimeout(timerId);
        timerId = null;
      }
      if (inFlight) {
        inFlight.dispose();
      }
    };
});

For the most part this does what I want. I don’t like using setTimeout, but I can’t find a better Observable approach to this (except for a one-time interval / timer with a different subscription).

, , - , , , . , . /, ajax ajax , .

, , setTimeout. , - , ! !

: - , . :

/**
 * Computes the wait before the next poll and stores it in the shared
 * `interval` variable of the enclosing scope.
 *
 * @param {*} [error] - truthy when the previous poll failed.
 * @returns {number} the updated `interval`.
 */
function computeInterval(error) {
  if (error) {
    // double on errors, clamped so the result never exceeds maxInterval
    // (a plain `interval * 2` could overshoot, e.g. 20 -> 40 with max 30)
    interval = Math.min(interval * 2, maxInterval);
  } else {
    // anytime the poller succeeds, make sure we've reset to
    // default interval.. this also allows the initInterval to
    // change while the poller is running
    interval = initInterval;
  }
  return interval;
}

// Poller built from operators only: the promise factory is passed to
// fromPromise, retryWhen re-runs it after a failure and repeatWhen re-runs it
// after a completion, each after waiting the interval returned by
// computeInterval.
poller$ = rx.Observable.fromPromise(function(){ return _this.action(); })
  .retryWhen(function(errors){
    // one timer per error; computeInterval(true) applies the backoff.
    // (No scan stage: accumulating `acc + error` only built an ever-growing
    // string that nothing read.)
    return errors.flatMap(function(){
      return rx.Observable.timer(computeInterval(true));
    });
  })
  .repeatWhen(function(notification){
    // one timer per completion; computeInterval() resets to the default wait
    return notification.flatMap(function(){
      return rx.Observable.timer(computeInterval());
    });
  });
+2
2

, , , , :

var action; // your action function
// Poller based on `expand`: every settled request schedules exactly one
// follow-up request after computeDelay() milliseconds.
Rx.Observable.create(function (observer) {
    // Runs one request and turns its outcome into a notification object so
    // that a failure does not terminate the expand chain.
    function executeAction(action) {
        return Rx.Observable.fromPromise(action())
            .materialize()
            // materialize also emits a completion ("C") notification after a
            // success; letting it through would make expand recurse twice per
            // round and double the number of requests every cycle
            .filter(function (notification) {
                return notification.kind !== "C";
            });
    }

    function computeDelay(){
        // put your exponential delaying logic here
    }

    // pass `action` explicitly: the parameter shadows the outer variable, so
    // calling executeAction() with no argument would invoke undefined
    var subscription = executeAction(action)
        .expand(function (x) {
            return Rx.Observable.return({})
                .delay(computeDelay())
                .flatMap(function(){return executeAction(action);})
        })
        .subscribe(function(notification){
             if (notification.kind === "N") {
               observer.onNext(notification.value.data);
             } else if (notification.kind === "E") {
               console.log("error:", notification.error.message);
             }
        });

    // teardown so that disposing the outer subscription stops the polling
    return function () {
        subscription.dispose();
    };
});

So, basically, `expand` handles the delay and the recursive call. Errors are handled through `materialize`.

+2

, , /, , . repeatWhen. , , , .

var action; // your action function
// Poller based on `repeatWhen`: after each request completes, wait
// computeDelay() milliseconds and run the action again.
Rx.Observable.create(function (observer) {
    // Deferred so that every (re)subscription calls action() and gets a fresh
    // promise; a single fromPromise(action()) would replay the first result.
    function executeAction(action) {
        return Rx.Observable.defer(function () {
            return Rx.Observable.fromPromise(action());
        });
    }

    function computeDelay(){
        // put your exponential delaying logic here
    }

    // pass `action` explicitly: the parameter shadows the outer variable, so
    // calling executeAction() with no argument would invoke undefined
    var subscription = executeAction(action)
        .repeatWhen(function(notification){
            // derive one timer from each completion signal; a notifier that
            // ignores its input and emits once allows only a single repeat
            return notification.flatMap(function () {
                return Rx.Observable.timer(computeDelay());
            });
        })
        .subscribe(function(x){
            observer.onNext(x.data);
        }, function (e) {
            // surface failures to the consumer instead of leaving them unhandled
            observer.onError(e);
        });

    // teardown so that disposing the outer subscription stops the polling
    return function () {
        subscription.dispose();
    };
});

, repeatWhen retryWhen , ( RxJava, Rxjs ). , ( ).

+2

Source: https://habr.com/ru/post/1658579/


All Articles