MissingBackpressureException even after calling onBackpressureBlock ()

I try to generate events periodically (every 150 ms), even if the stream observed above will send events faster.

But I get a MissingBackpressureException , although I called onBackpressureBlock()

code:

  SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create()); return subject .subscribeOn(Schedulers.computation()) .doOnSubscribe(() -> { NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> { Log.d(TAG, "new quotation " + quotation.hashCode()); NetworkRequestsManager.instance().getSeller(quotation.sellerId) .subscribe(seller -> { for (Outlet outlet : seller.outlets) { if (outlet.latitude != null && outlet.longitude != null) subject.onNext(new QuotationMarker(outlet, quotation.price)); } }, error -> Log.fatalError(new RuntimeException(error))); }, error -> Log.fatalError(new RuntimeException(error))); }) .doOnError(throwable -> Log.fatalError(new RuntimeException( "error response in subscribe after doOnSubscribe", throwable))) // combine with another observable that emits items regularly (every 100ms) // so that a new event is received every 100ms : // also, first event itself is delayed. .zipWith(Observable.interval(150, TimeUnit.MILLISECONDS), (seller, aLong) -> seller) .onBackpressureBlock() // prevent zipWith Observer.interval from throwing MissingBackpressureException s .doOnError(throwable -> Log.fatalError(new RuntimeException( "error response after onBackpressureBlock()", throwable))); // <-- error is thrown here 

track:

  05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.errīš• java.lang.RuntimeException: error response after onBackpressureBlock() 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.Observable$11.onError(Observable.java:4193) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.Scheduler$Worker$1.call(Scheduler.java:120) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390) 05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.errīš• at java.util.concurrent.FutureTask.run(FutureTask.java:234) 05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.errīš• at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153) 05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.errīš• at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267) 05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.errīš• at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080) 05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.errīš• at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573) 05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.errīš• at java.lang.Thread.run(Thread.java:841) 05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.errīš• Caused by: rx.exceptions.MissingBackpressureException 05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349) 05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.errīš• at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330) 05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.errīš• ... 10 more 

PS: Log.fatalError(err) is just my wrapper around Android.util.Log.e(...)

EDIT

After a lot of trial and error, for me it becomes a wont fix . zipWith(Observable.interval...) seems to be the culprit and a possible wireframe error. By removing these lines (and therefore my periodic highlight function), my code works. I use a topic that probably calls onNext from different threads, and then I execute the Obeservable statements on it.

+6
source share
3 answers

I think (but I'm not sure) that the problem is that the configuration of your backpressure is done after the zip operator.

The zip statement needs to buffer elements of one Observable in order to pin it to another Observable . This is the buffer that should throw your exception. (see here )

To solve your problem, I think you should try adding a backpressure configuration to one (or each) Observable used in the zip statement.

example:

 obs.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS).onBackPressureDrop()); obs.onBackPressureBlock().zipWith(Observable.interval(150, TimeUnit.MILLISECONDS)); 
+2
source

The answer above from @dwursteisen and @zsxwing is correct.

An interval operator is one that emits in time and is thus “hot” and does not support backpressure. This way, it will continue to emit and populate the internal restricted zip buffer, which raises a MissingBackpressureException.

When working with a "hot" source (for example, based on time or user events), you should choose a strategy for dealing with overflow.

In this case, you will need to put this strategy in the interval statement.

Here is the code showing what is happening and the options for solving it:

 import java.util.concurrent.TimeUnit; import rx.Observable; public class ZipInterval { public static void main(String... args) { Observable<Long> slowHotSource = Observable.interval(1, TimeUnit.SECONDS); /** This one is fast and hot so will cause a MissingBackpressureException. * * This is because a "hot" source based on time does not obey backpressure * and keeps emitting regardless of what the downstream asks for. * * Examples of "hot" and "cold" and approaches to both can be found at: * https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=90 and * https://github.com/ReactiveX/RxJava/wiki/Backpressure * */ // Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS); /** * The following version of 'fastHotSource' composes a simple flow control strategy. */ Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS).onBackpressureDrop(); Observable<String> zipped = Observable.zip(slowHotSource, fastHotSource, (s, f) -> { return s + " " + f; }); // subscribe to the output System.out.println("---- zip output"); zipped.take(10).toBlocking().forEach(System.out::println); /** * The outcome of the above is probably not what is expected though. * * This is because zip will buffer the output and then `fastHotSource` will drop until * the zip buffer asks for more. * * For temporal or "hot" sources like this, using `withLatestFrom` or `combineLatest` * is often more appropriate than `zip`. */ Observable<String> latest = slowHotSource.withLatestFrom(fastHotSource, (s, f) -> { return s + " " + f; }); // subscribe to the output System.out.println("---- latest output"); latest.take(10).toBlocking().forEach(System.out::println); } } 

The result of this:

 ---- zip output 0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 ---- latest output 0 1002 1 2002 2 3000 3 4001 4 5003 5 6001 6 7000 7 8002 8 9005 9 10000 
+2
source

Try using combineLatest , because combining lastest does not wait for new values ​​to call onNext, it uses the last values ​​when a new value comes to the function

0
source

Source: https://habr.com/ru/post/986595/


All Articles