How to split a stream (GroupBy) and monitor for the absence of elements in Rx during certain periods of time?

Over the past few days, I have been trying to compose an Rx query that processes a stream of events from a source and checks for the absence of certain identifiers. Absence is defined as follows: there is a series of time windows (for example, every day from 9:00 to 17:00), during which at most, say, twenty minutes may pass without an identifier appearing in the stream. To complicate matters further, the allowed absence time is defined per identifier. For example, assuming a combined stream of events (A, A, B, C, A, C, B, etc.) with three event types A, B, and C, we could define that

  • Events A are tracked from 9:00 to 10:00 every day, the maximum absence of events is 10 minutes.
  • Events B are tracked from 9:00 to 11:00 every day, the maximum absence of events is 5 minutes.
  • Events C are tracked from 12:00 to 15:00 every day, the maximum absence of events is 30 minutes.

It seems to me that I first need to split the stream into separate per-event streams with GroupBy, and then process the resulting streams using the absence rules. I already discussed this a bit on the Microsoft Rx forums (thanks a lot to Dave), and I have working code to create both the rules and the absence checks, but I am struggling with, for example, how to combine this with grouping.

So, without further ado, the code hacked together so far:

    // Some sample data bits representing the events.
    public class FakeData
    {
        public int Id { get; set; }
        public string SomeData { get; set; }
    }

    // Note the Date part of DateTime.Now, which zeroes the clock time and leaves only the date.
    // The purpose is to create start-end pairs of times, e.g. 9:00-17:00.
    // The alarm start and end time points should match each other pairwise; they could be pairs of values...
    var maxDate = DateTime.Now.Date.AddHours(17).AddMinutes(0).AddSeconds(0).AddDays(14);
    var startDate = DateTime.Now.Date.AddHours(9).AddMinutes(0).AddSeconds(0);
    var alarmStartPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days)
        .Select(d => new DateTimeOffset(startDate.AddDays(d)))
        .ToList();
    var alarmEndPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days)
        .Select(d => new DateTimeOffset(startDate.AddDays(d)).AddHours(5))
        .ToList();

And the query to check for absence without grouping, which is one of my sticking points. <edit: Perhaps I need to group the time points into pairs, add an identifier, and use the resulting triplet in the query ... </edit>

    dataSource = from n in Observable.Interval(TimeSpan.FromMilliseconds(100))
                 select new FakeData
                 {
                     Id = new Random().Next(1, 5),
                     SomeData = DateTimeOffset.Now.ToString()
                 };

    var startPointOfTimeChanges = alarmStartPeriods.ToObservable();
    var endPointOfTimeChanges = alarmEndPeriods.ToObservable();
    var durations = startPointOfTimeChanges.CombineLatest(endPointOfTimeChanges, (start, end) => new { start, end });
    var maximumInactivityTimeBeforeAlarmSignal = TimeSpan.FromMilliseconds(250);

    timer = (from duration in durations
             select (from _ in Observable.Timer(DateTime.Now)
                     from x in dataSource.Throttle(maximumInactivityTimeBeforeAlarmSignal).TakeUntil(duration.end)
                     select x)).Switch();

    timer.Subscribe(x => Debug.WriteLine(x.SomeData));

Questions:

  • How do I use GroupBy to group the incoming data by ID and still detect the absence of events?
  • One thing I noticed is that if the start point of the alarm period is in the past (for example, the query was started at 10:00 when the rule says monitoring starts at 9:00), the query does not start. I believe the start time should be rescheduled to the present. Is there a standard way to do this, or should I just add a conditional expression?

Other questions I can think of that would be nice to have answered (just for fun :)):

  • How do I keep the result of the last event that occurred for each ID?
  • How do I change the variables dynamically (as Dave already hinted at on the MS forums)?
  • Finally, how do I batch the events and store them somewhere (like a database), as in this wonderful PeteGoo blog example?

Other options I can think of are to explicitly use System.Threading.Timer and ConcurrentDictionary, but one needs to keep learning!

Regarding James's answer, a brief note on how it works and how I intend to use it.

First, the observable does nothing until the first event occurs. So, if monitoring should start right away, you need to add some other Rx functionality or fire a dummy event. I do not think this is a problem.

Secondly, a new timeout value is obtained from alarmInterval for every new identifier. Here "new" includes an identifier that has been absent for too long and has already triggered an alarm.

I think this works well in that you can subscribe to this observable and do something with side effects. Examples would be setting a flag, sending a signal, or whatever the business rules call for. In addition, given proper locking and so on, it should be easy to supply new time intervals according to predefined alarm rules, each with its own absence period and time window.

I will have to work on other concepts related to this in order to understand things better. But my main concerns are addressed by this. Life is good. :-)

1 answer

EDITED: Improved the code by making SelectMany use TakeLast.

I wrote a blog post about detecting disconnected clients. It will work just as well for your scenario here if you replace the timeToHold variable from the post with a function like alarmInterval below, which returns a TimeSpan based on the client ID.

E.g.:

    // idStream is an IObservable<int> of the input stream of IDs
    // alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID
    var idAlarmStream = idStream
        .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key)))
        .SelectMany(grp => grp.TakeLast(1));

This gives you the basic functionality of continuous monitoring, without regard to the active monitoring periods.

To get the monitoring-window functionality, I would turn things around and filter the output above with a Where that checks whether the identifier falls within its monitoring time window. This makes it easier to deal with changing monitoring periods.
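As a rough sketch of the predicate such a Where could call, assuming one daily window per identifier: the MonitoringRules class, the IsMonitored method, and the rule table below are hypothetical names and values chosen for illustration, not part of the original code.

```csharp
using System;
using System.Collections.Generic;

public static class MonitoringRules
{
    // Assumed rule table: identifier -> daily window as times of day.
    // The IDs and hours are illustrative only.
    private static readonly Dictionary<int, (TimeSpan Start, TimeSpan End)> Windows =
        new Dictionary<int, (TimeSpan Start, TimeSpan End)>
        {
            { 1, (TimeSpan.FromHours(9), TimeSpan.FromHours(10)) },
            { 2, (TimeSpan.FromHours(9), TimeSpan.FromHours(11)) },
            { 3, (TimeSpan.FromHours(12), TimeSpan.FromHours(15)) },
        };

    // True when 'timestamp' falls inside the daily window configured for 'id'.
    // The start is inclusive and the end exclusive; identifiers without a rule
    // are treated as not monitored.
    public static bool IsMonitored(int id, DateTimeOffset timestamp)
    {
        if (!Windows.TryGetValue(id, out var window))
        {
            return false;
        }

        var timeOfDay = timestamp.TimeOfDay;
        return timeOfDay >= window.Start && timeOfDay < window.End;
    }
}
```

With something like this in place, the alarm stream could be filtered as idAlarmStream.Where(id => MonitoringRules.IsMonitored(id, DateTimeOffset.Now)), so that alarms raised outside an identifier's window are simply dropped.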

You could do something fancier by turning each monitoring window into a stream and combining those with the alert stream, but I'm not convinced the benefit is worth the added complexity.

The alarmInterval function also gives you a degree of dynamic alarm intervals, in that it can return new values, but these will only take effect after the alarm has gone off for that identifier, thereby ending its current group.

--- Moving into some theorizing here ---

To make this fully dynamic, you will have to end the groups somehow, and there are several ways to do that.

One would be to project idStream using Select into a stream of a custom type that contains the identifier plus a global counter value. Give this type an appropriate equality implementation so that it works correctly with GroupByUntil.

Now every time you change the alarm intervals, increment the counter. This will create new groups for each identifier. You can then add an additional check to the final filter to ensure that output events carry the most recent counter value.
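A minimal sketch of what such a key type and counter might look like. The names GenerationKey and AlarmRuleGeneration are hypothetical, and the wiring shown in the trailing comment assumes System.Reactive and is untested.

```csharp
using System;
using System.Threading;

// Hypothetical group key: the identifier plus the value of the global counter
// at the time the event was observed.
public readonly struct GenerationKey : IEquatable<GenerationKey>
{
    public GenerationKey(int id, int generation)
    {
        Id = id;
        Generation = generation;
    }

    public int Id { get; }
    public int Generation { get; }

    // Two keys are equal only if both the identifier and the counter match,
    // so a counter change makes GroupByUntil open a fresh group per identifier.
    public bool Equals(GenerationKey other) => Id == other.Id && Generation == other.Generation;

    public override bool Equals(object obj) => obj is GenerationKey other && Equals(other);

    public override int GetHashCode() => unchecked((Id * 397) ^ Generation);
}

// Hypothetical holder for the global counter; call Bump() whenever the alarm intervals change.
public static class AlarmRuleGeneration
{
    private static int current;

    public static int Current => Volatile.Read(ref current);

    public static int Bump() => Interlocked.Increment(ref current);
}

// Possible wiring, reusing idStream and alarmInterval from the answer above:
//
//   var idAlarmStream = idStream
//       .Select(id => new GenerationKey(id, AlarmRuleGeneration.Current))
//       .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key.Id)))
//       .SelectMany(grp => grp.TakeLast(1))
//       .Where(key => key.Generation == AlarmRuleGeneration.Current)
//       .Select(key => key.Id);
```

Groups keyed on an old counter value still end when their Throttle fires; the final Where is what discards their stale alarms.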

Source: https://habr.com/ru/post/956189/