Redo JavaRx2 Stream Aborted

I create nested queries as the following (some error handling omitted):

return Single.create((SingleOnSubscribe<String>) emitter -> getPages() .subscribe(pages -> getPageData(emitter, pages), emitter::onError)) .compose(applySchedulers()); // ... private void getPageData(SingleEmitter<String> emitter, List<Page> pages) { service.getPage(pages.get(0).id) .subscribe(emitter::onSuccess, e -> { pages.remove(0); getPageData(emitter, pages); }); } 

I used to have an iterative solution that gave the same result. The list of pages is sorted in order and should be processed as such. This part of the code works if the connection is good, however, if I get a bad connection, I get java.io.InterruptedIOException: thread interrupted . What would be a good way to solve this problem?

EDIT:

stack:

 W/System.err: java.io.InterruptedIOException: thread interrupted W/System.err: at okio.Timeout.throwIfReached(Timeout.java:145) W/System.err: at okio.Okio$2.read(Okio.java:136) W/System.err: at okio.AsyncTimeout$2.read(AsyncTimeout.java:237) W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46) W/System.err: at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:429) W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46) W/System.err: at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56) W/System.err: at okio.InflaterSource.refill(InflaterSource.java:101) W/System.err: at okio.InflaterSource.read(InflaterSource.java:62) W/System.err: at okio.GzipSource.read(GzipSource.java:80) W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46) W/System.err: at okio.ForwardingSource.read(ForwardingSource.java:35) W/System.err: at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:291) W/System.err: at okio.Buffer.writeAll(Buffer.java:1005) W/System.err: at okio.RealBufferedSource.readString(RealBufferedSource.java:190) W/System.err: at okhttp3.ResponseBody.string(ResponseBody.java:175) 

EDIT 2:

GetPages function:

 private Single<List<Page>> getPage() { return Observable.merge(service.getPage("mn").toObservable(), service.getPage("fc",).toObservable(), service.getPage("sh").toObservable()) .map(PageParser::parseActive) .flatMap(Observable::fromIterable) .sorted((f1, f2) -> f2.wage - f1.wage) .toList(); } 
+5
source share
1 answer

Perhaps I found a solution for this:

 private void getPageData(SingleEmitter<String> emitter, List<Page> pages) { try { service.getPage(pages.get(0).id) .subscribe(emitter::onSuccess, e -> { pages.remove(0); getPageData(emitter, pages); }); } catch (InterruptedIOException e) { Log.d(TAG, e.getLocalizedMessage(), e); } } 

You must use the rxFragment component to interrupt the rxJava2 stream when a user fragment or activity is stopped by the user:

 observable.compose(RxLifecycle.<NetworkResult, FragmentEvent>bindUntilEvent(lifecycle(), FragmentEvent.STOP)); 

But a ThreadInterrupted exception will occur because you have to handle yourself Exception for Retrofit2 and just ignore it. It works very well for me.

0
source

Source: https://habr.com/ru/post/1268673/


All Articles