The answer from @michalsamek seems correct, although backpressure only works with Flowables. I adjusted his subscriber so that it does what the question asked for.
There was also a slight problem when it was used with bursts of items arriving at different times.
    private static <T> FlowableOperator<T, T> allowPerMillis(int millis) {
        return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis);
    }

    Observable.range(1, 100)
        .observeOn(Schedulers.io())
        .toFlowable(BackpressureStrategy.BUFFER)
        .compose(Flowable::onBackpressureBuffer)
        .lift(allowPerMillis(200))
        .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value));

    public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> {

        private final Subscriber<T> upstream;
        private final int millis;
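The listing above is cut off before the subscriber's body, so here is a rough sketch of the general idea only, not the original class. To keep it runnable without dependencies it uses the JDK's `java.util.concurrent.Flow` and `SubmissionPublisher` instead of RxJava. The names `PeriodicRequester` and `PeriodicRequestDemo` are made up for this illustration. The point it tries to show: demand is signalled from a timer (one `request(1)` per period) rather than from `onNext`, so items cannot arrive faster than the period allows.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical subscriber: asks the publisher for one item every periodMillis. */
final class PeriodicRequester<T> implements Flow.Subscriber<T> {
    private final long periodMillis;
    // Daemon timer thread so a stalled stream cannot keep the JVM alive.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "periodic-requester");
                t.setDaemon(true);
                return t;
            });
    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);

    PeriodicRequester(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Demand comes only from the timer, never from onNext.
        timer.scheduleAtFixedRate(() -> subscription.request(1),
                0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void onNext(T item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable error) {
        timer.shutdownNow();
        done.countDown();
    }

    @Override
    public void onComplete() {
        timer.shutdownNow();
        done.countDown();
    }
}

final class PeriodicRequestDemo {
    /** Publishes 1..count and returns what the subscriber received, in order. */
    static List<Integer> run(int count, long periodMillis) {
        PeriodicRequester<Integer> subscriber = new PeriodicRequester<>(periodMillis);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // buffered until the subscriber requests it
            }
        } // close() completes the stream once buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 200));
    }
}
```

In an RxJava operator the same pattern would presumably wrap the downstream subscriber and forward `onNext`, `onError` and `onComplete` to it, while the timer calls `request(1)` on the upstream `Subscription`; that wiring is an assumption here, since the original class body is not shown.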