Rate limiting an Observable

I have an Observable that produces data from a fast stream backed by a database cursor. I want to throttle the output to x items per second. So far I have been using callstack blocking, as described in the docs:

    observable.map(f -> {
        ratelimiter.acquire(); // configured limiter to only allow
        return f;              // map must return the item it passes through
    });

This works great, but out of curiosity, is there a better way to handle this with backpressure?

Tks
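For readers who want to see what callstack blocking amounts to, here is a minimal sketch in plain Java with no RxJava dependency. `FixedIntervalLimiter` and `throttle` are hypothetical names made up for this illustration; they are not the limiter used in the question, and a production limiter (bursts, fairness) would need more care.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BlockingLimiterSketch {

    /** Hypothetical limiter: hands out one permit per fixed interval. */
    public static final class FixedIntervalLimiter {
        private final long intervalNanos;
        private long nextFreeNanos = System.nanoTime();

        public FixedIntervalLimiter(int permitsPerSecond) {
            this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / permitsPerSecond;
        }

        /** Blocks the calling thread until the next permit is due. */
        public void acquire() {
            long waitNanos;
            synchronized (this) {
                long now = System.nanoTime();
                waitNanos = Math.max(0L, nextFreeNanos - now);
                nextFreeNanos = now + waitNanos + intervalNanos;
            }
            try {
                TimeUnit.NANOSECONDS.sleep(waitNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a permit", e);
            }
        }
    }

    /** Pulls items through the limiter, like map(f -> { limiter.acquire(); return f; }). */
    public static List<Integer> throttle(List<Integer> source, int permitsPerSecond) {
        FixedIntervalLimiter limiter = new FixedIntervalLimiter(permitsPerSecond);
        List<Integer> out = new ArrayList<>();
        for (Integer item : source) {
            limiter.acquire(); // the producer's own thread is held back here
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> out = throttle(List.of(1, 2, 3, 4, 5), 10);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(out + " in about " + elapsedMs + " ms");
    }
}
```

The point of the sketch is that the slowdown happens on the emitting thread itself, so nothing is buffered or dropped; the trade-off is that this thread is parked while it waits.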

+6
3 answers

You can try using rx.Observable#onBackpressureBuffer() in conjunction with a custom subscriber that periodically requests n items per second. Note, however, that this ties you to a hard sampling interval of one second.

Note that .subscribeOn() and .toBlocking() are there only to keep the main method from exiting immediately.

    import static java.util.concurrent.TimeUnit.SECONDS;

    import rx.Observable;
    import rx.Subscriber;
    import rx.schedulers.Schedulers;

    public class BackpressureTest {

        public static void main(final String[] args) {
            Observable.range(1, 1000)
                    .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it
                    .lift(allowPerSecond(3)) // via operator using custom subscriber, request n items per second
                    .subscribeOn(Schedulers.computation())
                    .toBlocking()
                    .subscribe(System.out::println);
        }

        private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
            // despite its name, "upstream" is the subscriber that receives the throttled items
            return upstream -> periodicallyRequestingSubscriber(upstream, n);
        }

        private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) {
            return new Subscriber<T>() {

                @Override
                public void onStart() {
                    request(0); // request 0 so that source stops emitting
                    Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items
                }

                @Override
                public void onCompleted() {
                    upstream.onCompleted();
                }

                @Override
                public void onError(final Throwable e) {
                    upstream.onError(e);
                }

                @Override
                public void onNext(final T integer) {
                    upstream.onNext(integer);
                }
            };
        }
    }
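The same idea (buffer at the source, request n items on a timer) can be sketched without RxJava, using the JDK's own `java.util.concurrent.Flow` API (Java 9+). This is a swapped-in illustration under stated assumptions, not the answer's code: `PeriodicSubscriber` and `drain` are hypothetical names, and `SubmissionPublisher` stands in for the buffered source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PeriodicRequestSketch {

    /** Hypothetical subscriber that requests n more items once per period. */
    public static final class PeriodicSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private final int n;
        private final long periodMillis;

        public PeriodicSubscriber(int n, long periodMillis) {
            this.n = n;
            this.periodMillis = periodMillis;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // No initial request: nothing is delivered until the first tick.
            timer.scheduleAtFixedRate(() -> subscription.request(n),
                    periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public void onNext(T item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            timer.shutdownNow();
            done.countDown();
        }

        @Override
        public void onComplete() {
            timer.shutdownNow(); // stop requesting once the source is exhausted
            done.countDown();
        }
    }

    /** Publishes 1..count into the publisher's buffer and waits until all items are consumed. */
    public static List<Integer> drain(int count, int n, long periodMillis) {
        PeriodicSubscriber<Integer> subscriber = new PeriodicSubscriber<>(n, periodMillis);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // buffered by the publisher until requested
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(drain(6, 2, 100)); // two items per 100 ms tick
    }
}
```

Unlike the RxJava snippet, this version shuts its timer down on completion, so the periodic requests do not outlive the stream.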
+2

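Use the sample operator (throttleLast). Keep in mind that it is lossy: in each period it emits only the most recent item and drops the rest. With rate given in items per second:

    Observable<T> throttled = observable.sample(1000 / rate, TimeUnit.MILLISECONDS);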

http://reactivex.io/documentation/operators/sample.html

https://github.com/ReactiveX/RxJava/wiki/Backpressure
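To make the period arithmetic and the lossy behaviour concrete, here is a small plain-Java model. It is only a rough approximation of sample under stated assumptions: `periodMillis` and `sampleLast` are hypothetical helpers, timestamps are assumed to be sorted, windows are aligned to time zero, and the real operator's handling of a trailing partial window is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

public class SampleSketch {

    /** Sampling period in milliseconds for a target rate in items per second. */
    public static long periodMillis(int ratePerSecond) {
        // 1 / ratePerSecond would be 0 in integer arithmetic for any rate above 1
        return 1000L / ratePerSecond;
    }

    /** Keeps only the last item that falls into each period-long window. */
    public static List<Integer> sampleLast(long[] timesMillis, int[] items, long periodMillis) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            long window = timesMillis[i] / periodMillis;
            boolean lastInWindow = i == items.length - 1
                    || timesMillis[i + 1] / periodMillis != window;
            if (lastInWindow) {
                out.add(items[i]); // earlier items in the same window are dropped
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long period = periodMillis(4); // 250 ms for 4 items per second
        long[] times = {0, 100, 200, 300, 400, 600};
        int[] items = {1, 2, 3, 4, 5, 6};
        System.out.println(sampleLast(times, items, period)); // prints [3, 5, 6]
    }
}
```

Half of the six items are discarded here, which is why sampling suits "latest value" feeds better than a cursor whose every row must be processed.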

+1

The answer from @michalsamek seems correct, although in RxJava 2 backpressure is only supported by Flowable. I adjusted his subscriber so that it does what was asked.

There was also a slight problem when items arrived in bursts at different times.

    // RxJava 2 imports
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    import io.reactivex.BackpressureStrategy;
    import io.reactivex.Flowable;
    import io.reactivex.FlowableOperator;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    private static <T> FlowableOperator<T, T> allowPerMillis(int millis) {
        return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis);
    }

    Observable.range(1, 100)
            .observeOn(Schedulers.io())
            .toFlowable(BackpressureStrategy.BUFFER)
            .compose(Flowable::onBackpressureBuffer)
            .lift(allowPerMillis(200))
            .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value));

    public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> {

        // despite its name, "upstream" is the subscriber that receives the throttled items
        private final Subscriber<T> upstream;
        private final int millis;

        // If there hasn't been a request for a long time, do not flood
        private final AtomicBoolean shouldRequest = new AtomicBoolean(true);

        public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) {
            this.upstream = upstream;
            this.millis = millis;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            // request one item per tick, and only if the previous request has been fulfilled
            Observable
                    .interval(millis, TimeUnit.MILLISECONDS)
                    .subscribe(x -> {
                        if (shouldRequest.getAndSet(false))
                            subscription.request(1);
                    });
        }

        @Override
        public void onNext(T t) {
            shouldRequest.set(true);
            upstream.onNext(t);
        }

        @Override
        public void onError(Throwable throwable) {
            upstream.onError(throwable);
        }

        @Override
        public void onComplete() {
            upstream.onComplete();
        }
    }
0

Source: https://habr.com/ru/post/978193/

