Immediate rejection in Rx

I am looking for an operator to debounce series of events, say, a user click. Entrance and exit should be like this:

 interval : -> <- -> <- in : 1--2--3-------4--5--5--6-7-8-------- out : 1-------------4--------------------- 

The idea is similar to underlining with the immediate on http://underscorejs.org/#debounce option. The operator can be represented / implemented in any language that supports Reactive Extensions.

Change: specify the interval, say, 5 seconds (5 spaces between two arrows): -> <-

Edit2: a more understandable version: I have a user, he repeatedly presses the button (1, 2, 3); I want to catch the first click (1) and ignore the rest. After a while, he gets tired and rests for 7 seconds (which is more than a 5-second interval between two arrows) and I press the button again (4, 5, 6, 7, 8). I want to catch the first click (4) and ignore everything else.

If he clicks after the fourth arrow, I also want to catch this click.

Edit3: here is the image image which can be found in the original article

+14
source share
4 answers

Edit: Based on the explanations, RxJava does not have an operator for this type of stream, but can be composed of a nontrivial set of other operators:

 import java.util.concurrent.TimeUnit; import rx.Observable; public class DebounceFirst { public static void main(String[] args) { Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000) .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v)) .doOnNext(v -> System.out.println("T=" + v)) .compose(debounceFirst(500, TimeUnit.MILLISECONDS)) .toBlocking() .subscribe(v -> System.out.println("Debounced: " + v)); } static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) { return f -> f.publish(g -> g.take(1) .concatWith( g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u)) .take(1) .ignoreElements() ) .repeatWhen(h -> h.takeUntil(g.ignoreElements())) ); } } 
+13
source

The behavior you need is not what the debounce statement in Rx does.

This is called throttle , throttleTime or throttleWithTimeout (however, it falls into the category of debounce statements). I don’t know which language you are using, but in RxJS it looks like this:

enter image description here

See http://reactivex.io/documentation/operators/debounce.html .

+7
source

Since debounce() is essentially asynchronous, you need to explicitly return the result to the current thread.

 seriesOfUnfortunateEvents .debounce( 14, TimeUnit.MILLISECONDS ) .observeOn( Schedulers.immediate() ) .subscribe( v -> yourStuff() ); 
+1
source

According to the documentation , RxJS has two debounce statements. You may be interested in, in particular, debounceTime .

debounceTime

From the documentation

Emits a value from a source Observed only after a certain period of time has passed without another source emitting.

Example:

 Rx.Observable .fromEvent(document.querySelector('button'), 'click') .debounceTime(200) .mapTo(() => 'clicked!') .subscribe(v => console.log(v)); 

It will come out with one click! if the button was pressed at the set time (200 ms in this example).

debounce

From the documentation

Gives value from the source. The range observed by the other Observer observed only after a certain time passes without another source radiation.

+1
source

Source: https://habr.com/ru/post/1263629/


All Articles