Reactive Extensions: Separating Input, Processing, and Concatenation

Basically, I have an observable of input lines that I want to process individually and then do something with the result. If an input string contains commas (as a separator), I want to split the string, process each substring individually, and then do something with each resulting sequence of strings. Below is a simplified version of what I'm trying to do:

    [Fact]
    public void UniTest1()
    {
        var observable = new ReplaySubject<string>();
        observable.OnNext("a,b");
        observable.OnNext("c,d,e");
        observable.OnCompleted();

        var result = new List<string[]>();

        observable
            .SelectMany(x => x.Split(','))
            .Select(x => x.ToUpper())
            .ToArray() // How to collect an IEnumerable for each item here?
            .Do(s => result.Add(s))
            .Subscribe();

        // Here, result is actually {{"A","B","C","D","E"}}, I need {{"A","B"},{"C","D","E"}}
        Assert.Equal(2, result.Count);
        Assert.Equal("A", result[0][0]);
        Assert.Equal("B", result[0][1]);
        Assert.Equal("C", result[1][0]);
        Assert.Equal("D", result[1][1]);
        Assert.Equal("E", result[1][2]);
    }

As explained in the comment, the above does NOT work: the ToArray() call combines all the values into a single sequence.

However, I solved this by putting the splitting and the processing into one action, like this:

    [Fact]
    public void UniTest2()
    {
        var observable = new ReplaySubject<string>();
        observable.OnNext("a,b");
        observable.OnNext("c,d,e");
        observable.OnCompleted();

        var result = new List<string[]>();

        observable
            .Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
            .Do(s => result.Add(s))
            .Subscribe();

        // Result is as expected: {{"A","B"},{"C","D","E"}}
        Assert.Equal(2, result.Count);
        Assert.Equal("A", result[0][0]);
        Assert.Equal("B", result[0][1]);
        Assert.Equal("C", result[1][0]);
        Assert.Equal("D", result[1][1]);
        Assert.Equal("E", result[1][2]);
    }

But is there a way, using Rx, to solve this problem without putting the splitting and the processing in the same action? What is the recommended solution to this problem?

I should also mention that the processing, that is, the call to ToUpper(), is actually a web service call. I used ToUpper() in my examples so that the problem is easy to explain. But it means that I want this processing to run in parallel and without blocking.

+5
2 answers

Your code raises a number of points that are worth mentioning.

To begin with, the .ToArray() operator takes an observable that produces zero or more single values and turns it into an observable that produces one array of zero or more values. The source observable must complete before that single array can be emitted.

Given this, the results of the first query should make sense.
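The completion requirement can be seen in a minimal sketch. The class name ToArrayDemo and the log list are made up for illustration; the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ToArrayDemo
{
    // Records the order in which things happen around ToArray().
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<string>();

        subject
            .ToArray()
            .Subscribe(
                arr => log.Add("array: " + string.Join(",", arr)),
                () => log.Add("completed"));

        subject.OnNext("a");
        subject.OnNext("b");
        log.Add("after OnNext calls"); // ToArray() has not emitted anything yet
        subject.OnCompleted();         // only now is the single array produced

        return log;
    }
}
```

The array entry is logged only after OnCompleted() is called, which is why a ToArray() placed after SelectMany sees every substring from every input line.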

Your second query, with x.Split(',').Select(s => s.ToUpper()).ToArray(), produces the result you need, but you wanted to know "is there a way, using Rx, to solve this problem without putting the splitting and the processing in the same action?"

Well, trivially, yes:

    var result = new List<string[]>();

    observable
        .Select(x => x.Split(','))
        .Select(x => x.Select(s => s.ToUpper()))
        .Select(x => x.ToArray())
        .Do(s => result.Add(s))
        .Subscribe();

However, this does not process elements in parallel. Rx is designed to work sequentially unless you invoke an operation that introduces parallelism.

Often an easy way to do this is to take a long-running select, for example .Select(x => longRunningOperation(x)), and turn it into this:

 .SelectMany(x => Observable.Start(() => longRunningOperation(x))) 
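As a rough, self-contained sketch of that pattern: LongRunningOperation below is a hypothetical stand-in for any slow blocking call, and the 200 ms sleep is an arbitrary choice. Observable.Start runs each call on the default scheduler, so the calls can overlap and the results can arrive in any order.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class StartPatternDemo
{
    // Hypothetical slow, blocking operation (assumption for illustration).
    static string LongRunningOperation(string x)
    {
        Thread.Sleep(200);
        return x.ToUpper();
    }

    public static string[] Run()
    {
        string[] results = new[] { "a", "b", "c" }
            .ToObservable()
            // Each element starts its own background computation.
            .SelectMany(x => Observable.Start(() => LongRunningOperation(x)))
            .ToArray()
            .Wait(); // block until the query has completed

        // Arrival order is not guaranteed, so sort before returning.
        Array.Sort(results, StringComparer.Ordinal);
        return results;
    }
}
```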

In your case, you can start with this:

    observable
        .ObserveOn(Scheduler.Default)
        .SelectMany(x => Observable.Start(() => x.Split(',')))
        .SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper())))
        .SelectMany(x => Observable.Start(() => x.ToArray()))
        .Do(s => result.Add(s))
        .Subscribe();

But this only parallelizes the handling of each original .OnNext call, not the processing inside it. To do that, you need to turn the result of x.Split(',') into an observable and process its elements in parallel.

    observable
        .SelectMany(x => Observable.Start(() => x.Split(',').ToObservable()))
        .SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper()))))
        .SelectMany(x => Observable.Start(() => x.ToArray()))
        // Flatten first: a .Do attached to an inner observable that is then
        // discarded never runs, so result.Add would never be called.
        .Merge()
        .Do(s => result.Add(s))
        .Subscribe();

But this is starting to look crazy, and it no longer runs on the current thread, which means that your test does not wait for the results.

Let's rewrite this query.

I'll start by getting rid of the .Do call. .Do calls are generally fine for debugging, but they are bad for any state changes. They can run at any point, on any thread, within the query, so you need to make sure that the code in a .Do call is thread-safe, and the result.Add(s) call is NOT thread-safe.

I've also introduced a "webservice" call to replace .ToUpper(), with a one-second processing delay, so that we can see how long the query takes to run and therefore know whether it executes in parallel or not. If the final query takes 5 seconds, there is no parallelism; if it takes less, we win.

So, if I write a query in the easiest way, it will look like this:

    Func<string, string> webservice = x =>
    {
        Thread.Sleep(1000);
        return x.ToUpper();
    };

    var query =
        observable
            .Select(ls =>
                from p in ls.Split(',')
                select webservice(p))
            .Select(rs => rs.ToArray())
            .ToArray()
            .Select(rss => new List<string[]>(rss));

    var sw = Stopwatch.StartNew();
    List<string[]> result = query.Wait();
    sw.Stop();

When I run this, I get the expected results {{"A","B"},{"C","D","E"}}, but it takes just over 5 seconds to complete. No parallelism here, as expected.

Now let's introduce some parallelism:

    var query =
        observable
            .Select(ls =>
                from p in ls.Split(',').ToObservable()
                from r in Observable.Start(() => webservice(p))
                select r)
            .Select(rs => rs.ToArray())
            .Merge()
            .ToArray()
            .Select(rss => new List<string[]>(rss));

I basically applied the "Select to SelectMany/Start" pattern that I described above. The only tricky part was that .Select(rs => rs.ToArray()) changed from producing an IObservable<string[]> to producing an IObservable<IObservable<string[]>>, so I dropped in a .Merge() to flatten it. This is normal when you introduce parallelism into Rx queries.

Now when I run the query, BOOM: it completes in a little over one second. All five inputs are processed in parallel. The only problem now is that the order is no longer deterministic. But that can't be helped when the work is executed in parallel.
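The flattening step can be looked at in isolation. This is only a sketch with made-up names (MergeDemo, nested, flat); it uses ToUpper() directly instead of a slow call so that only the types are in focus.

```csharp
using System;
using System.Reactive.Linq;

public static class MergeDemo
{
    public static string[][] Run()
    {
        // One inner observable per input line, each producing a single array.
        IObservable<IObservable<string[]>> nested =
            new[] { "a,b", "c,d,e" }
                .ToObservable()
                .Select(line => line.Split(',')
                    .ToObservable()
                    .Select(s => s.ToUpper())
                    .ToArray());

        // Merge() subscribes to every inner observable and forwards its values.
        IObservable<string[]> flat = nested.Merge();

        // Collect all arrays and block until the query completes.
        return flat.ToArray().Wait();
    }
}
```

Merge() forwards inner values as they arrive, which is what allows the inner work to overlap and also why the order of the arrays should not be relied on.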

On one such run I got this result:

[image: results]

If I were to do this as a test, I would sort the results in a known order and compare them with the expected result.
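One possible way to do that sorting, as a sketch only: the helper name Normalize is made up, and joining each group into a comma-separated string is just one convenient canonical form.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ResultOrdering
{
    // Sorts the strings inside each group, then sorts the groups,
    // so the result no longer depends on arrival order.
    public static string[] Normalize(IEnumerable<string[]> groups) =>
        groups
            .Select(g => string.Join(",", g.OrderBy(s => s, StringComparer.Ordinal)))
            .OrderBy(s => s, StringComparer.Ordinal)
            .ToArray();
}
```

A test could then compare Normalize(result) against new[] { "A,B", "C,D,E" } regardless of how the parallel calls happened to finish.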

+3

If I understand correctly, you want to keep the original arrays. However, after SelectMany you have flattened the arrays into single values directly in the stream, so you can no longer turn them back into separate arrays. The trick is to move ToUpper and ToArray inside SelectMany.

Also, ToUpper is not an async function. This matters, because otherwise you won't get any parallelism (I assume the call is async in your real code, but that makes ToUpper a poor stand-in). Instead, I will use Observable.Timer. If your web service call is not already an observable, you need to convert it, but that is another question and a bit out of scope.

This means that your results can arrive out of order.

    new string[] { "a,b", "c,d,e" }.ToObservable()
        .SelectMany(str => str.Split(',')
            .ToObservable()
            .SelectMany(x => Observable.Timer(DateTime.Now.AddSeconds(2))
                .Select(_ => x.ToUpper()))
            .ToArray())
        .Subscribe(arr =>
        {
            Console.WriteLine(string.Join(",", arr));
        });
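For the conversion mentioned above, one option when the web service client exposes a Task-based method is Observable.FromAsync. The sketch below is not the asker's real service: CallWebServiceAsync is a hypothetical method, and Task.Delay merely simulates network latency.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
    // Hypothetical async web service call (assumption for illustration).
    static async Task<string> CallWebServiceAsync(string input)
    {
        await Task.Delay(100).ConfigureAwait(false);
        return input.ToUpper();
    }

    // Processes one comma-separated line; the calls for its parts overlap.
    public static string[] Run(string line) =>
        line.Split(',')
            .ToObservable()
            // FromAsync starts the task when the inner observable is subscribed.
            .SelectMany(s => Observable.FromAsync(() => CallWebServiceAsync(s)))
            .ToArray()
            .Wait(); // blocks until all parts are done; order within the array may vary
}
```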

Some other things that I noticed in your code:

    .Do(s => result.Add(s))
    .Subscribe();

You can put result.Add(s) directly in Subscribe:

    .Subscribe(s => result.Add(s));

In fact, if you are writing tests, use a TestScheduler and results.Messages.AssertEqual.

    using Microsoft.Reactive.Testing;
    using NUnit.Framework;
    using System;
    using System.Linq;
    using System.Reactive.Linq;

    namespace test
    {
        [TestFixture]
        public class UnitTests : ReactiveTest
        {
            [Test]
            public void UniTest1()
            {
                var testScheduler = new TestScheduler();
                var source = new string[] { "a,b", "c,d,e" }.ToObservable();

                var results = testScheduler.Start(
                    () => source.SelectMany(str => str.Split(',')
                        .ToObservable()
                        .Select(x => x.ToUpper())
                        .ToArray()));

                // Arrays compare by reference, so the expected OnNext messages
                // use predicates that compare the array contents instead.
                results.Messages.AssertEqual(
                    OnNext<string[]>(Subscribed, arr => arr.SequenceEqual(new[] { "A", "B" })),
                    OnNext<string[]>(Subscribed, arr => arr.SequenceEqual(new[] { "C", "D", "E" })),
                    OnCompleted<string[]>(Subscribed)
                );
            }
        }
    }

Useful resources when testing Rx:
http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html#TestingRx
http://blogs.msdn.com/b/rxteam/archive/2012/06/14/testing-rx-queries-using-virtual-time-scheduling.aspx
https://msdn.microsoft.com/en-us/library/hh242967%28v=vs.103%29.aspx

0

Source: https://habr.com/ru/post/1242444/
