How to combine n observables dynamically into a list?

I have a collection of observables that produce state changes for a so-called Channel, and a ChannelSet that needs to monitor these channels.

I would like to write something like this: if at least one channel is operational, the channel set is up; otherwise the channel set is down.

    IEnumerable<ChannelState> channelStates = ...;
    if (channelStates.Any(cs => cs == ChannelState.Operational))
        channelSet.ChannelSetState = ChannelSetState.Up;
    else
        channelSet.ChannelSetState = ChannelSetState.Down;

But where can I get an IEnumerable<ChannelState>? If I have one channel, I can simply subscribe to its state changes and update the channel set state accordingly. For two channels, I could use CombineLatest:

    Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
    {
        // cs0 and cs1 are ChannelState values, so compare against ChannelState.Operational
        if (cs0 == ChannelState.Operational || cs1 == ChannelState.Operational)
            return ChannelSetState.Up;
        else
            return ChannelSetState.Down;
    });

But I have an IEnumerable<Channel> and the corresponding IEnumerable<IObservable<ChannelState>>. I am looking for something like CombineLatest that is not limited to a fixed number of observables.

To complicate matters, channels can be added to and removed from the collection. So every now and then a channel will be added, for example, and the new channel also generates state changes that must be taken into account.

What I'm really looking for is a function:

    IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

which is updated when the input changes. There must be some way to accomplish this with Rx, but I cannot figure out how to do this.

+6
3 answers

There is a pretty simple way to do what you want with Rx, but you need to think only in terms of observables and not mix in enumerables.

The function signature you really need to think in terms of is:

    IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

Here's the function:

    Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
        channelStates =>
            channelStates
                .Merge()
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .Scan(0, (cssn, csn) => cssn + csn)
                .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
                .DistinctUntilChanged();

It is important that each IObservable<ChannelState> in the IObservable<IObservable<ChannelState>> behaves correctly for this to work.

I assumed that the ChannelState enumeration has an Idle state and that each IObservable<ChannelState> will produce zero or more Operational/Idle pairs (Operational followed by Idle) before completing.

You also said that channels "can be added to and removed from" the collection. Thinking in terms of IEnumerable<IObservable<ChannelState>> this sounds reasonable, but in Rx you do not need to worry about removal, because each observable can signal its own completion. Once it signals completion, it is as if it has been removed from the collection, because it cannot produce any further values. So you only need to worry about adding to the collection, which is easy using subjects.

So, now this function can be used like this:

    var channelStatesSubject = new Subject<IObservable<ChannelState>>();
    var channelStates = channelStatesSubject.AsObservable();
    var channelSetStates = f(channelStates);

    channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });

    channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
    channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
    channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
    // etc

I ran this with some test code that used three random ChannelState observables, with a Do call in the f function for debugging, and got the following sequence:

    1
    Up
    2
    3
    2
    1
    2
    1
    0
    Down
    1
    Up
    0
    Down

I think that is what you are after. Let me know if I missed something.


According to the comments below, the ChannelState enumeration has several states, but only Operational means that the connection is up. So it is very easy to add a DistinctUntilChanged operator to hide multiple "down" states. Here is the code now:

    Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
        channelStates =>
            channelStates
                .Merge()
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .DistinctUntilChanged()
                .Scan(0, (cssn, csn) => cssn + csn)
                .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
                .DistinctUntilChanged();

Added code to ensure that the first Select query always starts with a 1. Here is the code now:

    Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
        channelStates =>
            channelStates
                .Merge()
                .Select(cs => cs == ChannelState.Operational ? 1 : -1)
                .StartWith(1)
                .DistinctUntilChanged()
                .Scan(0, (cssn, csn) => cssn + csn)
                .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
                .DistinctUntilChanged();
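
One thing worth checking with DistinctUntilChanged placed after Merge: it compares consecutive values of the merged stream, so two different channels becoming Operational back to back would be counted only once. The following is a minimal sketch of a variant that deduplicates per channel before merging. It is not the answer's code: the enum members, the class name PerChannelSketch, and the helpers ToChannelSetState and Run are assumptions made for illustration, and it treats every new or completed channel as not operational.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumed enum shapes: only Operational, Idle, Up and Down are named in this thread.
public enum ChannelState { Idle, Operational }
public enum ChannelSetState { Up, Down }

public static class PerChannelSketch
{
    // Hypothetical helper with the same shape as f, but each channel is reduced
    // to +1 / -1 transitions before the channels are merged.
    public static IObservable<ChannelSetState> ToChannelSetState(
        IObservable<IObservable<ChannelState>> channelStates)
    {
        return channelStates
            .Select(channel => channel
                .Select(cs => cs == ChannelState.Operational)
                .StartWith(false)                  // a new channel counts as not operational
                .Concat(Observable.Return(false))  // a completed channel stops counting
                .DistinctUntilChanged()            // keep real transitions only
                .Skip(1)                           // drop the initial false
                .Select(up => up ? 1 : -1))
            .Merge()
            .Scan(0, (count, delta) => count + delta)
            .Select(count => count > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
    }

    // Drives two channels through a few state changes and records the output.
    public static List<ChannelSetState> Run()
    {
        var seen = new List<ChannelSetState>();
        var channels = new Subject<IObservable<ChannelState>>();
        var a = new Subject<ChannelState>();
        var b = new Subject<ChannelState>();

        using (ToChannelSetState(channels).Subscribe(state => seen.Add(state)))
        {
            channels.OnNext(a);
            channels.OnNext(b);
            a.OnNext(ChannelState.Operational); // count 1: Up
            b.OnNext(ChannelState.Operational); // count 2: still Up, suppressed
            a.OnNext(ChannelState.Idle);        // count 1: still Up, suppressed
            b.OnCompleted();                    // count 0: Down
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "Up, Down"
    }
}
```

Because each inner sequence only emits on a real transition, the running count equals the number of currently operational channels, whatever other states the enumeration contains.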
+3

Perhaps start with an IObservable<Channel> rather than with an IEnumerable<Channel>. One way to do this is to use a Subject<Channel> and call OnNext() on it whenever a new channel is created.

If you need a list,

    xsChannels.Subscribe(item => { lock (list) { list.Add(item); } });
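
As a rough sketch of how this might fit together: the Channel class below is purely hypothetical (the thread never shows its members), as are the States and Report names and the ChannelSubjectSketch class. It publishes new channels through a Subject<Channel>, keeps a list as above, and projects the channels into an IObservable<IObservable<ChannelState>>.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumed enum shape: only Operational and Idle are named in this thread.
public enum ChannelState { Idle, Operational }

// Hypothetical Channel: exposes its state changes as an observable.
public sealed class Channel
{
    private readonly Subject<ChannelState> _states = new Subject<ChannelState>();
    public IObservable<ChannelState> States { get { return _states; } }
    public void Report(ChannelState state) { _states.OnNext(state); }
}

public static class ChannelSubjectSketch
{
    // Returns the number of channels collected in the list.
    public static int Run()
    {
        var xsChannels = new Subject<Channel>();
        var list = new List<Channel>();

        // Keep a plain list of all channels seen so far.
        xsChannels.Subscribe(item => { lock (list) { list.Add(item); } });

        // Project each channel to its state stream.
        IObservable<IObservable<ChannelState>> channelStates =
            xsChannels.Select(channel => channel.States);

        // Print every state change of every channel.
        channelStates.Merge().Subscribe(state => Console.WriteLine(state));

        var first = new Channel();
        var second = new Channel();
        xsChannels.OnNext(first);   // announce new channels as they are created
        xsChannels.OnNext(second);
        first.Report(ChannelState.Operational);

        return list.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```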

+1

I promised to add a solution that I came up with myself, so here it is. Until I find anything better, I will use this, although I still think there should be a better way :)

I use a class that keeps the latest value from each registered observable in a ConcurrentDictionary. When an observable is unregistered, its latest value is removed again, along with its associated subscription.

When any registered observable produces a value, all the latest values are collected and pushed to a Subject.

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Reactive.Subjects;

    public class DynamicCombineLatest<T>
    {
        // Latest value seen from each registered observable.
        private readonly IDictionary<IObservable<T>, T> _latestValues =
            new ConcurrentDictionary<IObservable<T>, T>();

        // Subscription for each registered observable, kept so it can be disposed on removal.
        private readonly IDictionary<IObservable<T>, IDisposable> _subscriptions =
            new ConcurrentDictionary<IObservable<T>, IDisposable>();

        private readonly ISubject<IEnumerable<T>> _result = new Subject<IEnumerable<T>>();

        public void AddObservable(IObservable<T> observable)
        {
            var subscription = observable.Subscribe(t =>
            {
                _latestValues[observable] = t;
                _result.OnNext(_latestValues.Values);
            });
            _subscriptions[observable] = subscription;
        }

        public void RemoveObservable(IObservable<T> observable)
        {
            _subscriptions[observable].Dispose();
            _latestValues.Remove(observable);
            _subscriptions.Remove(observable);
        }

        public IObservable<IEnumerable<T>> Result
        {
            get { return _result; }
        }
    }
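
To get from Result to a channel set state, the Any check from the question can be applied to each batch of latest values. This is only a sketch under assumptions: the enum members, the class name LatestValuesSketch, and the extension method ToChannelSetState are made up for illustration, and a Subject<IEnumerable<ChannelState>> stands in for DynamicCombineLatest<ChannelState>.Result so the example is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumed enum shapes: only Operational, Idle, Up and Down are named in this thread.
public enum ChannelState { Idle, Operational }
public enum ChannelSetState { Up, Down }

public static class LatestValuesSketch
{
    // Hypothetical extension: maps each batch of latest channel states to a set state.
    public static IObservable<ChannelSetState> ToChannelSetState(
        this IObservable<IEnumerable<ChannelState>> latestStates)
    {
        return latestStates
            .Select(states => states.Any(cs => cs == ChannelState.Operational)
                ? ChannelSetState.Up
                : ChannelSetState.Down)
            .DistinctUntilChanged();
    }

    public static List<ChannelSetState> Run()
    {
        var seen = new List<ChannelSetState>();

        // Stand-in for DynamicCombineLatest<ChannelState>.Result.
        var latest = new Subject<IEnumerable<ChannelState>>();

        using (latest.ToChannelSetState().Subscribe(state => seen.Add(state)))
        {
            latest.OnNext(new[] { ChannelState.Idle });                           // Down
            latest.OnNext(new[] { ChannelState.Idle, ChannelState.Operational }); // Up
            latest.OnNext(new[] { ChannelState.Operational });                    // still Up, suppressed
            latest.OnNext(new[] { ChannelState.Idle });                           // Down
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "Down, Up, Down"
    }
}
```

With the class above, the same call would presumably read `combiner.Result.ToChannelSetState()`, where `combiner` is a DynamicCombineLatest<ChannelState> instance.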
+1

Source: https://habr.com/ru/post/893475/

