combineLatest: emit only when one of the streams changes

I have a stream with frequent values and one with slower ones. I want to combine them, but only emit a value when the slower one emits, so combineLatest does not work. For instance:

 a1 a2 b1 (a2,b1) a3 a4 a5 b2 (a5,b2) 

I currently do it as follows. Is there a cleaner way?

    def withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] =
      Observable({ o =>
        // last stays null until fast emits its first value
        var last : A = null.asInstanceOf[A]
        fast.subscribe({a => last = a})
        slow.subscribe({b => o.onNext((last,b))})
      })

Edit: this operator is now part of Rx and is called withLatestFrom.
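To make the intended semantics concrete, here is a minimal, dependency-free sketch that replays the marble example from the question. It is not Rx code: the merged timeline is modeled as a plain `Seq[Either[A, B]]` (`Left` for a fast event, `Right` for a slow one), and the names `WithLatestModel` and `timeline` are made up for illustration. It also assumes that a slow event arriving before any fast value emits nothing.

```scala
// Hypothetical model, not an Rx operator: Left = fast event, Right = slow event.
object WithLatestModel {
  def withLatest[A, B](timeline: Seq[Either[A, B]]): Seq[(A, B)] = {
    // State: the most recent fast value (if any) and the pairs emitted so far.
    val start = (Option.empty[A], Vector.empty[(A, B)])
    val (_, emitted) = timeline.foldLeft(start) {
      // Fast event: remember the value, emit nothing.
      case ((_, out), Left(a)) => (Some(a), out)
      // Slow event: pair it with the latest fast value, if one exists.
      case ((last, out), Right(b)) => (last, out ++ last.map(a => (a, b)).toList)
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    val timeline: Seq[Either[String, String]] = Seq(
      Left("a1"), Left("a2"), Right("b1"),
      Left("a3"), Left("a4"), Left("a5"), Right("b2"))
    println(withLatest(timeline)) // prints Vector((a2,b1), (a5,b2))
  }
}
```

Only slow events produce output, and each one is paired with the latest fast value, which is the behavior that combineLatest does not give.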

1 answer

What you are looking for is a combinator that I have called "combPrev". It is not in the API, but it is needed in many situations. The sample operator comes close, but it does not combine the two streams. I have also missed "combPrev" in RxJS. It turns out that the implementation of "combPrev" ("withLatest") is simple and depends only on map and switch:

    def withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = {
      val hotSlow = slow.publish.refCount
      fast.map({a => hotSlow.map({b => (a,b)})}).switch
    }

Here is a jsfiddle example of the same operator implemented in RxJS.

While the operator is not in Rx, you can use an implicit class to write slow.withLatest(fast):

    implicit class RXwithLatest[B](slow: Observable[B]) {
      def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */
    }

Note: slow must be hot. If slow is a cold Observable, withLatest does not work.


Source: https://habr.com/ru/post/978868/

