How to set timeout on restart with RxJS?

I work with the latest Angular and Typescript and RxJS 5.

Angular is currently making RxJS necessary. I have been using C # mostly for over 10 years, and I'm very used to the Linq / Lambdas / free syntax, which I believe has become the foundation of Reactive.

I would like to make an Http call with an increase in the timeout value on restarting, but I had a problem with how to do this, and still keeping everything in the pipeline (without using an external state).

I get that I can do this, but it will just repeat using the same timeout value.

myHttpObservable.timeout(1000).retry(2); 

The documentation for RxJS was poor in many places, and asking about it here only removed my question from existence, which is sad ... so I was forced to look at the source.

Is there a way to try again with an increase in the timeout duration each time so as to maintain state in the pipeline? In addition, I want the first timeout to be made on the first try.

At first I tried similar things, but realized the confusing retryWhen statement is not intended for what I want:

 myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => { return aNewMyObservableCreatedinHere.timeout(2000); }); 

I know that I can do this using an external state, but I'm mostly looking for an elegant solution, which, I think, is what they are fighting for with a reactive programming style.

+6
source share
4 answers

One of the biggest problems with RxJs5 at the moment is the documentation. It is really fragmented and does not match the previous version. After looking at the RxJs4 documentation, you can see that .retryWhen() already has an example to create an exponential delay, which can be easily ported to RxJs5:

 Rx.Observable.throw(new Error('splut')) .retryWhen(attempts => Rx.Observable.range(1, 3) .zip(attempts, i => i) .mergeMap(i => { console.log("delay retry by " + i + " second(s)"); return Rx.Observable.timer(i * 1000); }) ).subscribe(); 
+6
source

Based on the optional input of comments in my previous answer, I experimented with the .expand() operator to solve your requirements:

I want to do with a timeout of duration = X, and if it expires, then try again with a timeout of = X + 1

The following snippet starts with a timeout = 500, and for each attempt, the timeout increases with a * 500 attempt until maxAttempts is reached or the result is successfully received:

 getWithExpandingTimeout('http://reaches-max-attempts', 3) .subscribe( res => console.log(res), err => console.log('ERROR: ' + err.message) ); getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10) .subscribe( res => console.log(res), err => console.log('ERROR: ' + err.message) ); /* retrieve the given url and keep trying with an expanding timeout until it succeeds or maxAttempts has been reached */ function getWithExpandingTimeout(url, maxAttempts) { return get(url, 1) .expand(res => { if(res.error) { const nextAttempt = 1 + res.attempt; if(nextAttempt > maxAttempts) { // too many retry attempts done return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`)); } // retry next attempt return get(url, nextAttempt); } return Rx.Observable.empty(); // done with retrying, result has been emitted }) .filter(res => !res.error); } /* retrieve info from server with timeout based on attempt NOTE: does not errors the stream so expand() keeps working */ function get(url, attempt) { return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`) .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`)) .delay(5 * 500) .timeout(attempt * 500) .catch(error => Rx.Observable.of({ attempt: attempt, error })); } 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script> 

How it works

Each value that is created upstream or by the .expand() operator is emitted downstream and used as input to the .expand() operator. This continues until no other values ​​are thrown. Using this behavior, the .get() function .get() with an increasing attempt when the radiation contains an .error value inside.

The .get() function does not .get() error, because otherwise we need to catch it in .expand() , or the recursion will .expand() unexpectedly.

When maxAttempts is exceeded, the .expand() operator throws an error stopping the recursion. When there is no .error property in the emission, we expect this to be a successful result and emit an empty Observable, stopping the recursion.

NOTE. It uses .filter() to remove all outliers based on the .error property, because all the values ​​generated by .expand() are also emitted downstream, but these .error values ​​are internal state.

+3
source

The backoff-rxjs npm package has an operator for solving this case called retryBackoff . I described this in an article on blog.angularindepth.com , but in a nutshell:

 source.pipe( retryWhen(errors => errors.pipe( concatMap((error, iteration) => timer(Math.pow(2, iteration) * initialInterval, maxInterval)))); 

Here are the sources for a more custom version.

0
source

I use delayWhen to create a notification function that causes retryWhen emit with increasing delays after each error. You can select a different time series by changing the formula used to calculate the delay between retries. See the two examples of formulas below for geometric and exponential retry retry strategies. Also pay attention to how I limit the number of retries and throw an error if I reach this limit.

 const initialDelay = 1000; const maxRetries = 4; throwError('oops') .pipe( retryWhen(errors => errors.pipe( delayWhen((_,i) => { // const delay = (i+1)*initialDelay; // geometric const delay = Math.pow(2,i)*initialDelay; // exponential console.log('retrying after ${delay} msec...'); return timer(delay); }), take(maxRetries), concat(throwError('number of retries exceeded'))))) .subscribe( x => console.log('value:', x), e => console.error('error:', e) ); 
0
source

Source: https://habr.com/ru/post/1014456/


All Articles