How to use yield to return an Item collection to a parallel block or Task

I am looking for help on how to use the yield keyword to return an IEnumerable from parallel blocks or Task blocks. Below is the pseudocode:

    public IEnumerable<List<string>> ReadFile()
    {
        foreach (string filepath in lstOfFiles)
        {
            using (var stream = new FileStream(filepath, FileMode.Open, FileAccess.Read))
            {
                foreach (var item in ReadStream(stream))
                    yield return item; // where item is of type List<string>
            }
        }
    }

I want to convert the above code into a parallel version, as shown below:

    lstOfFiles.AsParallel()
        .ForAll(filepath =>
        {
            var stream = new FileStream(filepath, FileMode.Open, FileAccess.Read);
            foreach (var item in ReadStream(stream))
                yield return item; // compiler error: yield is not allowed inside a lambda
        });

but the compiler reports an error that yield cannot be used inside parallel blocks or anonymous delegates. I also tried a Task block, but yield is not allowed inside a task's anonymous delegate either.

Can anyone suggest the simplest and best way to use yield to return a collection of data from parallel blocks or tasks?

I have read that Rx 2.0 or the TPL are good for this scenario, but I am unsure whether to use Rx or the TPL to return values asynchronously. Can anyone suggest which is better here, Rx or the TPL?

If I use Rx, I will need to create a subscription and convert the parallel block into an observable with AsObservable.

2 answers

It looks like you want to use SelectMany. You cannot use yield in an anonymous method, but you can break it out into a separate method, for example:

    IEnumerable<Item> items = lstOfFiles.AsParallel()
        .SelectMany(filepath => ReadItems(filepath));

    IEnumerable<Item> ReadItems(string filePath)
    {
        using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
        {
            foreach (var item in ReadStream(stream))
                yield return item;
        }
    }
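A self-contained sketch of this approach follows, assuming each item is simply a line of text. The StreamReader loop is a stand-in for the question's ReadStream, and ReadAllLines is a hypothetical name. Since C# 7 an iterator can also be written as a local function (yield is still not allowed in lambdas), which keeps it next to the query. Note that PLINQ does not preserve the order of the files unless you add AsOrdered().

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class ParallelFileReader
{
    // Reads every file in parallel and flattens all lines into one list.
    // The StreamReader loop stands in for the question's ReadStream (assumption).
    public static List<string> ReadAllLines(IEnumerable<string> lstOfFiles)
    {
        // yield is not allowed in a lambda, but it is allowed in a local function.
        IEnumerable<string> ReadItems(string filePath)
        {
            using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
            using (var reader = new StreamReader(stream))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                    yield return line;
            }
        }

        return lstOfFiles
            .AsParallel()
            .SelectMany(filePath => ReadItems(filePath)) // files are read concurrently
            .ToList(); // order across files is not guaranteed without AsOrdered()
    }
}
```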

To use Rx, you'll need to use IObservable<T> instead of IEnumerable<T>.

    public IObservable<Item> ReadFiles()
    {
        return from filepath in lstOfFiles.ToObservable()
               from item in Observable.Using(() => File.OpenRead(filepath), ReadStream)
               select item;
    }

Each time you call Subscribe on the observable returned by ReadFiles, it will iterate over all of the file paths in lstOfFiles and, concurrently*, read each file stream.

Going further, the query opens each file stream and passes it to ReadStream, which is responsible for generating the asynchronous sequence of items for a given stream.

The ReadFiles query, which uses the SelectMany operator written in query comprehension syntax, merges every "item" generated by all of the ReadStream observables into a single observable sequence, respecting the asynchrony of the source.
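For readers less used to query comprehension syntax, the two from clauses in ReadFiles translate into a single SelectMany call. The sketch below spells that out in method syntax. It assumes the System.Reactive package, and it uses a hypothetical line-based ReadStream (items are strings) only so that the example compiles on its own; substitute your own ReadStream and item type.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;

public static class RxFileReader
{
    // Hypothetical ReadStream: emits each line of the stream as one item.
    // The reader is not disposed here because Observable.Using owns the stream.
    public static IObservable<string> ReadStream(Stream stream) =>
        Observable.Create<string>(async (observer, cancel) =>
        {
            var reader = new StreamReader(stream);
            string line;
            while (!cancel.IsCancellationRequested &&
                   (line = await reader.ReadLineAsync().ConfigureAwait(false)) != null)
            {
                observer.OnNext(line);
            }
            // Returning completes the sequence; a thrown exception becomes OnError.
        });

    // Method-syntax equivalent of the query-comprehension ReadFiles:
    // each path is projected to an inner observable and SelectMany merges them.
    public static IObservable<string> ReadFiles(IEnumerable<string> lstOfFiles) =>
        lstOfFiles.ToObservable()
            .SelectMany(filepath =>
                Observable.Using(() => File.OpenRead(filepath), ReadStream));
}
```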

You should consider writing an async iterator for your ReadStream method, as in the following example; otherwise, if you must return an IEnumerable<T>, you will have to convert it by applying the ToObservable(scheduler) operator with a scheduler that introduces concurrency, which may be less efficient.

    public IObservable<Item> ReadStream(Stream stream)
    {
        return Observable.Create<Item>(async (observer, cancel) =>
        {
            // Here's one example of reading a stream with fixed item lengths.
            var buffer = new byte[itemLength]; // TODO: Define itemLength
            var remainder = itemLength;
            int read;
            do
            {
                read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel)
                                   .ConfigureAwait(false);
                remainder -= read;
                if (read == 0)
                {
                    if (remainder < itemLength)
                    {
                        throw new InvalidOperationException("End of stream unexpected.");
                    }
                    else
                    {
                        break;
                    }
                }
                else if (remainder == 0)
                {
                    observer.OnNext(ReadItem(buffer)); // TODO: Define ReadItem
                    remainder = itemLength;
                }
            }
            while (true);
        });
    }
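If ReadStream has to remain a synchronous IEnumerable<T>, the ToObservable(scheduler) fallback described above might look like this minimal sketch. It assumes the System.Reactive package, a hypothetical line-based ReadStreamSync, and TaskPoolScheduler.Default as the concurrency-introducing scheduler. The iteration then blocks a pool thread for each file while it reads, which is the inefficiency the answer warns about.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SyncReadStreamAdapter
{
    // Hypothetical synchronous reader: yields each line of the stream.
    public static IEnumerable<string> ReadStreamSync(Stream stream)
    {
        var reader = new StreamReader(stream);
        string line;
        while ((line = reader.ReadLine()) != null)
            yield return line;
    }

    // Adapts the iterator to IObservable<T>. The scheduler runs the blocking
    // iteration on the task pool instead of on the subscribing thread.
    public static IObservable<string> ReadStream(Stream stream) =>
        ReadStreamSync(stream).ToObservable(TaskPoolScheduler.Default);
}
```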

*Rx doesn't introduce any concurrency here. Concurrency is simply a result of the asynchronous nature of the underlying API, so it's very efficient. Reading from a file stream asynchronously may cause Windows to use an I/O completion port as an optimization, notifying a pooled thread as each buffer becomes available. This ensures that Windows is entirely responsible for scheduling callbacks to your application, rather than the TPL or your own code.
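A related detail: whether asynchronous reads can go through an I/O completion port depends on how the file was opened. File.OpenRead does not request asynchronous I/O, so the following sketch opens the file with useAsync: true instead; such a stream could be passed to Observable.Using in place of File.OpenRead. OpenForAsyncRead and CountBytesAsync are hypothetical helper names, and the 4096-byte buffer size is an arbitrary choice.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncFileOpener
{
    // Hypothetical helper: opens a file for asynchronous (overlapped) reads,
    // so ReadAsync can be completed by the OS instead of a blocked pool thread.
    public static FileStream OpenForAsyncRead(string path) =>
        new FileStream(
            path,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            bufferSize: 4096,
            useAsync: true);

    // Reads the whole file with ReadAsync, mainly to show the stream in use.
    public static async Task<int> CountBytesAsync(string path)
    {
        using (var stream = OpenForAsyncRead(path))
        {
            var buffer = new byte[4096];
            int total = 0, read;
            while ((read = await stream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
                total += read;
            return total;
        }
    }
}
```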

Rx is free-threaded, so every notification to your observer may arrive on a different pooled thread; however, due to Rx's serialization contract (§4.2 of the Rx Design Guidelines), you will not receive overlapping notifications in your observer when you call Subscribe, so there's no need to provide explicit synchronization, such as locking.

However, due to the concurrent nature of this query, you may observe interleaved notifications from different files, but never overlapping notifications.
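The serialization guarantee can be seen in a small self-contained experiment, assuming the System.Reactive package: several sources produce values on the task pool at the same time, Merge combines them, and the observer appends to a plain List<T> with no lock. Values from different sources may interleave, but because Merge serializes its OnNext calls, no value is lost. SerializationDemo and CollectAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SerializationDemo
{
    // Merges several concurrently produced sequences and collects the values
    // into a non-thread-safe List<int> without any locking.
    public static async Task<List<int>> CollectAsync(int sources, int itemsPerSource)
    {
        var received = new List<int>(); // safe only because notifications never overlap

        IObservable<int> merged = Enumerable.Range(0, sources)
            .Select(s => Observable.Range(s * itemsPerSource, itemsPerSource, TaskPoolScheduler.Default))
            .Merge();

        // ForEachAsync subscribes, invokes the callback for each item, and
        // completes the task when the merged sequence completes.
        await merged.ForEachAsync(value => received.Add(value)).ConfigureAwait(false);
        return received;
    }
}
```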

If you'd rather receive all of the items for a given file at once, as you hinted in your question, you can simply apply the ToList operator to the query and change the return type:

    public IObservable<IList<Item>> ReadFiles()
    {
        return from filepath in lstOfFiles.ToObservable()
               from items in Observable.Using(() => File.OpenRead(filepath), ReadStream)
                                       .ToList()
               select items;
    }

If you need thread-affine notifications (for example, on a GUI thread), you must marshal the notifications because they will arrive on a pooled thread. Since this query doesn't introduce concurrency itself, the best way to achieve this is to apply the ObserveOnDispatcher operator (WPF, Store Apps, Phone, Silverlight) or the ObserveOn(SynchronizationContext) overload (WinForms, ASP.NET, etc.). Just don't forget to add a reference to the appropriate platform-specific NuGet package; e.g., Rx-Wpf, Rx-WinForms, Rx-WindowsStore, etc.

You may be tempted to convert the observable back into an IEnumerable<T> instead of calling Subscribe. Don't. In most cases it's unnecessary, it can be inefficient, and in the worst case it could potentially cause deadlocks. Once you enter the world of asynchrony, you should try to stay in it. This is true not only for Rx but also for async/await.
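As a sketch of staying asynchronous, assuming the System.Reactive package: an IObservable<T> can be awaited directly (Rx supplies a GetAwaiter extension), so a caller can collect results with await instead of converting back with ToEnumerable() or blocking with Wait(). SumLengthsAsync is a hypothetical consumer used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class StayAsync
{
    // Hypothetical consumer: awaits the observable instead of blocking on it.
    public static async Task<int> SumLengthsAsync(IObservable<string> items)
    {
        // ToList() produces a single IList<string> when the source completes;
        // awaiting it releases the calling thread in the meantime.
        IList<string> all = await items.ToList();
        return all.Sum(s => s.Length);
    }
}
```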


Source: https://habr.com/ru/post/1209969/

