There is a fairly simple way to do what you want with Rx, but you need to think purely in terms of observables and not mix in enumerables.
The signature of the function you really need to think in terms of is:
IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
Here's the function:
    Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
        channelStates =>
            channelStates
                .Merge()
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .Scan(0, (cssn, csn) => cssn + csn)
                .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
                .DistinctUntilChanged();
It is important that each IObservable<ChannelState> in the IObservable<IObservable<ChannelState>> behaves correctly for this to work.
I have assumed that the ChannelState enumeration has an Idle state and that each IObservable<ChannelState> will produce zero or more pairs of Operational / Idle values (Operational followed by Idle) before completing.
You also said: "a collection of channels can be added and removed from". Thinking in terms of IEnumerable<IObservable<ChannelState>> this seems reasonable, but in Rx you do not need to worry about removal, because each observable can signal its own completion. Once it completes, it is as if it had been removed from the collection, because it cannot produce any further values. So you only need to worry about adding to the collection, and that is easy using subjects.
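To illustrate the point about completion, here is a minimal sketch. It assumes the System.Reactive package and a two-member ChannelState enum; the enum members, the class name CompletionDemo and the method names are illustrative and not taken from the question. A channel is added by pushing it into the subject, and it drops out simply by completing:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public enum ChannelState { Idle, Operational }   // assumed members
public enum ChannelSetState { Down, Up }         // assumed members

public static class CompletionDemo
{
    // Same pipeline as f above, written as a method.
    static IObservable<ChannelSetState> F(IObservable<IObservable<ChannelState>> channelStates) =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

    public static List<ChannelSetState> Run()
    {
        var seen = new List<ChannelSetState>();
        var channels = new Subject<IObservable<ChannelState>>();
        using var subscription = F(channels).Subscribe(s => seen.Add(s));

        var a = new Subject<ChannelState>();
        channels.OnNext(a);                      // "add" channel a
        a.OnNext(ChannelState.Operational);      // running count 1 -> Up
        a.OnNext(ChannelState.Idle);             // running count 0 -> Down
        a.OnCompleted();                         // "remove" channel a; nothing is emitted

        var c = new Subject<ChannelState>();
        channels.OnNext(c);                      // add another channel later
        c.OnNext(ChannelState.Operational);      // running count 1 -> Up

        return seen;
    }

    // Prints: Up, Down, Up
    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

Note that if a channel completed while it was still Operational, its +1 would stay in the running count, which is why the Operational / Idle pairing matters.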
So, now this function can be used like this:
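    var channelStatesSubject = new Subject<IObservable<ChannelState>>();
    var channelStates = channelStatesSubject.AsObservable();
    var channelSetStates = f(channelStates);

    channelSetStates.Subscribe(css => { /* handle each Up / Down change here */ });

    // Each call adds one channel to the set.
    channelStatesSubject.OnNext(/* an IObservable<ChannelState> */);
    channelStatesSubject.OnNext(/* an IObservable<ChannelState> */);
    channelStatesSubject.OnNext(/* an IObservable<ChannelState> */);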
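For a concrete run, the channels passed to OnNext can themselves be Subject<ChannelState> instances. The following is only a sketch under assumptions: it needs the System.Reactive package, and the enum members, the class name UsageDemo and the two channels a and b are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public enum ChannelState { Idle, Operational }   // assumed members
public enum ChannelSetState { Down, Up }         // assumed members

public static class UsageDemo
{
    // The function f from above.
    static readonly Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
        channelStates =>
            channelStates
                .Merge()
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .Scan(0, (cssn, csn) => cssn + csn)
                .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
                .DistinctUntilChanged();

    public static List<ChannelSetState> Run()
    {
        var seen = new List<ChannelSetState>();
        var channelStatesSubject = new Subject<IObservable<ChannelState>>();
        using var subscription =
            f(channelStatesSubject.AsObservable()).Subscribe(css => seen.Add(css));

        // Two hypothetical channels, added before they produce any values.
        var a = new Subject<ChannelState>();
        var b = new Subject<ChannelState>();
        channelStatesSubject.OnNext(a);
        channelStatesSubject.OnNext(b);

        a.OnNext(ChannelState.Operational);   // count 1 -> Up
        b.OnNext(ChannelState.Operational);   // count 2 -> still Up, suppressed
        a.OnNext(ChannelState.Idle);          // count 1 -> still Up, suppressed
        b.OnNext(ChannelState.Idle);          // count 0 -> Down

        return seen;
    }

    // Prints: Up, Down
    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

The set only reports Down once the last operational channel has gone back to Idle, because the running count has to fall to zero first.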
I ran this with some test code that used three random ChannelState observables, with a Do call in the f function for debugging, and got the following sequence:
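The exact position of that Do call is not shown here, so the following is a guess at the setup: Do is placed right after Scan to record the running count, and the subscriber records each Up / Down change. It uses two fixed channels instead of three random ones so the result is reproducible. The System.Reactive package, the enum members and the names DebugDemo, a and b are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public enum ChannelState { Idle, Operational }   // assumed members
public enum ChannelSetState { Down, Up }         // assumed members

public static class DebugDemo
{
    // f with an extra Do after Scan; trace receives every running count.
    static IObservable<ChannelSetState> F(
        IObservable<IObservable<ChannelState>> channelStates, Action<int> trace) =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .Scan(0, (cssn, csn) => cssn + csn)
            .Do(trace)                            // side effect only, values pass through
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

    public static List<string> Run()
    {
        var log = new List<string>();
        var channels = new Subject<IObservable<ChannelState>>();
        using var subscription =
            F(channels, count => log.Add(count.ToString()))
                .Subscribe(css => log.Add(css.ToString()));

        var a = new Subject<ChannelState>();
        var b = new Subject<ChannelState>();
        channels.OnNext(a);
        channels.OnNext(b);

        a.OnNext(ChannelState.Operational);
        b.OnNext(ChannelState.Operational);
        a.OnNext(ChannelState.Idle);
        b.OnNext(ChannelState.Idle);

        return log;
    }

    // Prints: 1 Up 2 1 0 Down
    public static void Main() => Console.WriteLine(string.Join(" ", Run()));
}
```

Each number is a running count logged by Do, and an Up or Down follows a number only when the set state actually changes.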
1 Up 2 3 2 1 2 1 0 Down 1 Up 0 Down
I think that is what you are after. Let me know if I have missed something.
According to the comments below, the ChannelState enumeration has several states, but only Operational means that the channel is up; every other state counts as down. It is therefore easy to add a DistinctUntilChanged operator to collapse runs of consecutive down states. Here is the code now:
    Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
        channelStates =>
            channelStates
                .Merge()
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .DistinctUntilChanged()
                .Scan(0, (cssn, csn) => cssn + csn)
                .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
                .DistinctUntilChanged();
I have also added a StartWith so that the sequence coming out of the first Select always starts with 1. Here is the code now:
    Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
        channelStates =>
            channelStates
                .Merge()
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .StartWith(1)
                .DistinctUntilChanged()
                .Scan(0, (cssn, csn) => cssn + csn)
                .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
                .DistinctUntilChanged();
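One thing to keep in mind: a DistinctUntilChanged placed after Merge compares consecutive values from the merged stream, so two different channels going Operational back to back contribute only a single +1. If each channel should instead be de-duplicated on its own, one possible variant moves the mapping inside each channel before merging. This is a sketch of that idea and an assumption about the intent, not the code above. It needs the System.Reactive package, and the extra enum members Connecting and Failed as well as the name PerChannelDemo are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public enum ChannelState { Idle, Connecting, Operational, Failed }  // assumed members
public enum ChannelSetState { Down, Up }                            // assumed members

public static class PerChannelDemo
{
    static IObservable<ChannelSetState> F(IObservable<IObservable<ChannelState>> channelStates) =>
        channelStates
            .Select(channel => channel
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .StartWith(-1)            // treat a channel as down before its first value
                .DistinctUntilChanged()   // collapse repeated down (or up) states per channel
                .Skip(1))                 // drop the seed so it is never counted
            .Merge()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

    public static List<ChannelSetState> Run()
    {
        var seen = new List<ChannelSetState>();
        var channels = new Subject<IObservable<ChannelState>>();
        using var subscription = F(channels).Subscribe(css => seen.Add(css));

        var a = new Subject<ChannelState>();
        var b = new Subject<ChannelState>();
        channels.OnNext(a);
        channels.OnNext(b);

        a.OnNext(ChannelState.Connecting);    // down before ever being up: ignored
        a.OnNext(ChannelState.Operational);   // count 1 -> Up
        b.OnNext(ChannelState.Operational);   // count 2 -> still Up
        a.OnNext(ChannelState.Failed);        // count 1 -> still Up
        a.OnNext(ChannelState.Idle);          // second down state for a: ignored
        b.OnNext(ChannelState.Idle);          // count 0 -> Down

        return seen;
    }

    // Prints: Up, Down
    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

With this arrangement each channel contributes at most one outstanding +1, regardless of how many different down states it passes through.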