Over the past few days, I have been trying to compose an Rx query that processes a stream of events from a source and checks for the absence of certain identifiers. Absence is defined as follows: there is a series of time windows (for example, every day from 9:00 to 17:00) during which at most, say, twenty minutes may pass without an identifier appearing in the stream. To complicate matters further, the allowed absence time has to be defined per identifier. For example, assuming the combined event stream (A, A, B, C, A, C, B, etc.) contains three event types A, B, and C, we could define that:
- Events A are tracked from 9:00 to 10:00 every day, with a maximum absence of 10 minutes.
- Events B are tracked from 9:00 to 11:00 every day, with a maximum absence of 5 minutes.
- Events C are tracked from 12:00 to 15:00 every day, with a maximum absence of 30 minutes.
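To make the three rules above concrete, here is a minimal sketch of how they could be represented as data. The `AbsenceRule` and `AbsenceRules` types and their member names are hypothetical and not part of my actual code; they only illustrate the "window plus maximum absence per identifier" idea.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical rule type: the names are illustrative only.
public sealed class AbsenceRule
{
    public AbsenceRule(TimeSpan windowStart, TimeSpan windowEnd, TimeSpan maxAbsence)
    {
        WindowStart = windowStart;
        WindowEnd = windowEnd;
        MaxAbsence = maxAbsence;
    }

    public TimeSpan WindowStart { get; }  // time of day at which the window opens
    public TimeSpan WindowEnd { get; }    // time of day at which the window closes
    public TimeSpan MaxAbsence { get; }   // longest tolerated gap between events

    // True when the instant falls inside the daily window [WindowStart, WindowEnd).
    public bool IsActiveAt(DateTimeOffset instant) =>
        instant.TimeOfDay >= WindowStart && instant.TimeOfDay < WindowEnd;
}

public static class AbsenceRules
{
    // The three example rules, keyed by event type.
    public static readonly IReadOnlyDictionary<string, AbsenceRule> ByEventType =
        new Dictionary<string, AbsenceRule>
        {
            ["A"] = new AbsenceRule(TimeSpan.FromHours(9), TimeSpan.FromHours(10), TimeSpan.FromMinutes(10)),
            ["B"] = new AbsenceRule(TimeSpan.FromHours(9), TimeSpan.FromHours(11), TimeSpan.FromMinutes(5)),
            ["C"] = new AbsenceRule(TimeSpan.FromHours(12), TimeSpan.FromHours(15), TimeSpan.FromMinutes(30)),
        };
}
```

With something like this, `AbsenceRules.ByEventType["B"].MaxAbsence` would give the five-minute limit for B, and `IsActiveAt` would tell whether a given instant lies inside the rule's window.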
It seems to me that I first need to split the stream into separate per-identifier streams with GroupBy, and then process each resulting stream according to its absence rule. I have already discussed this a bit on the Microsoft Rx forums (many thanks to Dave), and I have working code for creating the rules and for the absence checks, but I'm struggling with, for example, how to combine this with grouping.
So, without further ado, the code hacked together so far:
And a query to check for absence without grouping, which is one of my sticking points. <edit: Perhaps I need to pair up the time points, add an identifier, and use the resulting triple in the query... </edit>
    dataSource = from n in Observable.Interval(TimeSpan.FromMilliseconds(100))
                 select new FakeData
                 {
                     Id = new Random().Next(1, 5),
                     SomeData = DateTimeOffset.Now.ToString()
                 };

    var startPointOfTimeChanges = alarmStartPeriods.ToObservable();
    var endPointOfTimeChanges = alarmEndPeriods.ToObservable();
    var durations = startPointOfTimeChanges.CombineLatest(
        endPointOfTimeChanges, (start, end) => new { start, end });

    var maximumInactivityTimeBeforeAlarmSignal = TimeSpan.FromMilliseconds(250);
    timer = (from duration in durations
             select (from _ in Observable.Timer(DateTime.Now)
                     from x in dataSource
                         .Throttle(maximumInactivityTimeBeforeAlarmSignal)
                         .TakeUntil(duration.end)
                     select x)).Switch();

    timer.Subscribe(x => Debug.WriteLine(x.SomeData));
Questions:
- How do I use GroupBy to group the incoming data by ID and still detect the absence of events?
- One thing I noticed is that if the start point of an alarm period is in the past (for example, the query is started at 10:00 while the rule says the period starts at 9:00), the query does not start. I believe the start time should then be moved to the present. Is there a standard way to do this, or should I just add a conditional?
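For the second question, the conditional I have in mind could look roughly like the sketch below. `AlarmWindow.EffectiveStart` is a hypothetical helper, not an Rx API: it moves a start time that has already passed up to the present and reports when the window is over altogether.

```csharp
using System;

public static class AlarmWindow
{
    // Returns the instant at which monitoring should actually begin:
    // the scheduled start, or 'now' when the start has already passed.
    // Returns null when the window has already ended.
    public static DateTimeOffset? EffectiveStart(
        DateTimeOffset start, DateTimeOffset end, DateTimeOffset now)
    {
        if (now >= end)
        {
            return null; // nothing left to monitor in this window
        }

        return start > now ? start : now;
    }
}
```

The result, when not null, could then be used in place of the raw start time, for instance as the due time passed to `Observable.Timer`.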
Other questions I can think of that would be nice to have answered (have fun :)):
- How do I keep track of the last event that occurred per ID?
- How do I change the variables dynamically (as Dave already mentioned on the MS forums)?
- And finally, how do I batch the events and store them somewhere (such as a database), as in this wonderful blog example by PeteGoo?
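As for the batching question, the direction I would probably explore is the `Buffer` operator. The following is only a sketch under my own assumptions, not a reproduction of the blog example: `SaveInBatches` and the `save` callback are hypothetical names, and the actual storage is left to the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class Batching
{
    // Collects events into batches that close after 'maxDelay' or once they
    // hold 'maxCount' items, whichever comes first, and passes every
    // non-empty batch to 'save' (a hypothetical storage callback).
    public static IDisposable SaveInBatches<T>(
        IObservable<T> events,
        TimeSpan maxDelay,
        int maxCount,
        Action<IList<T>> save)
    {
        return events
            .Buffer(maxDelay, maxCount)
            .Where(batch => batch.Count > 0) // skip the empty batches of quiet periods
            .Subscribe(save);
    }
}
```

Disposing the returned subscription stops the batching.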
Other options I can think of are to explicitly use System.Threading.Timer and ConcurrentDictionary, but one has to keep learning!
Regarding James's answer, a brief note on how it works and how I intend to use it.
First, the observable does nothing until the first event occurs. So, if monitoring should start right away, you need to add some further Rx functionality or fire a dummy event. I do not think this is a problem.
Second, a new timeout value is obtained from alarmInterval for every new identifier. Here "new" includes even an identifier that has been absent for too long and has already caused an alarm.
I think this works well in that you can subscribe to this observable and do something with side effects. Some examples would be setting a flag, sending a signal, or whatever the business rules call for. In addition, with proper locking etc., it should be easy to supply new time intervals according to predefined alarm rules, each with its own absence period and time window.
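To illustrate the behaviour described above, here is a minimal sketch of how such an operator might be put together. This is my own reconstruction, not necessarily James's code: the `DetectAbsence` name is hypothetical, and only `alarmInterval` is taken from the answer. Each group is closed once its key has been silent for its timeout, so the next event with that key opens a fresh group and fetches a fresh timeout.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class AbsenceDetection
{
    // Emits the last element seen for a key once that key has been silent
    // for alarmInterval(key). Nothing is emitted for a key before its
    // first element arrives.
    public static IObservable<T> DetectAbsence<T, TKey>(
        this IObservable<T> source,
        Func<T, TKey> keySelector,
        Func<TKey, TimeSpan> alarmInterval,
        IScheduler scheduler)
    {
        return source
            // The group for a key lives until Throttle fires, i.e. until
            // no element with that key has arrived for the key's interval.
            .GroupByUntil(
                keySelector,
                group => group.Throttle(alarmInterval(group.Key), scheduler))
            // When a group closes, report its last element as the alarm.
            .SelectMany(group => group.TakeLast(1));
    }
}
```

A subscription such as `dataSource.DetectAbsence(x => x.Id, id => TimeSpan.FromMinutes(10), Scheduler.Default).Subscribe(...)` would then be the place for the side effects; the fixed ten minutes here is only a placeholder for a per-identifier rule lookup.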
I will have to work on other concepts related to this in order to understand things better. But my main questions have been answered by this. Life is good. :-)