In Rx, how to group events by id and throttle each group with multiple TimeSpans?

I ended up in Rx spree, so to speak, and this question is related to mine here and here . However, perhaps they help someone, as I could see them as useful options for the same topic.

Question:. How can one group a random stream of int (say, on the interval [0, 10], created on a random interval) of objects into groups, and provide the earch group with a variable number of no event alarms (due to the lack of a better definition, for further background see related messages ) More specifically, with the code, how can you determine the throttle settings for each group in the following:

 var idAlarmStream = idStream .Select(i => i) .GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000)) .SelectMany(grp => grp.TakeLast(1)) .Subscribe(i => Console.WriteLine(i)); 

Here, the subscription function will be called if there is more than one second per group. What if you want to determine three different values ​​for the absence of events (say, one second, five seconds and ten seconds) and all are canceled when an event arrives? Can I think:

  • Divide each identifier in idStream into several synthetic ones and provide a bijective mapping between real identifiers and synthetic ones. For example, in this case ID: 1 → 100, 101, 102; ID: 2 → 200, 201, 203, and then define the selector function in Throttle , e.g. Func<int, Timespan>(i => /* switch(i)...*/) , and then when Subscribe is called return the id back. See Also Related Questions for more information.
  • Create a nested group in which identifiers are grouped, and then the identifier groups will be copied / replicated / branched (I don't know the right term) into groups according to the throttling values. This approach, I think, is rather complicated, and I'm not sure if this will be the best. However, I would be interested to see such a request.

In a more general setup, I suspect this is a situation where there are several handlers per group, although I could not find anything related to this.

<editing: As a (hopefully clarifying) example, idStream one ID: 1, on which three different counters will be triggered, each of which waits for the next event or alarm if a new ID 1 is not detected on time. Counter 1 (C1) waits for five seconds, counter 2 (C2) for seven seconds and counter 3 (C3) for ten seconds. If a new ID 1 is received within the interval [0, 5] seconds, all counters will be reinitialized with the above values ​​and no alarm will be sent. If a new identifier is received within the interval [0, 7) seconds, signals C1 and C2 and C3 will be reinitialized. Similarly, if a new identifier is received within a period of [0, 10) seconds C1 and C2, but C3 will simply be reinitialized.

That is, there would be several “no alarms” or actions taken in general with respect to one ID under certain conditions. I’m not sure what would be a good analogue ... Perhaps laying the “signal lights” in the tower so that it was green first, then yellow, and finally red. Since the absence of the identifier lasts longer and longer, the color after the color lights up (in this case, red is the last). Then, when one identifier is detected, all lights will be turned off.

<edit 2: After finalizing the James code in the following example and leaving the rest as written, I found that Subscribe would be called right in the first event at both alarm levels.

 const int MaxLevels = 2; var idAlarmStream = idStream .Select(i => i) .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default) .Subscribe(i => { Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)); }); 

Let's see what happens here, and if MaxLevels can be provided dynamically ...

<edit 3: James code works. The problem was between the chair and the keyboard! Changing the time to something more reasonable certainly helped. In fact, I changed them to large pieces, but that was .FromTicks , and it slipped away for a few minutes.

+6
source share
1 answer

This works, I think - I will try to add a more complete explanation later. Each signaling level has a specific threshold (for each group of signals). They are expected to increase.

The basic idea is for signals from all previous levels to arrive at the current level. The first level is the “zero” level of the signals themselves, which is filtered out before the alarm returns. Note that TSignal keys must support a value identifier.

I am sure there is room for simplification!

