Solution
/// <summary>
/// Emits one combined element per <paramref name="sampler"/> element, pairing it with the
/// most recent element observed on <paramref name="source"/> at that moment.
/// </summary>
/// <typeparam name="TSource">Element type of the sampled sequence.</typeparam>
/// <typeparam name="TSampler">Element type of the sequence that triggers sampling.</typeparam>
/// <typeparam name="TR">Element type produced by <paramref name="combiner"/>.</typeparam>
/// <param name="source">Sequence whose latest element is sampled.</param>
/// <param name="sampler">Sequence whose elements trigger a sample.</param>
/// <param name="combiner">
/// Builds the result from the latest source element (first argument) and the sampler
/// element (second argument).
/// </param>
/// <returns>A sequence of combined elements, one per sampler element that is not skipped.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="sampler"/> or <paramref name="combiner"/> is null.
/// </exception>
public static IObservable<TR> Sample<TSource, TSampler, TR>(
    this IObservable<TSource> source,
    IObservable<TSampler> sampler,
    Func<TSource, TSampler, TR> combiner)
{
    // Fail fast: without these guards a null combiner is captured by the lambda below and
    // only surfaces as a NullReferenceException when the first pair is combined.
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (sampler == null)
    {
        throw new ArgumentNullException(nameof(sampler));
    }

    if (combiner == null)
    {
        throw new ArgumentNullException(nameof(combiner));
    }

    // The published sequence `rs` is consumed twice inside the selector: once to track the
    // latest source element and once as the gate for SkipUntil.
    return source.Publish(rs =>
        sampler
            // MostRecent needs a seed, so default(TSource) stands in until rs produces an element.
            .Zip(
                rs.MostRecent(default(TSource)),
                (samplerElement, sourceElement) => combiner(sourceElement, samplerElement))
            // Drop results until rs has produced an element; intended to keep the
            // default(TSource) placeholder from being paired with early sampler elements.
            .SkipUntil(rs));
}
Here is the accompanying test case, because this operator is hard to understand.
/// <summary>
/// Virtual-time test for the three-argument <c>Sample</c> operator: every sampler element
/// must be paired with the latest source element that preceded it.
/// </summary>
public class SampleSpec : ReactiveTest
{
    private readonly TestScheduler _scheduler = new TestScheduler();

    [Fact]
    public void ShouldWork()
    {
        // Arrange: the sampler ticks every 10 virtual-time units.
        var sampler = _scheduler.CreateColdObservable(
            OnNext(10, "A"),
            OnNext(20, "B"),
            OnNext(30, "C"),
            OnNext(40, "D"),
            OnNext(50, "E"),
            OnNext(60, "F"));

        // The source emits its own timestamp as the value. Multiples of 10 are left out,
        // presumably so that no source element lands on the same tick as a sampler element.
        var sourceMessages = Enumerable
            .Range(5, 100)
            .Where(i => i % 10 != 0)
            .Select(i => OnNext(i, i))
            .ToArray();
        var source = _scheduler.CreateColdObservable(sourceMessages);

        // Each sampler label is expected alongside the last source value emitted before it.
        var expected = new[]
        {
            Tuple.Create(9, "A"),
            Tuple.Create(19, "B"),
            Tuple.Create(29, "C"),
            Tuple.Create(39, "D"),
            Tuple.Create(49, "E"),
            Tuple.Create(59, "F"),
        };

        // Act
        var sampled = source.Sample(sampler, Tuple.Create);
        var actual = _scheduler.Start(
            () => sampled,
            created: 0,
            subscribed: 1,
            disposed: 1000);

        // Assert
        actual.Messages.Count.Should().Be(6);

        var messages = actual.Messages
            .Take(6)
            .Select(recorded => recorded.Value.Value)
            .ToList();

        for (var index = 0; index < expected.Length; index++)
        {
            messages[index].Should().Be(expected[index]);
        }
    }
}
source share