How to publish Parallel.ForEach results to a queue that is continuously read in C#

In my application, I have three classes: Extractor, Transformer and Loader, which are coordinated by a fourth class, Coordinator. Extractor, Transformer and Loader are very simple and do the following:

Extractor

Exposes a member named Results of type IEnumerable<string>, populated, for example, by reading from a text file. Extraction must be synchronous.

Transformer

Exposes a method named Transform, which takes a string and converts it to another string through some process that is expected to be time-consuming (so parallel processing should be used here).

Loader

Exposes a method named Load, which takes a string and loads it into some final form (for example, another text file). Loading must be synchronous.

The Coordinator class coordinates the three operations. The transformation must run in parallel and push its results to a queue that is read by the loader. The Coordinator's Run() method is as follows:

    Extractor extractor = new Extractor();
    Transformer transformer = new Transformer();
    Loader loader = new Loader();

    ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

    Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));

    foreach (string output in outputs)
    {
        loader.Load(output);
    }

This works well, except that ALL of the transformation must complete before any loading starts, i.e. Parallel.ForEach() has to finish before the following foreach begins. I would prefer that each output be passed to the loader as soon as it is ready.

I also tried this:

    Extractor extractor = new Extractor();
    Transformer transformer = new Transformer();
    Loader loader = new Loader();

    ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

    foreach (string input in extractor.Results)
    {
        string input1 = input;
        Task task = Task.Factory.StartNew(
            () => outputs.Enqueue(transformer.Transform(input1)));
    }

    foreach (string output in outputs)
    {
        loader.Load(output);
    }

But then the foreach at the bottom is reached before any outputs have been added to the queue, so it finds nothing to iterate over and simply exits.

How can I get loading to start as soon as results are available from the calls to transformer.Transform()?

1 answer

Try BlockingCollection in combination with Parallel.Invoke. In the example below, GetConsumingEnumerable (the consumer part of the producer-consumer pattern) will not finish until CompleteAdding is called, so load keeps running until fill has completed.

    var outputs = new BlockingCollection<string>();

    // aka Producer
    Action fill = () =>
    {
        Parallel.ForEach(extractor.Results, x => outputs.Add(transformer.Transform(x)));
        outputs.CompleteAdding();
    };

    // aka Consumer
    Action load = () =>
    {
        foreach (var o in outputs.GetConsumingEnumerable())
            loader.Load(o);
    };

    Parallel.Invoke(fill, load);
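To try this end to end, the snippet above can be wrapped into a self-contained sketch. The Extractor, Transformer and Loader bodies below are hypothetical stand-ins (the question does not show the real ones), and the try/finally around Parallel.ForEach is an addition, made on the assumption that you want the consumer released even if a transform throws.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in: yields eight fake inputs instead of reading a file.
class Extractor
{
    public IEnumerable<string> Results =>
        Enumerable.Range(1, 8).Select(i => "item" + i);
}

// Hypothetical stand-in: simulates a slow conversion.
class Transformer
{
    public string Transform(string input)
    {
        Thread.Sleep(100);
        return input.ToUpperInvariant();
    }
}

// Hypothetical stand-in: records what was loaded instead of writing a file.
class Loader
{
    public List<string> Loaded { get; } = new List<string>();

    public void Load(string output)
    {
        // Only the single consumer action calls this, so a plain List is enough.
        Loaded.Add(output);
        Console.WriteLine("Loaded " + output);
    }
}

static class Coordinator
{
    public static List<string> Run()
    {
        var extractor = new Extractor();
        var transformer = new Transformer();
        var loader = new Loader();

        using (var outputs = new BlockingCollection<string>())
        {
            // Producer: transform in parallel, then signal that no more items will come.
            Action fill = () =>
            {
                try
                {
                    Parallel.ForEach(extractor.Results,
                        x => outputs.Add(transformer.Transform(x)));
                }
                finally
                {
                    // Without this call the consumer loop below would block forever.
                    outputs.CompleteAdding();
                }
            };

            // Consumer: blocks while the collection is empty, ends after CompleteAdding.
            Action load = () =>
            {
                foreach (var o in outputs.GetConsumingEnumerable())
                    loader.Load(o);
            };

            Parallel.Invoke(fill, load);
        }

        return loader.Loaded;
    }
}
```

Calling Coordinator.Run() from a Main method loads all eight items, in whatever order the transforms happen to finish. Note that Parallel.Invoke makes no guarantee that the two actions actually run concurrently; if they end up running one after the other the pipeline still completes, but loading will not overlap with transformation.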

Source: https://habr.com/ru/post/1437622/

