Rx, how to group a complex object by key, and then do SelectMany without "stopping" the stream?

This is related to my other question here . James World presented the solution as follows:

// idStream is an IObservable<int> of the input stream of IDs // alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID var idAlarmStream = idStream .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); 

<edit 2:

Question: How to start timers immediately, without waiting for the arrival of the first events? I think the problem is with the root in my question. For this, I planned to send dummy objects with identifiers, which, as I know, should be there. But, as I write in the following, I ran into some other problems. However, I think the solution will be interesting too.

Forward with other interesting pieces! Now, if I would like to group a complex object, such as the next one and a group by key, as follows (will not compile)

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); 

then i get in trouble. I cannot change the SelectMany , Concat and Observable.Return to make the query work as before. For example, if I make a request like

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key.First()))) .Subscribe(i => Console.WriteLine(i.Id + "-" + i.IsTest); 

Then two events are needed before you can see the output in Subscribe . This is the result of calling First , I'm going. In addition, I also like to use attributes of complex objects in the alarmInterval call.

Can someone give an explanation of what is happening, maybe even a solution? The problem with the transition with an unmodified solution is that the grouping does not look only for identifiers for the key value, as well as the IsTest field.

<edit: As a note, the problem can probably be solved solely by creating an explicit class or structure, and then implement a custom IEquatable , and then using James as- code so that grouping will only occur by identifiers. It looks like a hack, though.

+3
source share
2 answers

In addition, if you want to count the number of times you saw an item before the alarm went off, you can do it this way by using the counter overload in Select .

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)) .SelectMany(grp => grp.Select((count, alarm) => new { count, alarm }).TakeLast(1)); 

Note that this will be 0 for the first (seed) element - this is probably what you want anyway.

+1
source

In your selection, you create an anonymous type. Lets call it A1. Suppose your idStream is IObservable. Since this is the key in GroupByUntil , you do not need to worry about comparing keywords. Whole equality is equivalent.

GroupByUntil - IObservable<IGroupedObservable<int, A1>> .

The selected SelectMany tries to be IObservable<A1> . You just need Concat(Observable.Return(grp.Key)) here, but the key type and group element type must match or SelectMany will not work. Therefore, the key must also be A1. Anonymous types use structural equality, and the return type is stream A1, but you cannot declare this as an open return type.

If you just need an identifier, you should add .Select(x => x.Id) after Throttle :

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key) .Select(x => x.Id)) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); 

If you want to use A1 instead, you need to create a specific type that implements Equality.

EDIT

I have not tested it, but you could smooth it out more easily, I think it is easier! It outputs A1, though, so you have to deal with this if you need to return the stream somewhere.

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)) .SelectMany(grp => grp.TakeLast(1)); 
+1
source

Source: https://habr.com/ru/post/956188/


All Articles