The question is not 100% clear, so I am making some presumptions.
Observable.Delay
is not what you want, because it will create a delay from the moment each event arrives, and will not even create time intervals for processing.
Observable.Buffer
not what you want, because it will cause all events in each given interval to be transmitted to you, and not one at a time.
So, I believe that you are looking for a solution that creates some kind of metronome that goes out and gives you an event beyond the mark. This may be naive , built using Observable.Interval
for the metronome and Zip
to connect it to your source:
var source = GetInitSequence(); var trigger = Observable.Interval(TimeSpan.FromSeconds(5)); var triggeredSource = source.Zip(trigger, (s,_) => s); triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));
This will run every 5 seconds (in the above example) and you will get the original elements in sequence.
The only problem with this solution is that if you no longer have the source elements (say) 10 seconds when the source elements arrive, they will be sent immediately, since some of the trigger events sit there waiting for them. Marble diagram for this scenario:
source: -abc
This is a very reasonable problem. There are already two questions here:
Rx IObservable Buffering for Smoothing Event Bursts
Method for uniformly shifting buffered events
The provided solution is the main Drain
extension method and the Buffered
extension. I changed them a lot easier (no need to Drain
, just use Concat
). Using:
var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));
StepInterval
Extension StepInterval
:
public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) { return source.Select(x => Observable.Empty<T>() .Delay(minDelay) .StartWith(x) ).Concat(); }