Is there an Rx operator to combine the last of streams 1 and 2 only when stream 2 emits things?

Here is my attempt to draw a marble diagram -

STREAM 1 = A----B----C---------D------> (magical operator) STREAM 2 = 1----------2-----3-----4---> STREAM 3 = 1A---------2C----3C----4D-->

Basically I am looking for something that generates stream 3 from streams 1 and 2. Basically, whenever something is emitted from stream 2, it combines it with the last one from stream 1. combineLatest is like what I want, but I just want things emitted from stream 3 when something is emitted from stream 2, not stream 1. Does such an operator exist?

+5
source share
6 answers

withLatestFrom seems to match exactly what I was looking for - http://rxmarbles.com/#withLatestFrom

+2
source

There is an operator that does what you need: one sample overload takes as a parameter another observable instead of duration. The documentation is here: https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables#sample-or-throttlelast

Usage (I will give examples in scala):

 import rx.lang.scala.Observable import scala.concurrent.duration import duration._ def o = Observable.interval(100.milli) def sampler = Observable.interval(180.milli) // Often, you just need the sampled observable o.sample(sampler).take(10).subscribe(x ⇒ println(x + ", ")) Thread.sleep(2000) // or, as for your use case o.combineLatest(sampler).sample(sampler).take(10).subscribe(x ⇒ println(x + ", ")) Thread.sleep(2000) 

Exit:

 0, 2, 4, 6, 7, 9, 11, 13, 15, 16, (2,0), (4,1), (6,2), (7,3), (9,4), (11,5), (13,6), (15,7), (16,8), (18,9), 

There is a small catch in that duplicate entries from a sample observable are swallowed (see discussion at https://github.com/ReactiveX/RxJava/issues/912 ). Other than that, I think this is exactly what you are looking for.

+5
source

As far as I know, there is not a single existing operator that will do what you want. However, you can compose one using CombineLatest and DistinctUntilChanged as follows:

 var joined = Observable.CombineLatest(sourceA, sourceB, (a,b) => new { A = a, B = b }) .DistinctUntilChanged(pair => pair.B); 

EDIT:

The above will work as long as the STREAM 1 values ​​change each time. If they do not, then use the following, which is less clear, but works in all situations (which I tested anyway).

 var joined = Observable.Join( sourceB, sourceA, _ => Observable.Return(Unit.Default), _ => sourceA, (a, b) => new { A = a, B = b }); 

The Join statement is never intuitive to me, the best explanation I've found is here .

In response to @Matthew's comment

 var buttonClicks = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseLeftButtonDown") .Select(_ => Unit.Default); var sequence = Observable.Interval(TimeSpan.FromSeconds(1)); var joined = Observable.Join( buttonClicks, sequence, _ => Observable.Return(Unit.Default), _ => sequence, (b, s) => s); // No info in button click here 
+3
source

Here is a pretty simple way to do this:

 var query = stream2.Zip( stream1.MostRecent(' '), (s2,s1) => string.Format("{0}{1}", s2, s1)); 

MostRecent may be provided with a value of "null", which is used in event stream 1, not yet issued. It may be null for reference types, but I used char for stream1, so I provided a space.

+3
source

I think the Switch statement is used here.

Try the following:

 var query = stream1 .Select(s1 => stream2.Select(s2 => new { s1, s2 })) .Switch(); 

The following test code:

 query .Select(s => String.Format("{0}{1}", s.s2, s.s1)) .Subscribe(Console.WriteLine); stream1.OnNext('A'); stream2.OnNext(1); stream1.OnNext('B'); stream1.OnNext('C'); stream2.OnNext(2); stream2.OnNext(3); stream1.OnNext('D'); stream2.OnNext(4); 

It gives the following results:

 1A 2C 3C 4D 

Please let me know if this is correct.

0
source

Decision

  public static IObservable<TR> Sample<TSource, TSampler, TR> (this IObservable<TSource> source, IObservable<TSampler> sampler, Func<TSource, TSampler, TR> combiner) { return source.Publish (rs => sampler .Zip ( rs.MostRecent(default(TSource)) , (samplerElement, sourceElement) => combiner(sourceElement, samplerElement) ) .SkipUntil(rs) ); } 

with the test case, because this thing is hard to understand.

 public class SampleSpec : ReactiveTest { TestScheduler _Scheduler = new TestScheduler(); [Fact] public void ShouldWork() { var sampler = _Scheduler.CreateColdObservable ( OnNext(10, "A") , OnNext(20, "B") , OnNext(30, "C") , OnNext(40, "D") , OnNext(50, "E") , OnNext(60, "F") ); var source = _Scheduler.CreateColdObservable ( Enumerable .Range(5,100) .Where(i=>i%10!=0) .Select(i=>OnNext(i,i)).ToArray()); var sampled = source.Sample (sampler, Tuple.Create); var actual = _Scheduler.Start (() => sampled , created: 0 , subscribed: 1 , disposed: 1000); actual.Messages.Count() .Should() .Be(6); var messages = actual.Messages.Take(6) .Select(v => v.Value.Value) .ToList(); messages[0].Should().Be(Tuple.Create(9,"A")); messages[1].Should().Be(Tuple.Create(19,"B")); messages[2].Should().Be(Tuple.Create(29, "C")); messages[3].Should().Be(Tuple.Create(39, "D")); messages[4].Should().Be(Tuple.Create(49, "E")); messages[5].Should().Be(Tuple.Create(59, "F")); } } 
0
source

Source: https://habr.com/ru/post/1200635/


All Articles