Combining two ordered IObservables

I have two ordered IObservable<double> and would like to combine them into one ordered IObservable<double> . An example is given below:

 A 2 3 4 - - 5 - B - - - 1 5 - 6 Out - - - 1 2 3 4 5 - 

The idea is that Out will only return values ​​if it is sure in the final order. I am sure that this should be easy to do, but I cannot come up with a good solution (in this context, good tools consisting of rx operators as much as possible);

EDIT: I want the following program to generate the following output

 static void Main(string[] args) { var a = new Subject<int>(); var b = new Subject<int>(); a.MergeSort(b).Subscribe(Console.WriteLine); a.OnNext(2); Console.WriteLine("tick"); a.OnNext(4); Console.WriteLine("tick"); a.OnNext(6); Console.WriteLine("tick"); b.OnNext(0); Console.WriteLine("tick"); b.OnNext(1); Console.WriteLine("tick"); b.OnNext(5); Console.WriteLine("tick"); b.OnNext(7); Console.WriteLine("tick"); } Output: tick tick tick 0 tick 1 tick 2 4 5 tick 6 tick 
+4
source share
1 answer

It does this as an RX extension operator.

 public static class MergeMixins { public static IObservable<int> MergeSort(this IObservable<int> This, IObservable<int> other) { return Observable.Create<int>((observer) => { Queue<int> BufferA = new Queue<int>(); Queue<int> BufferB = new Queue<int>(); Action<Queue<int>, int> update = (Queue<int> pushBuffer, int value)=>{ pushBuffer.Enqueue(value); while (BufferA.Count() != 0 && BufferB.Count() != 0) { if (BufferA.Peek() < BufferB.Peek()) observer.OnNext(BufferA.Dequeue()); else observer.OnNext(BufferB.Dequeue()); } }; return new CompositeDisposable( This.Subscribe(v => update(BufferA, v)), other.Subscribe(v => update(BufferB, v))); }); } } 

My test output uses your test

 Result StandardOutput: tick tick tick 0 tick 1 tick 2 4 5 tick 6 tick 
+1
source

Source: https://habr.com/ru/post/1447377/


All Articles