Trying to use PLINQ for long-running generators?

I have several infinite generator methods, some of which block for a long time (or potentially forever) between items.

    IEnumerable<T> ExampleOne()
    {
        while (true) // this one blocks for a few seconds at a time
            yield return LongRunningFunction();
    }

    IEnumerable<T> ExampleTwo()
    {
        while (true) // this one blocks for a really long time
            yield return OtherLongRunningFunction();
    }

My goal is to have an infinite sequence that combines the items from both examples. Here is what I tried using PLINQ:

    IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() }
        .AsParallel()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .SelectMany(source => source);

This seems to combine the two IEnumerables into a new one, with items from IEnumerable #1 and #2 becoming available as soon as they appear in either source:

    // assuming ExampleTwo yields TWO but happens roughly 5 times
    // less often than ExampleOne
    Example output: one one one one one TWO one one one one one one TWO

However, it seems that sometimes (usually after many hours of running) OtherLongRunningFunction() goes a long time without returning, and under conditions that are hard to reproduce, combined blocks on it instead of continuing to return results from LongRunningFunction(). It appears that although the combined parallel query started out using two threads, it decided to switch to one thread later on.

My first thought was "this is probably a job for Rx Observable.Merge, not PLINQ." But I would appreciate both answers that show correct alternative ways to handle this situation and explanations of how PLINQ can change its degree of parallelism hours after the query has started.

3 answers

Here's the Rx way to do this, and indeed, it uses Merge :

    IObservable<T> LongRunningFunction()
    {
        return Observable.Start(() =>
        {
            // Calculate some stuff
            return blah;
        }, Scheduler.TaskPoolScheduler);
    }

    // Defer + Repeat re-runs each function every time the previous run completes
    Observable.Merge(
        Observable.Defer(LongRunningFunction).Repeat(),
        Observable.Defer(OtherLongRunningFunction).Repeat()
    ).Subscribe(x =>
    {
        Console.WriteLine("An item: {0}", x);
    });
+2
source

If you want the benefits of the TPL, especially for tasks with varying load (what happens when your subscriber blocks while several items have already been produced: should you stop yielding items?), I recommend TPL Dataflow.

If you want to do this with Rx, then for really long-running computations it is better not to tie up the thread pool:

    var stream = Observable.Merge(
        ExampleTwo().ToObservable(Scheduler.NewThread),
        ExampleOne().ToObservable(Scheduler.NewThread));

    stream.Subscribe(...);
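For comparison, the same "one dedicated thread per generator" idea can be sketched without Rx, using only the BCL. This is a minimal sketch, not something from the original posts: the `GeneratorMerge.Merge` helper and the capacity of 16 are made up for illustration. It relies on `TaskCreationOptions.LongRunning`, which is a hint that the default scheduler currently honors by giving the task its own thread instead of a pool worker.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class GeneratorMerge
{
    // Hypothetical helper: interleaves items from several blocking generators
    // in the order in which they become available.
    public static IEnumerable<T> Merge<T>(CancellationToken token, params IEnumerable<T>[] sources)
    {
        // Bounded (arbitrary capacity, an assumption) so a fast producer
        // cannot run arbitrarily far ahead of the consumer.
        var queue = new BlockingCollection<T>(boundedCapacity: 16);
        int remaining = sources.Length;
        if (remaining == 0) queue.CompleteAdding();

        foreach (var source in sources)
        {
            Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var item in source)
                        queue.Add(item, token); // blocks while the queue is full
                }
                catch (OperationCanceledException)
                {
                    // The consumer cancelled; stop pulling from this generator.
                }
                finally
                {
                    // The last producer to finish closes the queue.
                    if (Interlocked.Decrement(ref remaining) == 0)
                        queue.CompleteAdding();
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        // Yields items as they arrive; throws OperationCanceledException on cancellation.
        return queue.GetConsumingEnumerable(token);
    }
}
```

With the generators from the question, usage would look like `foreach (var item in GeneratorMerge.Merge(cts.Token, ExampleOne(), ExampleTwo())) { ... }`, where `cts` is a `CancellationTokenSource` owned by the caller. Because each generator has its own thread, a stalled `OtherLongRunningFunction()` cannot hold up items from `ExampleOne()`.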
+1
source

Regarding the mechanics of PLINQ:

I am running into the same problem: I have a sequence whose items need uneven processing time, some taking orders of magnitude longer than others. I see thread starvation, which is much easier to reproduce on an 8-core processor than on a 4-core one, although it can happen on the 4-core machine too after many hours of processing. Some threads resume work again after a while. Note that dynamic partitioning is used, as in the example.

Observation: the starvation seems most likely to occur when several consecutive very long-running work items complete.

The MSDN topic Parallel Loops sheds some light:

Be careful if you use parallel loops with individual steps that take several seconds or more. This can occur with I/O-bound workloads as well as lengthy calculations. If the loops take a long time, you may experience an unbounded growth of worker threads due to a heuristic for preventing thread starvation that is used by the .NET ThreadPool class's thread injection logic. This heuristic steadily increases the number of worker threads when work items of the current pool run for long periods of time. The motivation is to add more threads in cases where everything in the thread pool is blocked. Unfortunately, if work is actually proceeding, more threads may not necessarily be what you want. The .NET Framework cannot distinguish between these two situations.

I still don't know the details, but I suspect that the underlying ThreadPool heuristics cope poorly with very long-running work items: some upper limit gets adjusted inappropriately, threads are not allocated for subsequent iterations, and those iterations are left waiting in the queue. I do not have Visual Studio on the 8-core machine where the problem is easier to reproduce, and I have not yet managed to reproduce it while debugging in Visual Studio on a 4-core machine. The investigation is ongoing.
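One way to gather evidence for or against this suspicion is to log the pool's worker-thread counts while the query runs. The following is only a diagnostic sketch under stated assumptions: `PoolStats` is a made-up helper name, and "busy" is derived as the maximum minus the currently available worker threads, which is how `ThreadPool.GetAvailableThreads` is documented to relate to `GetMaxThreads`.

```csharp
using System;
using System.Threading;

public static class PoolStats
{
    // Hypothetical helper: number of pool worker threads currently in use.
    public static int BusyWorkers()
    {
        ThreadPool.GetMaxThreads(out int maxWorkers, out _);
        ThreadPool.GetAvailableThreads(out int freeWorkers, out _);
        return maxWorkers - freeWorkers;
    }

    // One-line summary suitable for periodic logging.
    public static string Describe()
    {
        ThreadPool.GetMinThreads(out int minWorkers, out _);
        ThreadPool.GetMaxThreads(out int maxWorkers, out _);
        return $"busy={BusyWorkers()} min={minWorkers} max={maxWorkers}";
    }
}
```

Calling `PoolStats.Describe()` every few seconds from a dedicated (non-pool) thread would show whether the number of busy workers keeps climbing or collapses around the time the query stalls. It does not explain PLINQ's partitioning decisions by itself.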

For more information, the question "Does the Task Parallel Library (or PLINQ) take other processes into account?" is very relevant.

+1
source

Source: https://habr.com/ru/post/906824/

