RxJava2 Observable backpressure

I recently realized that I don't understand how backpressure works in RxJava2.

I wrote a small test and expected it to fail with a MissingBackpressureException:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

It prints the following:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 9999

Processed:0
Processed:1
Processed:2
...
Processed:9999

Why does it not throw a MissingBackpressureException?

I expected e.onNext(i); to put the item into the buffer of ObservableObserveOn, and once the buffer holds more items than

static final int BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128).intValue());

it should throw a MissingBackpressureException, but that does not happen. Does the buffer grow automatically? If not, where are the items stored?

1 answer

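That is because, in RxJava2, backpressure was moved to Flowable only. Observable does not support backpressure.
If you switch to Flowable with BackpressureStrategy.MISSING, you will get the exception.
This also means that, in your case, the buffer does indeed grow automatically; from the observeOn docs: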

ObservableSource ...
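
To make the difference concrete, here is a minimal plain-Java analogy. It does not use RxJava and is not how RxJava implements observeOn; the names BufferSketch, OverflowException, fillUnbounded and fillBounded are hypothetical and exist only for this illustration. It assumes, as in the question's output, that the producer emits everything before the slow consumer drains a single item.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

class BufferSketch {

    /** Hypothetical stand-in for RxJava's MissingBackpressureException. */
    static class OverflowException extends RuntimeException {
        OverflowException(String message) {
            super(message);
        }
    }

    /** Unbounded buffer: every offer succeeds and the queue simply grows. */
    static int fillUnbounded(int items) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int i = 0; i < items; i++) {
            buffer.offer(i);
        }
        return buffer.size();
    }

    /** Bounded buffer: the first item that does not fit is reported as an error. */
    static int fillBounded(int items, int capacity) {
        Queue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < items; i++) {
            if (!buffer.offer(i)) {
                throw new OverflowException("Queue is full at item " + i);
            }
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        // Analogous to Observable.observeOn: nothing fails, memory use grows.
        System.out.println("Unbounded buffer holds: " + fillUnbounded(10000));

        // Analogous to a Flowable without a backpressure strategy: overflow is an error.
        try {
            fillBounded(10000, 128);
        } catch (OverflowException e) {
            System.out.println("Bounded buffer failed: " + e.getMessage());
        }
    }
}
```

The unbounded variant never fails, which matches the behaviour seen in the question; the cost is that all pending items stay in memory until the consumer catches up.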


Source: https://habr.com/ru/post/1679796/

