The merge is implemented as an Rx extension operator.
/// <summary>
/// Rx extension operators for merging integer sequences.
/// </summary>
public static class MergeMixins
{
    /// <summary>
    /// Merges two integer sequences into one, always emitting the smaller of the
    /// two oldest pending values. The output is globally ordered only when each
    /// input is itself non-decreasing.
    /// </summary>
    /// <param name="This">First input sequence.</param>
    /// <param name="other">Second input sequence.</param>
    /// <returns>
    /// A sequence that emits values from both inputs. A value is held back until
    /// the other input has produced a value to compare against, or has completed.
    /// The result completes after both inputs have completed and every buffered
    /// value has been emitted; an error from either input is forwarded immediately.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="This"/> or <paramref name="other"/> is null.
    /// </exception>
    public static IObservable<int> MergeSort(this IObservable<int> This, IObservable<int> other)
    {
        if (This == null)
        {
            throw new ArgumentNullException(nameof(This));
        }

        if (other == null)
        {
            throw new ArgumentNullException(nameof(other));
        }

        return Observable.Create<int>(observer =>
        {
            // The two subscriptions may deliver on different threads; every touch of
            // the shared state below happens under this lock so that the queues stay
            // consistent and the observer receives serialized notifications.
            object gate = new object();
            Queue<int> bufferA = new Queue<int>();
            Queue<int> bufferB = new Queue<int>();
            bool completedA = false;
            bool completedB = false;
            bool terminated = false;

            Action drain = () =>
            {
                // While both sides have a pending value, the smaller head goes first.
                // On a tie the value from the second input is emitted first.
                while (bufferA.Count != 0 && bufferB.Count != 0)
                {
                    if (bufferA.Peek() < bufferB.Peek())
                    {
                        observer.OnNext(bufferA.Dequeue());
                    }
                    else
                    {
                        observer.OnNext(bufferB.Dequeue());
                    }
                }

                // An input that has completed and has nothing left buffered can never
                // hold the other side back again, so the other side's backlog is released.
                if (completedA && bufferA.Count == 0)
                {
                    while (bufferB.Count != 0)
                    {
                        observer.OnNext(bufferB.Dequeue());
                    }
                }

                if (completedB && bufferB.Count == 0)
                {
                    while (bufferA.Count != 0)
                    {
                        observer.OnNext(bufferA.Dequeue());
                    }
                }

                // With both inputs completed the steps above have emptied both queues.
                if (completedA && completedB)
                {
                    terminated = true;
                    observer.OnCompleted();
                }
            };

            Action<Queue<int>, int> push = (Queue<int> buffer, int value) =>
            {
                lock (gate)
                {
                    if (terminated)
                    {
                        return;
                    }

                    buffer.Enqueue(value);
                    drain();
                }
            };

            Action<Exception> fail = (Exception error) =>
            {
                lock (gate)
                {
                    if (terminated)
                    {
                        return;
                    }

                    terminated = true;
                    observer.OnError(error);
                }
            };

            Action<bool> complete = (bool isFirst) =>
            {
                lock (gate)
                {
                    if (terminated)
                    {
                        return;
                    }

                    if (isFirst)
                    {
                        completedA = true;
                    }
                    else
                    {
                        completedB = true;
                    }

                    drain();
                }
            };

            return new CompositeDisposable(
                This.Subscribe(v => push(bufferA, v), fail, () => complete(true)),
                other.Subscribe(v => push(bufferB, v), fail, () => complete(false)));
        });
    }
}
This is the output I get when running your test:
Result StandardOutput: tick tick tick 0 tick 1 tick 2 4 5 tick 6 tick
source share