I have several endless generator methods, including some long and infinitely long generators.
IEnumerable<T> ExampleOne() { while(true) // this one blocks for a few seconds at a time yield return LongRunningFunction(); } IEnumerable<T> ExampleTwo() { while(true) //this one blocks for a really long time yield return OtherLongRunningFunction(); }
My goal is to have an infinite sequence that combines elements from two examples. Here is what I tried using PLINQ:
IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() } .AsParallel() .WithMergeOptions(ParallelMergeOptions.NotBuffered) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .SelectMany(source => source.GetRequests());
This seems to combine two IEnumerables in the new, with elements from IEnumerable # 1 and # 2 available when they appear in either of two IEnumerables sources:
//assuming ExampleTwo yields TWO but happens roughly 5 times //less often then ExampleOne Example output: one one one one one TWO one one one one one one TWO
However, sometimes it seems (usually after many hours of work) OtherLongRunningFunction() will last a long time without returning, and in conditions that are difficult to reproduce, combined will block on it, rather than continuing to return results from the first LongRunningFunction . It appears that although the combined parallel query started with two threads, he decided to switch to one thread later.
My first thought was "it's probably work for RX Observable.Merge , not PLINQ." But I would appreciate both answers that show the correct alternative ways to solve this situation, as well as explanations as to how PLINQ can change the degree of parallelism of the clock after the start of the request.
source share