RxJS periodic endpoint polling with variable response time

I want to poll the endpoint no faster than once per second, and no slower than the time it takes the endpoint to respond. There should never be more than one request outstanding.

I need a reactive programming approach to polling an endpoint once per second, where, if the endpoint takes longer than 1 second to respond, the next request fires immediately.

In the marble diagram below, the 2nd and 3rd requests take longer than 1 second, but the 4th and 5th requests finish faster. The next request fires either on the 1-second boundary or immediately after data is received from the last outstanding request.

    s---s---s---s---s---s---|   # 1 second interval observable
    r---r----r--------rr---|    # endpoint begin polling events
    -d-------d--------dd-d--|   # endpoint data response events

I am trying to get the terminology right in the marble diagram, so I am assuming that the start of each endpoint request should be the marble I label "r", and that the marble I label "d" carries the endpoint data.
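To make the intended timing concrete, here is a minimal sketch in plain JavaScript (no RxJS) that computes when each request would start. It assumes one reading of the requirement: the next request starts one interval after the previous request started, or as soon as the previous response arrives, whichever is later. The name `scheduleRequests` and the sample durations are hypothetical and only serve the illustration.

```javascript
// Hypothetical helper, not part of any library: given how long each
// request takes, compute when it starts and when its data arrives.
function scheduleRequests(durationsMs, intervalMs = 1000) {
  const timeline = [];
  let start = 0;
  for (const duration of durationsMs) {
    const end = start + duration;
    timeline.push({ start, end });
    // Assumed rule: wait for the interval boundary, unless the response
    // arrived later than that, in which case fire immediately.
    start = Math.max(start + intervalMs, end);
  }
  return timeline;
}

// Sample durations in milliseconds: fast, slow, slow, fast, fast.
const timeline = scheduleRequests([250, 1250, 2250, 250, 250]);
console.log(timeline.map(e => e.start)); // [ 0, 1000, 2250, 4500, 5500 ]
```

Under this reading, the third and fourth requests start the moment the slow responses arrive, while the others wait for the 1-second boundary.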

Here is how much code it took me to do this in plain JS; however, the next request does not fire immediately after a response is received, as I asked for above.

    var poll;
    var previousData;
    var isPolling = false;
    var dashboardUrl = 'gui/metrics/dashboard';
    var intervalMs = updateServiceConfig.getIntervalInMilliSecondForCharts();

    return {
        startInterval: startInterval,
        stopInterval: stopInterval
    };

    function startInterval() {
        stopInterval();
        tryPolling(); // immediately hit the dashboard
        // attempt polling at the interval
        poll = $interval(tryPolling, intervalMs);
    }

    /**
     * attempt polling as long as there is no in-flight request
     * once the in-flight request completes or fails, allow the next request to be processed
     */
    function tryPolling() {
        if (!isPolling) {
            isPolling = true;

            getDashboard()
                // if the dashboard either returns successful or fails, reset the polling boolean
                .then(resetPolling, resetPolling);
        }
    }

    /** there no longer an in-flight request, so reset the polling boolean */
    function resetPolling() {
        isPolling = false;
    }

    function stopInterval() {
        if (poll) {
            $interval.cancel(poll);
            poll = undefined;
        }
    }

    function getDashboard() {
        return restfulService.get(dashboardUrl)
            .then(updateDashboard);
    }

    function updateDashboard(data) {
        if (!utils.deepEqual(data, previousData)) {
            previousData = angular.copy(data);
            $rootScope.$broadcast('$dashboardLoaded', data);
        }
    }
3 answers

Here is my solution. It uses an internal subject, combineLatest and filter to ensure that requests do not accumulate if responses arrive more slowly than the timer period.

The comments should explain how it works.

    const delays = [100, 2000, 100, 3000];
    const since = Date.now();
    let index = 0;

    function mock() {
      return Rx.Observable
        .of("res")
        .do(() => console.log("mock req at ", Date.now() - since, " ms"))
        .delay(delays[index++ % delays.length])
        .do(() => console.log("mock res at ", Date.now() - since, " ms"));
    }

    function poll() {

      return Rx.Observable.defer(() => {

        // Use defer so that the internal subject is created for each
        // subscription.
        const subject = new Rx.BehaviorSubject({ tick: -1, pending: false });

        return Rx.Observable

          // Combine the timer and the subject state.
          .combineLatest(
            Rx.Observable.timer(0, 1000).do(tick => console.log("tick", tick)),
            subject
          )

          // Filter out combinations in which either a more recent tick
          // has not occurred or a request is pending.
          .filter(([tick, state]) => (tick !== state.tick) && !state.pending)

          // Update the subject state.
          .do(([tick]) => subject.next({ tick, pending: true }))

          // Make the request and use the result selector to combine
          // the tick and the response.
          .mergeMap(([tick]) => mock(), ([tick], resp) => [tick, resp])

          // Update the subject state.
          .do(([tick]) => subject.next({ tick, pending: false }))

          // Map the response.
          .map(([tick, resp]) => resp);
      });
    }

    poll().take(delays.length).subscribe(r => console.log(r));
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

It just occurred to me that there is an operator that does exactly this: exhaustMap.

    const delays = [100, 2000, 100, 3000];
    const since = Date.now();
    let index = 0;

    function mock() {
      return Rx.Observable
        .of("res")
        .do(() => console.log("mock req at ", Date.now() - since, " ms"))
        .delay(delays[index++ % delays.length])
        .do(() => console.log("mock res at ", Date.now() - since, " ms"));
    }

    const poll = Rx.Observable
      .timer(0, 1000)
      .do(tick => console.log("tick", tick))
      .exhaustMap(() => mock());

    poll.take(delays.length).subscribe(r => console.log(r));
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
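To illustrate what exhaustMap does with ticks that arrive while a request is still pending, here is a rough simulation in plain JavaScript. It is not RxJS itself: `simulateExhaust` is a hypothetical name, the sample durations are made up, and a tick that coincides exactly with a response is treated as arriving after it, which is an assumption of the sketch.

```javascript
// Hypothetical simulation, not RxJS itself: ticks arrive every intervalMs,
// and a tick starts a request only if no earlier request is still pending.
function simulateExhaust(durationsMs, intervalMs = 1000) {
  const started = [];
  let busyUntil = 0; // time at which the in-flight request completes
  let tick = 0;
  while (started.length < durationsMs.length) {
    const now = tick * intervalMs;
    if (now >= busyUntil) {
      // Nothing is in flight: this tick is mapped to a new request.
      const duration = durationsMs[started.length];
      started.push({ tick, start: now, end: now + duration });
      busyUntil = now + duration;
    }
    // Otherwise the tick is ignored; it is dropped, not queued.
    tick++;
  }
  return started;
}

const started = simulateExhaust([100, 2500, 100, 3000]);
console.log(started.map(r => r.tick)); // [ 0, 1, 4, 5 ]
```

In this simulation, ticks 2 and 3 are dropped because the second request is still pending, and the request after it waits for tick 4 at 4000 ms rather than starting when the slow response arrives at 3500 ms.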

I believe this does what you want:

    let counter = 0;

    function apiCall() {
      const delay = Math.random() * 1000;
      const count = ++counter;
      return Rx.Observable.timer(delay).mapTo(count);
    }

    Rx.Observable.timer(0, 1000)
      .mergeMap(() => apiCall())
      .take(1)
      .repeat()
      .subscribe(x => { console.log(x); });
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script> 
  • timer(0, 1000) : emits immediately and then once every second after that
  • mergeMap(...) : subscribes to the observable returned by the API call. This creates a new observable on each attempt. If you do not want to create a new one on each attempt, replace it with mergeMapTo(apiCall()) .
  • take(1) : forces the subscription to complete so that the timer stops once the API has emitted
  • repeat() : restarts the sequence once the API has emitted

This way, a call is sent to the API immediately. If it does not return within one second, another call is made every second. Once one of the API calls responds, the timer is cancelled and the whole sequence starts over. Launching a new call does not cancel the requests already in flight, which I believe is in line with your intent.

EDIT: If a later request returns before an earlier one, the result of the earlier request is discarded.
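The race described above can be sketched as a small simulation of a single cycle in plain JavaScript. This is not RxJS itself: `simulateCycle` is a hypothetical name and the durations are made-up sample values. It only models the behaviour described in this answer, where a request is launched every interval until the first response arrives.

```javascript
// Hypothetical simulation, not RxJS itself: within one cycle a request is
// launched at 0 ms, 1000 ms, 2000 ms, ... until the first response arrives.
function simulateCycle(durationsMs, intervalMs = 1000) {
  let winner = null;
  let launched = 0;
  for (const duration of durationsMs) {
    const start = launched * intervalMs;
    // Stop launching once a response has already arrived (take(1) fired).
    if (winner !== null && start >= winner.end) break;
    launched++;
    const end = start + duration;
    // The earliest response wins; slower requests are discarded.
    if (winner === null || end < winner.end) {
      winner = { request: launched, start, end };
    }
  }
  return { launched, winner };
}

const result = simulateCycle([2500, 300, 400]);
console.log(result);
// { launched: 2, winner: { request: 2, start: 1000, end: 1300 } }
```

Here the first request is still pending when the second one is launched at 1000 ms, so two requests are in flight at once. The second one responds first at 1300 ms, and the result of the first is never delivered.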


I had to think about this for 15 minutes before I came up with an answer based only on RxJS, with no side effects (no variable assignment) and no backpressure!

    const { Observable } = Rx;

    const mockHttpRequest = url =>
      Observable
        .of('ok')
        .do(x => console.log('fetching...'))
        .delay(250);

    const poll = (httpRequest$, ms) => {
      const tick$ = Observable.timer(ms);

      return Observable
        .zip(httpRequest$, tick$)
        .repeat()
        .map(([httpResult]) => httpResult);
    };

    poll(mockHttpRequest('your-url-here'), 1000)
      .do(console.log)
      .subscribe();

Here's a working Plunkr: https://plnkr.co/edit/sZTjLedNCE64bgLNhnaS?p=preview


Source: https://habr.com/ru/post/1274640/

