Observable doOnError correct location

I'm kind of new to Observers, and I'm still trying to figure them out. I have the following code:

    observableKafka.getRealTimeEvents()
        .filter(this::isTrackedAccount)
        .filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
        .map(ledgerMapper::mapLedgerTransaction)
        .map(offerCache::addTransaction)
        .filter(offer -> offer != null) // Offer may have been removed from cache since last check
        .filter(Offer::isReady)
        .doOnError(throwable -> {
            LOG.info("Exception thrown on realtime events");
        })
        .forEach(awardChecker::awardFailOrIgnore);

getRealTimeEvents() returns an Observable<Event>.

Does the placement of .doOnError matter? Also, what is the effect of adding multiple doOnError calls to this piece of code? I realized that I can do this, and they are all invoked, but I'm not sure what the purpose would be.

2 answers

Yes, it does. doOnError acts when an error passes through the stream at that particular point, so if an operator before doOnError throws, your action will be called. However, if you place doOnError further up the chain, it may or may not be called, depending on which operators come after it.

Given

    Observer<Object> ignore = new Observer<Object>() {
        @Override
        public void onCompleted() { }

        @Override
        public void onError(Throwable e) { }

        @Override
        public void onNext(Object t) { }
    };

For example, the following code always calls the doOnError action:

    Observable.<Object>error(new Exception())
        .doOnError(e -> log(e))
        .subscribe(ignore);

However, this code will not:

    Observable.just(1)
        .doOnError(e -> log(e))
        .flatMap(v -> Observable.<Integer>error(new Exception()))
        .subscribe(ignore);

Most operators will bounce back exceptions that originate downstream.

Adding multiple doOnError calls is viable if you transform an exception via onErrorResumeNext or onExceptionResumeNext:

    Observable.<Object>error(new RuntimeException())
        .doOnError(e -> log(e))
        .onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
        .doOnError(e -> log(e))
        .subscribe(ignore);

otherwise, you would be logging the same exception at several places in the chain.

+14
source

The doOn??? methods are there for side effects, that is, processing that is not really your core business value. Logging is a perfectly fine use for them. However, sometimes you want to do something more meaningful with an error, for example retry or show a message to the user. In these cases, the "rx" way is to handle the error in the subscribe call.

doOnError (and the other doOn methods) wraps the original Observable in a new one and adds behavior to it (around its onError method, obviously). That is why you can call it many times. In addition, one advantage of being able to call it anywhere in the chain is that you can access errors that would otherwise be hidden from the stream's consumer (the Subscriber), for example because there is a retry further down the chain...
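To make the wrapping idea concrete, here is a minimal sketch in plain Java. It is a toy model, not RxJava and not how RxJava is implemented: TinyStream, Sink and demo are hypothetical names invented for this illustration, and the model leaves out completion, unsubscription and threading. It only shows that each doOnError call returns a new wrapper whose action fires when an error passes that point in the chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy stand-in for an observable (hypothetical, NOT RxJava). */
final class TinyStream<T> {

    /** Receives values and at most one error. */
    interface Sink<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    /** Producer logic that runs when someone subscribes. */
    private final Consumer<Sink<T>> source;

    private TinyStream(Consumer<Sink<T>> source) {
        this.source = source;
    }

    static <T> TinyStream<T> just(T value) {
        return new TinyStream<>(sink -> sink.onNext(value));
    }

    static <T> TinyStream<T> error(Throwable error) {
        return new TinyStream<>(sink -> sink.onError(error));
    }

    /** Wraps this stream: an error passing this point runs the action, then continues downstream. */
    TinyStream<T> doOnError(Consumer<Throwable> action) {
        return new TinyStream<>(down -> source.accept(new Sink<T>() {
            @Override public void onNext(T value) { down.onNext(value); }
            @Override public void onError(Throwable e) { action.accept(e); down.onError(e); }
        }));
    }

    /** Maps each value; an exception thrown by the mapper is delivered downstream only. */
    <R> TinyStream<R> map(Function<T, R> mapper) {
        return new TinyStream<>(down -> source.accept(new Sink<T>() {
            @Override public void onNext(T value) {
                R result;
                try {
                    result = mapper.apply(value);
                } catch (RuntimeException e) {
                    down.onError(e);
                    return;
                }
                down.onNext(result);
            }
            @Override public void onError(Throwable e) { down.onError(e); }
        }));
    }

    void subscribe(Sink<T> sink) {
        source.accept(sink);
    }

    /** Runs two chains and records which doOnError actions fired, in order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Sink<Object> ignore = new Sink<Object>() {
            @Override public void onNext(Object value) { }
            @Override public void onError(Throwable e) { }
        };

        // The failing map sits between two doOnError calls:
        // only the one after it sees the error.
        TinyStream.just(1)
            .doOnError(e -> log.add("before"))
            .<Object>map(v -> { throw new IllegalStateException("boom"); })
            .doOnError(e -> log.add("after"))
            .subscribe(ignore);

        // Two stacked doOnError calls below a failing source: both fire, upstream first.
        TinyStream.<Object>error(new RuntimeException("src"))
            .doOnError(e -> log.add("first"))
            .doOnError(e -> log.add("second"))
            .subscribe(ignore);

        return log;
    }
}
```

In this model, TinyStream.demo() returns the entries "after", "first", "second": the action registered above the failing map never runs, while stacked actions below a failing source all run in chain order.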


Source: https://habr.com/ru/post/986064/

