In addition to Richard Salay's answer, I just learned the new Window statement from the latest version of rx. This "solves" the problem, since you can "buffer with a timeout", i.e. Retrieve the result for a time that lasts until a timeout is reached, but instead of retrieving the results as IEnumerable, you actually get them as IObservable.
Here is a brief example of what I mean:
private void SetupStream() { var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>( h => new MouseButtonEventHandler(h), h => MouseDown += h, h => MouseDown -= h); var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher)) .Switch(); inputStream.Window(() => timeout) .Subscribe(OnWindowOpen); } private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window) { Trace.WriteLine(string.Format("Window open")); var buffer = new List<IEvent<MouseButtonEventArgs>>(); window.Subscribe(click => { Trace.WriteLine(string.Format("Click")); buffer.Add(click); }, () => ProcessEvents(buffer)); } private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks) { Trace.WriteLine(string.Format("Window closed"));
Each time a window opens, you get all the events as they arrive, save them in the buffer and process them when the window finishes (which actually happens when you open the next window).
I'm not sure that Richard will change his example to use Window now available, but thought it might be worth the alternative.
source share