Possible reasons why ParallelQuery.Aggregate does not work in parallel

I would appreciate any help from PLINQ experts! I'll take the time to review the answers; I have a more detailed profile on Math.SE.

I have an object of type ParallelQuery<List<string>> containing 44 lists that I would like to process in parallel (five at a time, say). My processing method has a signature like this:

    private ProcessResult Process(List<string> input)

Processing will return a result that is a pair of booleans, as shown below.

    private struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }

        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }

Problem. When I use IEnumerable<List<string>> processMe, my PLINQ query uses this overload: https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx. It is written as:

    processMe
        .AsParallel()
        .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>(
            new ConcurrentStack<ProcessResult>(), // aggregator seed
            (agg, input) =>
            {
                // updating the aggregate result
                var res = Process(input);
                agg.Push(res);
                return agg;
            },
            agg =>
            {
                // obtain the result from the aggregator agg
                // (in this case just the most recent result**)
                ProcessResult res;
                agg.TryPop(out res);
                return res;
            });

Unfortunately, it does not run in parallel, only sequentially. (** Note that this implementation does not make sense; I'm just trying to get the parallelization to work for now.)


I tried a slightly different implementation that did run in parallel, but no aggregation took place. I defined an aggregation method, which is essentially a Boolean AND on both parts of the ProcessResult, that is, aggregate([A1, A2], [B1, B2]) ≡ [A1 & B1, A2 & B2]:

    private static ProcessResult AggregateProcessResults(ProcessResult aggregate, ProcessResult latest)
    {
        bool ini = false, suc = false;
        if (aggregate.ProcessInitialised && latest.ProcessInitialised) ini = true;
        if (aggregate.ProcessSuccessful && latest.ProcessSuccessful) suc = true;
        return new ProcessResult(ini, suc);
    }
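For reference, the same component-wise AND can be written more compactly. This is a minimal sketch under a few assumptions: the struct is pulled out to top level and marked readonly so the snippet compiles on its own, and the Identity property and the ProcessResultOps class are hypothetical helpers, not part of the question. (true, true) is the neutral element of this AND, which makes it a sensible seed, and because && is associative and commutative it does not matter in which order partial results are combined.

```csharp
using System;

// Same shape as the question's ProcessResult: an immutable pair of flags.
public readonly struct ProcessResult
{
    public ProcessResult(bool initialised, bool successful)
    {
        ProcessInitialised = initialised;
        ProcessSuccessful = successful;
    }

    public bool ProcessInitialised { get; }
    public bool ProcessSuccessful { get; }

    // Hypothetical helper: the neutral element of the AND-combine,
    // i.e. combining anything with it leaves that value unchanged.
    public static ProcessResult Identity => new ProcessResult(true, true);
}

public static class ProcessResultOps
{
    // Component-wise AND: ([A1, A2], [B1, B2]) -> [A1 & B1, A2 & B2].
    public static ProcessResult AggregateProcessResults(ProcessResult aggregate, ProcessResult latest) =>
        new ProcessResult(
            aggregate.ProcessInitialised && latest.ProcessInitialised,
            aggregate.ProcessSuccessful && latest.ProcessSuccessful);

    public static void Main()
    {
        var a = new ProcessResult(true, true);
        var b = new ProcessResult(true, false);
        var r = AggregateProcessResults(a, b);
        Console.WriteLine($"{r.ProcessInitialised} {r.ProcessSuccessful}"); // True False
    }
}
```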

I then used this PLINQ overload: https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

    .Aggregate<List<string>, ProcessResult, ProcessResult>(
        new ProcessResult(true, true),
        (res, input) => Process(input),
        (agg, latest) => AggregateProcessResults(agg, latest),
        agg => agg);

The problem was that AggregateProcessResults was never entered, for some reason, and I don't know where the results went...

Thanks for reading, any help appreciated :)

1 answer

The Aggregate overload that you use is, by design, not actually executed in parallel. You pass a seed and then a step function, and the first argument of the step function (agg) is the accumulator produced by the previous step. That makes it inherently sequential (the result of each step is fed into the next one), so it cannot be parallelized. I'm not sure why this overload is included in ParallelEnumerable at all, but there was probably a reason.
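To make that dependency explicit, here are the two overload shapes written as folds. The notation is my own: s is the seed, f the step function, g the function that combines accumulators, ρ the result selector, x_1, …, x_n the input elements, and P_1, …, P_m whatever partitions PLINQ happens to choose.

$$
\begin{aligned}
&\text{seed only:} && a_0 = s, \quad a_k = f(a_{k-1}, x_k)\ \ (k = 1, \dots, n), \quad r = \rho(a_n) \\
&\text{with combiner:} && a^{(j)} = \operatorname{fold}_f\big(s^{(j)}, P_j\big)\ \ (j = 1, \dots, m), \quad r = \rho\Big(g\big(\cdots g\big(a^{(1)}, a^{(2)}\big) \cdots, a^{(m)}\big)\Big)
\end{aligned}
$$

In the first line every a_k needs a_{k-1}, so there is nothing to split. In the second, each partition folds independently and only the m partial accumulators meet in g; that gives the same answer as the sequential fold only if g correctly merges two partial accumulators, which in practice means g should be associative.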

Use a different overload instead:

    var result = processMe
        .AsParallel()
        .Aggregate(
            // seed factory: each partition calls this to get its own seed
            () => new ConcurrentStack<ProcessResult>(),
            // process an element and update the accumulator
            (agg, input) =>
            {
                var res = Process(input);
                agg.Push(res);
                return agg;
            },
            // combine accumulators from different partitions
            (agg1, agg2) =>
            {
                agg1.PushRange(agg2.ToArray());
                return agg1;
            },
            // reduce the final accumulator to a single result
            agg =>
            {
                ProcessResult res;
                agg.TryPop(out res);
                return res;
            });
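Applied to the question's actual goal (AND-ing all the results together), the same idea might look like the sketch below. Assumptions: Process is a hypothetical stand-in because the real one is not shown, RunAll and ParallelAndAggregate are names I made up, and the struct is made readonly and top-level so the file compiles alone. It uses the seed-value overload linked second in the question; as I understand it, that overload hands the same seed value to every partition, which is harmless for an immutable value type like ProcessResult but not for a mutable reference-type seed such as a ConcurrentStack (hence the seed factory in the code above). The other change is in the step function: (res, input) => Process(input) ignores the accumulator, so each partition only keeps the result of its last element. Folding with AggregateProcessResults(acc, Process(input)) keeps all of them, and the combiner is then only needed when PLINQ actually splits the input into more than one partition.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public readonly struct ProcessResult
{
    public ProcessResult(bool initialised, bool successful)
    {
        ProcessInitialised = initialised;
        ProcessSuccessful = successful;
    }

    public bool ProcessInitialised { get; }
    public bool ProcessSuccessful { get; }
}

public static class ParallelAndAggregate
{
    // Hypothetical stand-in for the question's Process method: "initialised" if the
    // list is non-empty, "successful" if none of its strings is empty.
    public static ProcessResult Process(List<string> input) =>
        new ProcessResult(input.Count > 0, input.All(s => s.Length > 0));

    // Component-wise AND, as in the question's AggregateProcessResults.
    public static ProcessResult AggregateProcessResults(ProcessResult aggregate, ProcessResult latest) =>
        new ProcessResult(
            aggregate.ProcessInitialised && latest.ProcessInitialised,
            aggregate.ProcessSuccessful && latest.ProcessSuccessful);

    public static ProcessResult RunAll(IEnumerable<List<string>> processMe) =>
        processMe
            .AsParallel()
            .Aggregate(
                // Shared seed value: fine here because ProcessResult is immutable.
                new ProcessResult(true, true),
                // Fold each list's result INTO the partition's accumulator.
                (acc, input) => AggregateProcessResults(acc, Process(input)),
                // Merge the per-partition accumulators.
                (left, right) => AggregateProcessResults(left, right),
                // No further projection needed.
                acc => acc);

    public static void Main()
    {
        var lists = Enumerable.Range(0, 44)
            .Select(i => new List<string> { "item" + i })
            .ToList();
        var r = RunAll(lists);
        Console.WriteLine($"{r.ProcessInitialised} {r.ProcessSuccessful}"); // True True
    }
}
```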

Source: https://habr.com/ru/post/1273555/

