Why ParallelQuery <T>. Where doesn’t work when converting to Observable?

I have an observable collection that I want to process in parallel, then I observe the processed values ​​when filtering, and finally subscribe to a handler that receives the filtered values.

My example is syntactically correct and compiles just fine, and when I run the code, the operator Wherethat performs the filtering is computed . But no data comes into the subscription. If I delete AsParallelso that the processing is performed on a regular basis IEnumerable, the data arrives and everything works as expected.

Here is my example doing some line-by-line processing:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

: TakeWhile, , , Where, , ParallelQuery :

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

, ToObservable, :

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

4 , 6 .

TakeWhile , Where - ?

, Visual Studio 2010 RC .Net 4.0 Framework Client Profile.

: @Sergeys answer Where. , :

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

- processedStrings , , .

+3
2

# 4.0 :


, PLINQ . Framework. , :

  • , TakeWhile, Skip SkipWhile
  • Select, SelectMany ElementAt

( , , , ). , ,


, TakeWhile .AsParallel(). , , , , AsParallel .

+2

TakeWhile Where, . , (. ). .WithExecutionMode(ParallelExecutionMode.ForceParallelism) TakeWhile, , .

, , ... , , , ? Select, , .

+2

Source: https://habr.com/ru/post/1733332/


All Articles