Unit test example:

 public class AlarmTests : ReactiveTest { [Test] public void MultipleKeyMultipleSignalMultipleLevelTest() { var threshold1 = TimeSpan.FromTicks(300); var threshold2 = TimeSpan.FromTicks(800); var scheduler = new TestScheduler(); var signals = scheduler.CreateHotObservable( OnNext(200, 1), OnNext(200, 2), OnNext(400, 1), OnNext(420, 2), OnNext(800, 1), OnNext(1000, 1), OnNext(1200, 1)); Func<int, int> keySelector = i => i; Func<int, int, TimeSpan> thresholdSelector = (key, level) => { if (level == 1) return threshold1; if (level == 2) return threshold2; return TimeSpan.MaxValue; }; var results = scheduler.CreateObserver<Alarm<int>>(); signals.AlarmSystem( keySelector, thresholdSelector, 2, scheduler).Subscribe(results); scheduler.Start(); results.Messages.AssertEqual( OnNext(700, new Alarm<int>(1, 1)), OnNext(720, new Alarm<int>(2, 1)), OnNext(1220, new Alarm<int>(2, 2)), OnNext(1500, new Alarm<int>(1, 1)), OnNext(2000, new Alarm<int>(1, 2))); } [Test] public void CheckAlarmIsSuppressed() { var threshold1 = TimeSpan.FromTicks(300); var threshold2 = TimeSpan.FromTicks(500); var scheduler = new TestScheduler(); var signals = scheduler.CreateHotObservable( OnNext(200, 1), OnNext(400, 1), OnNext(600, 1)); Func<int, int> keySelector = i => i; Func<int, int, TimeSpan> thresholdSelector = (signal, level) => { if (level == 1) return threshold1; if (level == 2) return threshold2; return TimeSpan.MaxValue; }; var results = scheduler.CreateObserver<Alarm<int>>(); signals.AlarmSystem( keySelector, thresholdSelector, 2, scheduler).Subscribe(results); scheduler.Start(); results.Messages.AssertEqual( OnNext(900, new Alarm<int>(1, 1)), OnNext(1100, new Alarm<int>(1, 2))); } } public static class ObservableExtensions { /// <summary> /// Create an alarm system that detects signal gaps of length /// determined by a signal key and signals alarms of increasing severity. /// </summary> /// <typeparam name="TSignal">Type of the signal</typeparam> /// <typeparam name="TKey">Type of the signal key used for grouping, must implement Equals correctly</typeparam> /// <param name="signals">Input signal stream</param> /// <param name="keySelector">Function to select a key from a signal for grouping</param> /// <param name="thresholdSelector">Function to select a threshold for a given signal key and alarm level. /// Should return TimeSpan.MaxValue for levels above the highest level</param> /// <param name="levels">Number of alarm levels</param> /// <param name="scheduler">Scheduler use for throttling</param> /// <returns>A stream of alarms each of which contains the signal and alarm level</returns> public static IObservable<Alarm<TSignal>> AlarmSystem<TSignal, TKey>( this IObservable<TSignal> signals, Func<TSignal, TKey> keySelector, Func<TKey, int, TimeSpan> thresholdSelector, int levels, IScheduler scheduler) { var alarmSignals = signals.Select(signal => new Alarm<TSignal>(signal, 0)) .Publish() .RefCount(); for (int i = 0; i < levels; i++) { alarmSignals = alarmSignals.CreateAlarmSystemLevel( keySelector, thresholdSelector, i + 1, scheduler); } return alarmSignals.Where(alarm => alarm.Level != 0); } private static IObservable<Alarm<TSignal>> CreateAlarmSystemLevel<TSignal, TKey>( this IObservable<Alarm<TSignal>> alarmSignals, Func<TSignal, TKey> keySelector, Func<TKey, int, TimeSpan> thresholdSelector, int level, IScheduler scheduler) { return alarmSignals .Where(alarmSignal => alarmSignal.Level == 0) .Select(alarmSignal => alarmSignal.Signal) .GroupByUntil( keySelector, grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler)) .SelectMany(grp => grp.TakeLast(1).Select(signal => new Alarm<TSignal>(signal, level))) .Merge(alarmSignals); } } public class Alarm<TSignal> : IEquatable<Alarm<TSignal>> { public Alarm(TSignal signal, int level) { Signal = signal; Level = level; } public TSignal Signal { get; private set; } public int Level { get; private set; } private static bool Equals(Alarm<TSignal> x, Alarm<TSignal> y) { if (ReferenceEquals(x, null)) return false; if (ReferenceEquals(y, null)) return false; if (ReferenceEquals(x, y)) return true; return x.Signal.Equals(y.Signal) && x.Level.Equals(y.Level); } // Equality implementation added to help with testing. public override bool Equals(object other) { return Equals(this, other as Alarm<TSignal>); } public override string ToString() { return string.Format("Signal: {0} Level: {1}", Signal, Level); } public bool Equals(Alarm<TSignal> other) { return Equals(this, other); } public static bool operator ==(Alarm<TSignal> x, Alarm<TSignal> y) { return Equals(x, y); } public static bool operator !=(Alarm<TSignal> x, Alarm<TSignal> y) { return !Equals(x, y); } public override int GetHashCode() { return ((Signal.GetHashCode()*37) ^ Level.GetHashCode()*329); } } 
+5
source

Source: https://habr.com/ru/post/956186/


All Articles