Passing a non-standard Scheduler to an operator

Let's say I want to pass a Scheduler to an RxJS operator so that the operator emits a notification every 5 seconds. Of course, this is very easy to do with interval or other existing operators. But if I really wanted to use a scheduler for this, how would I go about it?

My first thought is to subclass Rx.Scheduler.default. Is that the right approach? And if so, what would such a subclass look like? Again, I understand that this is a roundabout way to accomplish something that is easy with operators; I'm just curious how custom schedulers are set up.

+4
2 answers

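You do not need to write your own scheduler for a recurring task. The scheduler interface already supports it: you can use either schedulePeriodic or scheduleRecursiveFuture.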

// Using schedulePeriodic
Rx.Observable.interval = function(period, scheduler) {

  return Rx.Observable.create(function(observer) {
    return scheduler.schedulePeriodic(0, period, function(count) {
      observer.onNext(count);
      return count + 1;
    });
  });

};

// Using scheduleRecursiveFuture
Rx.Observable.interval = function(period, scheduler) {

  return Rx.Observable.create(function(observer) {
    return scheduler.scheduleRecursiveFuture(0, period, function(count, self) {
      observer.onNext(count);
      // Reschedule with the next state first, then the delay
      self(count + 1, period);
    });
  });
};


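The first version is the simpler one: schedulePeriodic runs the action repeatedly, once per period, and passes the state returned by each call into the next.

The second version schedules a single task which, while it runs, schedules itself again (that is what the self parameter is for). Passing period as the delay each time gives the same effect.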

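When the work actually runs depends entirely on the scheduler you pass in. With the default scheduler, the work is run asynchronously using setTimeout, setInterval or a similar mechanism. With a TestScheduler or HistoricalScheduler, nothing happens until you advance the scheduler's clock yourself, which gives you fine-grained control over how time passes.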
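To illustrate how a manually advanced clock (the idea behind TestScheduler and HistoricalScheduler) decides when scheduled work runs, here is a minimal sketch in plain JavaScript. ManualClockScheduler and the every helper are hypothetical names made up for this illustration. They only imitate the behavior and are not the RxJS implementation.

```javascript
// Hypothetical manual-clock scheduler for illustration only; not part of RxJS.
class ManualClockScheduler {
  constructor() {
    this.clock = 0;  // current virtual time in milliseconds
    this.queue = []; // pending tasks: { dueAt, cancelled, run }
  }

  now() {
    return this.clock;
  }

  // Run action(state) once the virtual clock reaches now + dueTime.
  scheduleFuture(state, dueTime, action) {
    const task = {
      dueAt: this.clock + dueTime,
      cancelled: false,
      run: () => action(state)
    };
    this.queue.push(task);
    return { dispose() { task.cancelled = true; } };
  }

  // Move the clock forward, running every task that falls due, in time order.
  advanceBy(ms) {
    const target = this.clock + ms;
    for (;;) {
      const due = this.queue
        .filter((t) => !t.cancelled && t.dueAt <= target)
        .sort((a, b) => a.dueAt - b.dueAt)[0];
      if (!due) break;
      this.queue.splice(this.queue.indexOf(due), 1);
      this.clock = due.dueAt;
      due.run();
    }
    this.clock = target;
  }
}

// Emit 0, 1, 2, ... every `period` ms by rescheduling from inside the task.
function every(period, scheduler, onNext) {
  let current = null;
  let stopped = false;
  const tick = (count) => {
    if (stopped) return;
    onNext(count);
    current = scheduler.scheduleFuture(count + 1, period, tick);
  };
  current = scheduler.scheduleFuture(0, period, tick);
  return { dispose() { stopped = true; current.dispose(); } };
}

const scheduler = new ManualClockScheduler();
const seen = [];
const subscription = every(5000, scheduler, (n) => seen.push(n));

scheduler.advanceBy(4999);  // nothing is due yet
scheduler.advanceBy(10001); // clock reaches 15000, so three ticks fire
subscription.dispose();
scheduler.advanceBy(60000); // disposed, so nothing more is emitted
console.log(seen);
```

No real time passes here: the every helper only ever talks to the scheduler, so swapping in a timer-based scheduler with the same scheduleFuture method would make the same code run every 5 real seconds.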

tl;dr: Rather than implementing a new Scheduler, use the existing scheduling API with whichever Scheduler gives you the timing you need.

+4

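Should you write your own scheduler?

In short: probably not. Most likely you can get what you need from an existing operator such as buffer, window, sample, etc.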


RxJS 4

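To implement your own scheduler in RxJS 4, you subclass Rx.Scheduler and override the scheduling methods: schedule, scheduleFuture, schedulePeriodic, scheduleRecursive, scheduleRecursiveFuture... You will probably also want to override now so that it returns a time that makes sense for your scheduler.

For example, here is a scheduler driven by button clicks: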

/**
NOTE: This is a REALLY quick example. There is a lot that goes into implementing a 
Scheduler in RxJS; for example, what would `now()` do in the scheduler below? It is also missing a number of scheduling methods.
*/

class ButtonScheduler extends Rx.Scheduler {
  /** 
    @param {string} selector - the selector for the button (e.g. "#myButton")
  */
  constructor(selector) {
    super();
    this.button = document.querySelector(selector);
  }

  schedule(state, action) {
    const handler = (e) => {
      action(state);
    };
    const button = this.button;

    // next click the action will fire
    button.addEventListener('click', handler);

    return {
      dispose() {
        // ... unless you dispose of it
        button.removeEventListener('click', handler);
      }
    };
  }

  // Observable.interval uses schedulePeriodic
  schedulePeriodic(state, interval, action) {
    const button = this.button;

    let i = 0;
    const handler = (e) => {
      const count = i++;
      if(count > 0 && count % interval === 0) {
        state = action(state);
      }
    };

    // after the first click, the action fires once every `interval` clicks
    button.addEventListener('click', handler);

    return {
      dispose() {
        // ... unless you dispose of it
        button.removeEventListener('click', handler);
      }
    };
  }
}

Rx.Observable.interval(1, new ButtonScheduler('#go'))
  .subscribe(x => {
    const output = document.querySelector('#output');
    output.innerText += x + '\n';
  });

RxJS 5 (alpha)

Scheduling works differently in RxJS 5. There, a scheduler can be any object that conforms to the following interface:

interface Scheduler {
   now(): number
   schedule(action: function, delay: number = 0, state?: any): Subscription
}

Here, Subscription is any object with an unsubscribe method (the equivalent of dispose in RxJS 4).
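As a rough illustration, an object with the shape of the interface above can be sketched on top of setTimeout. The name timeoutScheduler, the closed flag, and the convention of calling action(state) are assumptions of this sketch, not a description of how RxJS 5 invokes scheduled work internally.

```javascript
// Hypothetical scheduler object shaped like the interface above.
// Assumption: the scheduled action is simply called as action(state).
const timeoutScheduler = {
  now() {
    return Date.now();
  },

  schedule(action, delay = 0, state) {
    let closed = false;
    const id = setTimeout(() => {
      closed = true;
      action(state);
    }, delay);

    // The returned object plays the role of a Subscription.
    return {
      get closed() { return closed; },
      unsubscribe() {
        if (!closed) {
          closed = true;
          clearTimeout(id);
        }
      }
    };
  }
};

let fired = false;
const pending = timeoutScheduler.schedule(() => { fired = true; }, 1000);
pending.unsubscribe(); // cancelled before the timer elapses, so the action never runs
```

Calling unsubscribe before the delay elapses clears the timer, which is the same role that dispose plays in the RxJS 4 example.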


+1

Source: https://habr.com/ru/post/1609690/

