I am trying to emit events periodically (every 150 ms), even if the upstream observable sends events faster.
But I get a MissingBackpressureException, even though I call onBackpressureBlock().
code:
    SerializedSubject<QuotationMarker, QuotationMarker> subject =
            new SerializedSubject<>(PublishSubject.create());
    return subject
            .subscribeOn(Schedulers.computation())
            .doOnSubscribe(() -> {
                NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> {
                    Log.d(TAG, "new quotation " + quotation.hashCode());
                    NetworkRequestsManager.instance().getSeller(quotation.sellerId)
                            .subscribe(seller -> {
                                for (Outlet outlet : seller.outlets) {
                                    if (outlet.latitude != null && outlet.longitude != null)
                                        subject.onNext(new QuotationMarker(outlet, quotation.price));
                                }
                            }, error -> Log.fatalError(new RuntimeException(error)));
                }, error -> Log.fatalError(new RuntimeException(error)));
            })
            .doOnError(throwable -> Log.fatalError(new RuntimeException(
                    "error response in subscribe after doOnSubscribe", throwable)))
            // combine with another observable that emits items regularly (every 150 ms)
            // so that a new event is received every 150 ms;
            // also, the first event itself is delayed.
            .zipWith(Observable.interval(150, TimeUnit.MILLISECONDS), (seller, aLong) -> seller)
            .onBackpressureBlock() // prevent zipWith Observable.interval from throwing MissingBackpressureExceptions
            .doOnError(throwable -> Log.fatalError(new RuntimeException(
                    "error response after onBackpressureBlock()", throwable))); // <-- error is thrown here
Stack trace:
    05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.err: java.lang.RuntimeException: error response after onBackpressureBlock()
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.Observable$11.onError(Observable.java:4193)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.Scheduler$Worker$1.call(Scheduler.java:120)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err: at java.util.concurrent.FutureTask.run(FutureTask.java:234)
    05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
    05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err: at java.lang.Thread.run(Thread.java:841)
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err: Caused by: rx.exceptions.MissingBackpressureException
    05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.err: at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
    05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err: at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330)
    05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err: ... 10 more
PS: Log.fatalError(err) is just my wrapper around android.util.Log.e(...).
EDIT
After a lot of trial and error, this has become a "won't fix" for me. zipWith(Observable.interval(...)) seems to be the culprit, and it may be a bug in the framework. After removing those lines (and with them my periodic emission feature), my code works. I am using a Subject whose onNext is probably called from different threads, and I then apply the Observable operators to it.
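One possible reading of the stack trace: the MissingBackpressureException is raised in RxRingBuffer.onNext, called from OperatorZip's inner subscriber, which is itself called from OnSubscribeTimerPeriodically, i.e. the interval. That suggests the interval ticks fill zip's internal buffer while the Subject side has nothing to pair them with, and an onBackpressureBlock() placed after zipWith cannot relieve that because it only sits between zip and the downstream subscriber. The sketch below is a plain-Java model of that situation, not RxJava's actual implementation. ZipBufferModel and OverflowException are hypothetical names, and the capacity of 16 is an assumption (the ring buffer size I believe RxJava 1.x uses on Android).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of one zip input: a fixed-capacity queue fed by a timer that ignores backpressure. */
final class ZipBufferModel {

    /** Hypothetical stand-in for rx.exceptions.MissingBackpressureException. */
    static final class OverflowException extends RuntimeException {
        OverflowException(String message) {
            super(message);
        }
    }

    private final Deque<Long> ticks = new ArrayDeque<>();
    private final int capacity;          // assumed buffer size per zip input
    private final boolean dropWhenFull;  // models a drop-style backpressure strategy on the timer
    private long dropped;

    ZipBufferModel(int capacity, boolean dropWhenFull) {
        this.capacity = capacity;
        this.dropWhenFull = dropWhenFull;
    }

    /** Called by the timer, which never checks whether there is room. */
    void onTick(long tick) {
        if (ticks.size() == capacity) {
            if (dropWhenFull) {
                dropped++;               // discard the tick instead of failing
                return;
            }
            throw new OverflowException("tick " + tick + " arrived with " + capacity + " ticks queued");
        }
        ticks.addLast(tick);
    }

    /** Called when the slow side emits: pairs the item with the oldest queued tick, or returns null if none is queued. */
    String onItem(String item) {
        Long tick = ticks.pollFirst();
        return tick == null ? null : item + "@" + tick;
    }

    long droppedCount() {
        return dropped;
    }

    /** Feeds ticks while the slow side stays silent; returns the first tick that overflows, or -1 if none does. */
    static long firstOverflowingTick(int capacity, boolean dropWhenFull, int tickCount) {
        ZipBufferModel model = new ZipBufferModel(capacity, dropWhenFull);
        for (long t = 0; t < tickCount; t++) {
            try {
                model.onTick(t);
            } catch (OverflowException e) {
                return t;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstOverflowingTick(16, false, 100)); // prints 16
        System.out.println(firstOverflowingTick(16, true, 100));  // prints -1
    }
}
```

In this model, ticks 0 to 15 fill the queue and tick 16 overflows it; at 150 ms per tick that is roughly 2.5 seconds without an item from the slow side. If this reading is right, the change to try in the real chain would be to apply a backpressure operator to the interval itself, for example Observable.interval(150, TimeUnit.MILLISECONDS).onBackpressureDrop(), and possibly onBackpressureBuffer() to the Subject, both before zipWith. Treat this as something to try rather than a confirmed fix.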