Why ParallelQuery <T>. Where doesn’t work when converting to Observable?
I have an observable collection that I want to process in parallel, then I observe the processed values when filtering, and finally subscribe to a handler that receives the filtered values.
My example is syntactically correct and compiles just fine, and when I run the code, the operator Wherethat performs the filtering is computed . But no data comes into the subscription. If I delete AsParallelso that the processing is performed on a regular basis IEnumerable, the data arrives and everything works as expected.
Here is my example doing some line-by-line processing:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
: TakeWhile, , , Where, , ParallelQuery :
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
, ToObservable, :
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
4 , 6 .
TakeWhile , Where - ?
, Visual Studio 2010 RC .Net 4.0 Framework Client Profile.
: @Sergeys answer Where. , :
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
- processedStrings , , .
# 4.0 :
, PLINQ . Framework. , :
- , TakeWhile, Skip SkipWhile
- Select, SelectMany ElementAt
( , , , ). , ,
, TakeWhile .AsParallel(). , , , , AsParallel .