I had the same question some time ago, and I did not find a built-in operator that does just that, so I wrote my own, which I called Latest. It is not trivial to implement, but I have found it very useful in my current project.
It works as follows: while the observer is busy processing the previous notification (on its own thread, of course), it queues up to the latest n notifications (n >= 0) and calls the observer's OnNext with them as soon as the observer becomes idle. So:
- Latest(0): observe only items that arrive while the observer is idle
- Latest(1): always process the most recent item
- Latest(1000) (for example): usually process every item, but if something gets stuck in the pipeline, drop the oldest items rather than get an OutOfMemoryException
- Latest(int.MaxValue): never miss an item, and balance the load between producer and consumer
Your code would then look like this:

stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")
The signature is as follows:
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/>
/// produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full,
/// less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>:
/// <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
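For illustration, here is a minimal sketch of how an operator with this signature could be implemented. This is my own approximation, not the author's actual code: it keeps a bounded queue guarded by a lock, drops the oldest item when the queue is full (or drops the incoming item when maxQueueSize is 0 and the observer is busy), and drains the queue to the observer on the given scheduler. It requires the System.Reactive package; error and disposal handling are simplified.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class LatestExtensions
{
    // Sketch only: queue up to maxQueueSize pending items, dropping the
    // oldest when full, and drain the queue to the observer on `scheduler`
    // while the observer is otherwise idle.
    public static IObservable<TSource> Latest<TSource>(
        this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
        scheduler = scheduler ?? Scheduler.Default;

        return Observable.Create<TSource>(observer =>
        {
            var gate = new object();
            var queue = new Queue<TSource>();
            var draining = false;   // true while a drain is scheduled or running
            var done = false;
            Exception error = null;

            void Drain() => scheduler.Schedule(() =>
            {
                while (true)
                {
                    TSource item;
                    lock (gate)
                    {
                        if (queue.Count == 0)
                        {
                            draining = false;
                            if (done)
                            {
                                if (error != null) observer.OnError(error);
                                else observer.OnCompleted();
                            }
                            return;
                        }
                        item = queue.Dequeue();
                    }
                    observer.OnNext(item); // observer runs outside the lock
                }
            });

            return source.Subscribe(
                x =>
                {
                    bool start;
                    lock (gate)
                    {
                        if (draining && queue.Count >= maxQueueSize)
                        {
                            if (maxQueueSize == 0) return; // observer busy: drop item
                            queue.Dequeue();               // drop the oldest queued item
                        }
                        queue.Enqueue(x);
                        start = !draining;
                        draining = true;
                    }
                    if (start) Drain();
                },
                ex =>
                {
                    bool start;
                    lock (gate) { error = ex; done = true; start = !draining; draining = true; }
                    if (start) Drain();
                },
                () =>
                {
                    bool start;
                    lock (gate) { done = true; start = !draining; draining = true; }
                    if (start) Drain();
                });
        });
    }
}
```

The key design point is that the producer only ever touches the queue under the lock, while OnNext is invoked outside it, so a slow observer never blocks the source.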
The implementation is too large to publish here, but if anyone is interested, I would gladly share it. Let me know.