Pause and resume observable based on logic gates in RxJava 2.X?

Say I have a processor that emits a boolean when a button is pressed, think of it as a switch.

boolean gateValue = true; PublishProcessor<Boolean> gate = PublishProcessor.create(); view.onButtonClicked() .subscribe(new Action1<Void>() { @Override public void call(final Void aVoid) { gate.onNext(gateValue = !gateValue); } })); 

What I would like to do is use the shutter value to pause and resume the observed sequence, buffering the emitted values ​​during pause.

I read a lot about this, and although this seems possible in reactive extensions for other languages, RxJava does not seem to support it.

Here's an example of what I would like to achieve, it just outputs an incremental value every second. When I press the button, I want the output to stop until I click on it again, which should output every element emitted between two button presses:

 Flowable.interval(1, TimeUnit.SECONDS) .bufferWhile(gate) .flatMapIterable(longs -> longs) .subscribe(new Consumer<Long>() { @Override public void accept(final Long aLong) throws Exception { view.displayTime(aLong); } }); 

Does anyone know a way to achieve something like this?

Edit I wrote a blog post on how to achieve this https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

+5
source share
3 answers

The RxJava2Extensions library now has a valve() statement that executes the requested behavior.

+4
source

Just use the ready-made Observable.window operator, which takes a single Observable<T> parameter.

0
source

You can achieve this using the standard RxJava operators:

  final Random random = new Random(); final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS) .map(it -> random.nextBoolean()) .startWith(false) .doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed"))) .take(100); final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS) .doOnNext(it -> System.out.println("New value " + it)) .take(100); gates.publish(innerGates -> data.publish(innerData -> Observable.merge( innerData.window( innerGates.distinctUntilChanged().filter(it -> it), (ignore -> innerGates.distinctUntilChanged().filter(it -> !it)) ).flatMap(it -> it), innerData.buffer( innerGates.distinctUntilChanged().filter(it -> !it), (ignore -> innerGates.distinctUntilChanged().filter(it -> it)) ) .flatMap(Observable::from) ))) .subscribe(it -> System.out.println("On next " + it)); 
0
source

Source: https://habr.com/ru/post/1259780/


All Articles