Running sum with Rx

Someone must have solved this already. Imagine I have a class that periodically raises an event when a value changes (for example, PropertyChanged). The value is simply an amount of money.

Now I would like to use Rx to get the sum of the increases over the last 10 minutes. BufferWithTime, for example, does not help, because I always need the most recent 10 minutes rather than fixed 10-minute chunks.

Any ideas how I can do this?

TIA, Martin

+3
2 answers

Use Observable.Scan and keep each int value (the money) together with a DateTime recording when it arrived:

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h);
var runningSums =
    events.Scan(new List<Tuple<int, DateTime>>(),
                (l, e) =>
                {
                    var now = DateTime.Now;
                    // Add last event data to list.
                    l.Add(Tuple.Create(e.EventArgs.Money, now));
                    // Return the correct part of the list (everything
                    // from the last ten minutes).
                    return l.Where(t => (now - t.Item2) <
                                   TimeSpan.FromMinutes(10)).ToList();
                })
          .Select(l => l.Sum(t => t.Item1));
runningSums.Subscribe(sum => Console.WriteLine(sum));

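EDIT: Here is a variant that keeps a single list instead of copying it on every event, and filters by a cutoff time when summing: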

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h);
var runningSums =
    events.Scan(Tuple.Create(new List<Tuple<int, DateTime>>(),
                             DateTime.Now - TimeSpan.FromMinutes(10)),
                (l, e) =>
                {
                    var now = DateTime.Now;
                    l.Item1.Add(Tuple.Create(e.EventArgs.Money, now));
                    // if (trimming-condition) then trim front of list...
                    return Tuple.Create(l.Item1, now - TimeSpan.FromMinutes(10));
                })
          .Select(l => l.Item1.Where(t => t.Item2 > l.Item2).Sum(t => t.Item1));
runningSums.Subscribe(sum => Console.WriteLine(sum));
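
The trimming step that the comment in the snippet above leaves open could be sketched roughly as follows. This is only an illustration under assumptions: RunningWindowSum is a hypothetical helper, not part of Rx, and it takes the timestamp as a parameter so the behaviour can be checked without waiting on the clock. It relies on events arriving in time order, so expired entries are always at the front of the queue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): keeps a running sum over a time window.
public sealed class RunningWindowSum
{
    private readonly Queue<Tuple<int, DateTime>> _items =
        new Queue<Tuple<int, DateTime>>();
    private readonly TimeSpan _window;
    private int _sum;

    public RunningWindowSum(TimeSpan window) { _window = window; }

    // Adds a value observed at 'now' and returns the sum over the window.
    public int Add(int value, DateTime now)
    {
        _items.Enqueue(Tuple.Create(value, now));
        _sum += value;

        // Drop entries that are a full window (or more) older than 'now'.
        while (_items.Count > 0 && now - _items.Peek().Item2 >= _window)
        {
            _sum -= _items.Dequeue().Item1;
        }
        return _sum;
    }
}

public static class Program
{
    public static void Main()
    {
        var sums = new RunningWindowSum(TimeSpan.FromMinutes(10));
        var start = new DateTime(2020, 1, 1, 12, 0, 0);

        Console.WriteLine(sums.Add(5, start));                // 5
        Console.WriteLine(sums.Add(7, start.AddMinutes(4)));  // 12
        Console.WriteLine(sums.Add(1, start.AddMinutes(11))); // 8, the first entry has expired
    }
}
```

With the events observable from above, something like events.Select(e => window.Add(e.EventArgs.Money, DateTime.Now)) should then yield the running sums, assuming a single subscriber so that the helper's state is not shared.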
+3

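Here is my take on it, packaged as a reusable extension method. It works on Timestamped values, so call Timestamp() on the source first.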

    public static class RxExtensions
    {
        // Holds timestamped values and drops those older than the given span.
        class TimeLimitedList<T>
        {
            public List<Timestamped<T>> Values = new List<Timestamped<T>>();
            TimeSpan span;
            public TimeLimitedList(TimeSpan sp) { span = sp; }
            public void Add(Timestamped<T> v)
            {
                Values.Add(v);
                // Trim on every add so the list only covers the window.
                Values.RemoveAll(a => a.Timestamp < (DateTime.Now - span));
            }
        }

        public static IObservable<List<Timestamped<TSource>>> SlidingWindow<TSource>(
            this IObservable<Timestamped<TSource>> source, TimeSpan slidingWindow)
        {
            return source
                .Scan0(new TimeLimitedList<TSource>(slidingWindow),
                       (acc, v) => { acc.Add(v); return acc; })
                .Select(a => a.Values);
        }
    }

    static void Main(string[] args)
    {
        // Emit a value every 250 ms and print the contents of a 1-second window.
        var gen = Observable.Interval(TimeSpan.FromSeconds(0.25d)).Timestamp();
        gen.SlidingWindow(TimeSpan.FromSeconds(1)).Subscribe(slw =>
        {
            slw.ForEach(e => Console.WriteLine(e));
            Console.WriteLine("--------");
        });
        Console.ReadLine();
    }
+1

Source: https://habr.com/ru/post/1765079/

