To use Rx, you need to return IObservable<T> instead of IEnumerable<T>.

    public IObservable<T> ReadFiles()
    {
        return from filepath in lstOfFiles.ToObservable()
               from item in Observable.Using(() => File.OpenRead(filepath), ReadStream)
               select item;
    }

Each time you call Subscribe on the observable returned by ReadFiles, it will iterate over all of the strings in lstOfFiles and, in parallel*, read each file stream.
The query opens each file stream in turn and passes it to ReadStream, which is responsible for generating the asynchronous sequence of items for that stream.
The ReadFiles query, which uses the SelectMany operator written in query comprehension syntax, merges every item generated by all of the ReadStream observables into a single observable sequence, respecting the asynchrony of the source.
You should strongly consider writing an async iterator for your ReadStream method, as shown below; otherwise, if you must return an IEnumerable<T>, you will have to convert it by applying the ToObservable(scheduler) operator with a concurrency-introducing scheduler, which may be less efficient.
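For the fallback case, here is a minimal sketch of bridging an IEnumerable<T> into Rx with a scheduler. It assumes the System.Reactive package; ReadItemsSynchronously is a hypothetical stand-in for a ReadStream that can only return IEnumerable<T>, and TaskPoolScheduler.Default is just one possible choice of concurrency-introducing scheduler.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class EnumerableBridgeExample
{
    // Hypothetical stand-in for a ReadStream that must return IEnumerable<T>.
    public static IEnumerable<int> ReadItemsSynchronously()
    {
        for (var i = 1; i <= 3; i++)
        {
            yield return i; // imagine a blocking stream read for each item
        }
    }

    // The scheduler argument moves the enumeration onto the task pool,
    // so Subscribe does not block the caller while items are produced.
    public static IObservable<int> ReadItems()
    {
        return ReadItemsSynchronously().ToObservable(TaskPoolScheduler.Default);
    }

    public static async Task Main()
    {
        // ForEachAsync returns a Task that completes when the sequence completes.
        await ReadItems().ForEachAsync(item => Console.WriteLine(item));
    }
}
```

Each item still costs a scheduled unit of work and a potentially blocking read, which is why the async iterator is the preferable design when you control ReadStream.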

    public IObservable<Item> ReadStream(Stream stream)
    {
        return Observable.Create<Item>(async (observer, cancel) =>
        {
            // Here is one example of reading a stream with fixed item lengths.
            var buffer = new byte[itemLength];  // TODO: Define itemLength
            var remainder = itemLength;         // bytes still missing from the current item
            int read;

            do
            {
                read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel)
                                   .ConfigureAwait(false);

                remainder -= read;

                if (read == 0)
                {
                    // End of stream: fail if it ends in the middle of an item.
                    if (remainder < itemLength)
                    {
                        throw new InvalidOperationException("End of stream unexpected.");
                    }
                    else
                    {
                        break;
                    }
                }
                else if (remainder == 0)
                {
                    // A complete item has been buffered; publish it and start the next one.
                    observer.OnNext(ReadItem(buffer));  // TODO: Define ReadItem

                    remainder = itemLength;
                }
            }
            while (true);
        });
    }

* Rx does not introduce any concurrency here. The parallelism is simply a result of the asynchronous nature of the underlying API, so it is very efficient. Reading from a file stream asynchronously may cause Windows to use an I/O completion port as an optimization, notifying a pooled thread when each buffer becomes available. This ensures that Windows is entirely responsible for scheduling callbacks to your application, rather than the TPL or yourself.
Rx is free-threaded, so each notification to your observer may arrive on a different pooled thread; however, due to Rx's serialization contract (§4.2 Rx Design Guidelines), you will not receive overlapping notifications in your observer when you call Subscribe, so there is no need to provide explicit synchronization, such as locking.
However, due to the parallel nature of this query, you may observe interleaved notifications from different files, but never overlapping notifications.
If you would prefer to receive all of the items for a given file at once, as you hinted at in your question, you can simply apply the ToList operator to the query and change the return type:

    public IObservable<IList<T>> ReadFiles()
    {
        return from filepath in lstOfFiles.ToObservable()
               from items in Observable.Using(() => File.OpenRead(filepath), ReadStream)
                                       .ToList()
               select items;
    }

If you need to observe notifications with thread affinity (for example, on a GUI thread), you must marshal the notifications because they will arrive on a pooled thread. Since the query itself does not introduce concurrency, the best way to achieve this is to apply the ObserveOnDispatcher operator (WPF, Store Apps, Phone, Silverlight) or the ObserveOn(SynchronizationContext) overload (WinForms, ASP.NET, etc.). Do not forget to add a reference to the appropriate platform-specific NuGet package, e.g. Rx-Wpf, Rx-WinForms, Rx-WindowsStore, etc.
You may be tempted to convert the observable back into an IEnumerable<T> instead of calling Subscribe. Do not do this. In most cases it is unnecessary, it can be inefficient and, in the worst case, it can potentially cause deadlocks. Once you enter the world of asynchrony, you should try to stay in it. This is true not only for Rx, but also for async/await.
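If the calling code really needs the complete set of items, one way to get it without blocking is to await the observable. The following is a minimal sketch rather than a drop-in implementation: it assumes the System.Reactive package, and ReadFilesStandIn is a hypothetical placeholder for the ReadFiles query so that the example is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class StayAsyncExample
{
    // Hypothetical stand-in for ReadFiles(); any IObservable<T> behaves the same way.
    public static IObservable<int> ReadFilesStandIn()
    {
        return Observable.Range(1, 5);
    }

    public static async Task<IList<int>> CollectAsync()
    {
        // Awaiting an observable yields its final element once it completes;
        // ToList makes that final element the complete list of items.
        return await ReadFilesStandIn().ToList();
    }

    public static async Task Main()
    {
        var items = await CollectAsync();
        Console.WriteLine(string.Join(", ", items)); // prints "1, 2, 3, 4, 5"
    }
}
```

Nothing here blocks a thread while the sequence is being produced, so the caller stays within async/await the whole way through.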