Protest! I do not use F # regularly enough to be 100% more comfortable with the syntax, but I think I see what happens.
However, both of these cases look strange to me, and it depends a lot on how someOtherObs is implemented, and where (in terms of threads) everything works.
Case Analysis 1
You apply concat to the source stream, which works as follows:
- He subscribes to someOtherObs, and in response to the first event (a), he pushes play items to the observer.
- He then sends event (a) to the observer.
- Then it ends. At this point, the thread is completed and no further events are dispatched.
- In case someOtherObs is empty or just has one error, this will propagate instead of the observer.
Now that this thread completes, someOtherObs is concatenated with it. What happens now is a little unpredictable - if someOtherObs is cold, then the first event will be sent a second time, if someOtherObs will be hot, then the first event will not be indignant, but there is a potential race condition around which the remainder event will be further, depending on as implemented by someOtherObs. You can easily skip events if they are hot.
Case Analysis 2
You replay all replay events and then dispatch all someOtherObs events - but again there is a race condition if someOtherObs is hot, because you sign up only after clicking replay, and therefore may miss some events.
Comments
In any case, it seems dirty to me.
It is like trying to merge the state of the world (sotw) and the living stream. In this case, you first need to subscribe to the live stream and cache any events while you acquire them and click on sotw events. As soon as sotw is pressed, you click on cached events - be careful to fool events that can be read in sotw - until you catch up with the living, at that moment you can simply transfer the live events.
You can often avoid naive implementations that clear the live cache in the OnNext subscription handler, effectively blocking the source during cleanup, but you run the risk of applying too much backpressure to the source in real time if you have a long history and / or fast moving live stream.
Some considerations, so you can think about it, we hope will set you on the right track.
For reference, here is a very naive and simplified C # implementation that I knocked out that compiles in LINQPad with the rx-main nuget package. The production-ready implementations I made in the past can become quite complex:
void Main() { // asynchronously produce a list from 1 to 10 Func<Task<List<int>>> sotw = () => Task<List<int>>.Run(() => Enumerable.Range(1, 10).ToList()); // a stream of 5 to 15 var live = Observable.Range(5, 10); // outputs 1 to 15 live.MergeSotwWithLive(sotw).Subscribe(Console.WriteLine); } // Define other methods and classes here public static class ObservableExtensions { public static IObservable<TSource> MergeSotwWithLive<TSource>( this IObservable<TSource> live, Func<Task<List<TSource>>> sotwFactory) { return Observable.Create<TSource>(async o => { // Naïve indefinite caching, no error checking anywhere var liveReplay = new ReplaySubject<TSource>(); live.Subscribe(liveReplay); // No error checking, no timeout, no cancellation support var sotw = await sotwFactory(); foreach(var evt in sotw) { o.OnNext(evt); } // note naive disposal // and extremely naive de-duping (it really needs to compare // on some unique id) // we are only supporting disposal once the sotw is sent return liveReplay.Where(evt => !sotw.Any(s => s.Equals(evt))) .Subscribe(o); }); } }