I am looking for a way to dynamically merge data sources without interruption. A real-world scenario will pull as data from several sources without taking into account redundant information.
To simplify the code, I replaced the more complex code with a simple number generator that will continuously generate data. This can be compared to the constant flow of data from several external servers.
I want to be able to combine the two sources and print the results (if necessary) on the console, this part works fine. When we complete these two sources and merge in another source, the situation ceases to work as expected. In this case, we could just as easily reconnect with the integrated StreamObserver, however in a much larger application we would have to worry about data gaps and also keep track of which observer subscribers are subscribing.
Is there any way around this?
// imports using System; using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; static void Main(string[] args) { // base "stream of results" as we will want to randomly add (and terminate other sources) IObservable<int> merged = Observable.Empty<int>(); // source 1 var tokenSource1 = new CancellationTokenSource(); IObservable<int> xs = Generate(tokenSource1, "A"); // to avoid generating the same numbers, which does happen, // sleep some amount of time before calling generate again Thread.Sleep(100); // source 2 var tokenSource2 = new CancellationTokenSource(); IObservable<int> xt = Generate(tokenSource2, "B"); // odd queries var seq1 = from n in xs where n % 2 == 1 select n; // even queries var seq2 = from n in xt where n % 2 == 0 select n; // merge everything together merged = merged.Merge<int>(seq1); merged = merged.Merge<int>(seq2); // observer for the merged "streams" // NOTE: while this does not appear to be working correctly, // remember you have 2 streams and 2 queries at work. It // really is doing what it expected to here. IDisposable mergedStreamObserver = merged.Subscribe(str => { Console.WriteLine(str); }); // kill both sources Console.ReadKey(); tokenSource1.Cancel(); tokenSource2.Cancel(); // start source and query for evens // try to merge it Console.ReadKey(); tokenSource2 = new CancellationTokenSource(); xt = Generate(tokenSource2, "B"); seq2 = from n in xt where n % 2 == 0 select n; merged = merged.Merge(seq2); // Nothing is happening because the merged stream was modified. // How do we create a composite Observable from multiple sources // and dynamically add/terminate those sources? Console.ReadKey(); tokenSource2.Cancel(); mergedStreamObserver.Dispose(); Console.ReadKey(); } static IObservable<int> Generate(CancellationTokenSource tokenSource, string name) { Random random = new Random(); Action<int> observer = _ => { }; /* We could use null, but then at every invocation * we'd have to copy to a local and check for null. */ Task.Factory.StartNew(() => { while(!tokenSource.IsCancellationRequested) { var t = random.Next(0, 100); Console.WriteLine("From Generator {0}: {1}", name, t); observer(t); Thread.Sleep(1000); } Console.WriteLine("Random Generator Stopped"); }, tokenSource.Token); return Observable.FromEvent<int>( eh => observer += eh, eh => observer -= eh); }
source share