Rx Extensions: Where is Parallel.ForEach?

I have a piece of code that uses Parallel.ForEach , possibly based on an old version of Rx extensions or Parallel Library. I installed the current version of Rx extensions, but cannot find Parallel.ForEach . I do not use any other fancy materials in the library and just want to process some data in parallel:

 Parallel.ForEach(records, ProcessRecord); 

I found this question , but I would not want to depend on older versions of Rx. But I could not find something similar for Rx, so what is the current and most direct way to do this using the current version of Rx? The project uses .NET 3.5.

+6
source share
2 answers

No need to do all this silly goosery if you have Rx:

 records.ToObservable() .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) .ToList() .First(); 

(Or, if you want the order of the elements to be maintained at the expense of efficiency):

 records.ToObservable() .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) .Concat() .ToList() .First(); 

Or if you want to limit the number of elements at a time:

 records.ToObservable() .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))) .Merge(5 /* at a time */) .ToList() .First(); 
+24
source

Here's a simple replacement:

 class Parallel { public static void ForEach<T>(IEnumerable<T> source, Action<T> body) { if (source == null) { throw new ArgumentNullException("source"); } if (body == null) { throw new ArgumentNullException("body"); } var items = new List<T>(source); var countdown = new CountdownEvent(items.Count); WaitCallback callback = state => { try { body((T)state); } finally { countdown.Signal(); } }; foreach (var item in items) { ThreadPool.QueueUserWorkItem(callback, item); } countdown.Wait(); } } 
+1
source

Source: https://habr.com/ru/post/904063/


All Articles