In my application, I have three classes, Extractor, Transformer and Loader, which are coordinated by a fourth class, Coordinator. Extractor, Transformer and Loader are very simple and do the following:
Extractor
Exposes a member named Results of type IEnumerable<string>, populated for example by reading from a text file. Extraction must be synchronous.
Transformer
Exposes a method named Transform, which takes a string and converts it to another string through some process that is expected to be time-consuming (this is where parallel processing should be used).
Loader
Exposes a method named Load, which takes a string and loads it into some final form (for example, another text file). Loading must be synchronous.
The Coordinator class coordinates the three operations. The transformation must be performed in parallel, with the results placed on a queue that is read by the loader. Coordinator's Run() method is as follows:
    Extractor extractor = new Extractor();
    Transformer transformer = new Transformer();
    Loader loader = new Loader();
    ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

    Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));

    foreach (string output in outputs)
    {
        loader.Load(output);
    }
This works well, except that ALL transformations must complete before any loading begins, i.e. Parallel.ForEach() has to finish before the following foreach starts. I would prefer that each output be passed to the loader as soon as it is ready.
I also tried this:
    Extractor extractor = new Extractor();
    Transformer transformer = new Transformer();
    Loader loader = new Loader();
    ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

    foreach (string input in extractor.Results)
    {
        string input1 = input;
        Task task = Task.Factory.StartNew(
            () => outputs.Enqueue(transformer.Transform(input1)));
    }

    foreach (string output in outputs)
    {
        loader.Load(output);
    }
But then the second foreach runs before any outputs have been added to the queue, so it finds the queue empty and exits immediately.
How can I get loading to start as soon as results become available from the calls to transformer.Transform()?
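One direction that might fit, given here only as a rough sketch and not a confirmed solution: swap the ConcurrentQueue<string> for a BlockingCollection<string>, run the parallel transformation on a background task, and have the loader read from GetConsumingEnumerable(), which blocks until an item arrives and ends once CompleteAdding() has been called. The names PipelineSketch and Run are hypothetical, and the transform and load delegates are stand-ins for Transformer.Transform and Loader.Load, whose real implementations are not shown above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch: PipelineSketch and Run are illustrative names only.
public static class PipelineSketch
{
    // inputs    : stand-in for extractor.Results
    // transform : stand-in for transformer.Transform
    // load      : stand-in for loader.Load (called on the calling thread only)
    public static void Run(
        IEnumerable<string> inputs,
        Func<string, string> transform,
        Action<string> load)
    {
        using (var outputs = new BlockingCollection<string>())
        {
            // Producer: transform in parallel on a background task so the
            // calling thread is free to start loading right away.
            Task producer = Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(inputs, x => outputs.Add(transform(x)));
                }
                finally
                {
                    // Signal that no more items will arrive, even on failure,
                    // so the consuming loop below can terminate.
                    outputs.CompleteAdding();
                }
            });

            // Consumer: blocks until an item is available, and finishes once
            // the collection is both empty and marked complete.
            foreach (string output in outputs.GetConsumingEnumerable())
            {
                load(output);
            }

            // Surface any exception thrown during transformation.
            producer.Wait();
        }
    }

    public static void Main()
    {
        Run(new[] { "a", "b", "c" },
            s => s.ToUpperInvariant(),
            s => Console.WriteLine(s));
    }
}
```

With this shape, loading stays synchronous (a single consumer thread), while the order in which outputs arrive depends on how the parallel transformations happen to finish, so it is not guaranteed to match the input order.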