There are a number of things that you ultimately raised in your code that are worth mentioning.
To begin with, the .ToArray() operator takes an observable value that returns zero or more single values and changes it to an observable that returns one array of zero or more values. Such an observable must end before it can regain its sole meaning.
Given this, the results of the first query should make sense.
Your second request is with x.Split(',').Select(s => s.ToUpper()).ToArray() creates the result you need, but you wanted to know "is there a way using RX to solve this problem without putting cleavage and processing in the same action. "
Well, trivial, yes:
var result = new List<string[]>(); observable .Select(x => x.Split(',')) .Select(x => x.Select(s => s.ToUpper())) .Select(x => x.ToArray()) .Do(s => result.Add(s)) .Subscribe();
However, this does not process elements in parallel. Rx is designed to work sequentially unless you invoke an operation that introduces parallelism.
Often an easy way to do this is to make a long choice, for example .Select(x => longRunningOperation(x)) , and do it with it:
.SelectMany(x => Observable.Start(() => longRunningOperation(x)))
In your case, you can start with this:
observable .ObserveOn(Scheduler.Default) .SelectMany(x => Observable.Start(() => x.Split(','))) .SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper()))) .SelectMany(x => Observable.Start(() => x.ToArray())) .Do(s => result.Add(s)) .Subscribe();
But this is only parallelization of each original .OnNext call, and not processing inside. To do this, you need to turn the result of x.Split(',') into observable and process it in parallel.
observable .SelectMany(x => Observable.Start(() => x.Split(',').ToObservable())) .SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper())))) .SelectMany(x => Observable.Start(() => x.ToArray())) .Do(s => s.Do(t => result.Add(t))) .Merge() .Subscribe();
But it starts to look crazy and no longer works on the current thread, which means that your test is not waiting for results.
Let's rename this query.
I started by getting rid of the .Do call. They are usually good for debugging, but for any state changes they are bad. They can work anywhere in any thread in the request, so you need to make sure that your code in the .Do call is thread safe and the result.Add(s) call is NOT thread safe.
I also introduced the "webservice" call to replace .ToUpper() with one processing delay, so that we can see how long the request takes to process, and therefore know whether it works in parallel or not. If the final request takes 5 seconds, then there is no paralysis, and if it is less, we win.
So, if I write a query in the easiest way, it will look like this:
Func<string, string> webservice = x => { Thread.Sleep(1000); return x.ToUpper(); }; var query = observable .Select(ls => from p in ls.Split(',') select webservice(p)) .Select(rs => rs.ToArray()) .ToArray() .Select(rss => new List<string[]>(rss)); var sw = Stopwatch.StartNew(); List<string[]> result = query.Wait(); sw.Stop();
When I run this, I get the expected results {{"A","B"},{"C","D","E"}} , but it only takes more than 5 seconds to complete. No parallelism here as expected.
Now let's introduce some parallelism:
var query = observable .Select(ls => from p in ls.Split(',').ToObservable() from r in Observable.Start(() => webservice(p)) select r) .Select(rs => rs.ToArray()) .Merge() .ToArray() .Select(rss => new List<string[]>(rss));
I basically applied the " Select to SelectMany / Start " pattern that I described above. The only tricky part was that .Select(rs => rs.ToArray()) switched from IObservable<string[]> to IObservable<IObservable<string[]>> , so I jumped into .Merge() to smooth his. This is normal when you inject parallelism into Rx requests.
Now when I run the query - BOOM - for a little over one second. All five inputs work in parallel. The only problem now is that order is no longer decisive. But you cannot help when the results are executed in parallel.
One such run I got this result:

If I were to do this as a test, I would sort the results in a known order and compare them with the expected result.