Like an IObservable <double>. Is the job supposed to work?

Update

It seems that John Skeet was right (a big surprise!), And the problem was my assumption of an extension Averageproviding a continuous average (this is not the case).

For my behavior, I wrote a simple extension method ContinuousAverage, the implementation of which I include here in the interests of others who might want something like this:

public static class ObservableExtensions {
    private class ContinuousAverager {
            private double _mean;
            private long _count;

        public ContinuousAverager() {
            _mean = 0.0;
            _count = 0L;
        }

        // undecided whether this method needs to be made thread-safe or not
        // seems that ought to be the responsibility of the IObservable (?)
        public double Add(double value) {
            double delta = value - _mean;
            _mean += (delta / (double)(++_count));
            return _mean;
        }
    }

    public static IObservable<double> ContinousAverage(this IObservable<double> source) {
        var averager = new ContinuousAverager();

        return source.Select(x => averager.Add(x));
    }
}

I'm going to go ahead and do something like above for other obvious candidates, so, ContinuousCount, ContinuousSum, ContinuousMin, ContinuousMax... maybe ContinuousVariance, and ContinuousStandardDeviation, eh? Any thoughts on this?


Original question

I use Rx Extensions a bit here and there, and I feel that I have some basic ideas.

- : , :

var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

var bids = ticks
    .Where(e => e.EventArgs.Quote.HasBid)
    .Select(e => e.EventArgs.Quote.Bid);

var bidsSubscription = bids.Subscribe(
    b => Console.WriteLine("Bid: {0}", b)
);

var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

IObservable<double> (bids avgOfBids); MarketDataProvider, .

- :

Bid    Avg Bid
1      1
2      1.5
1      1.33
2      1.5

, avgOfBids . ? , , , , Average . ( IObservable<T> - , Max, Count ..)

+3
3

/// .. - , . , :

var avgOfBids = bids.Take(5).Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

, 5 , . "" .

+2

ContinousAverage - .Scan():

bids.Scan(new { sum = .0, count = 0 }, 
          (agg, x) => new { sum = agg.sum + x, count = agg.count + 1 })
    .Select(agg => agg.sum / agg.count)
+3

; ( IDisposable.)

.

var ticksToAverage = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

 var bidsToAverage = ticksToAverage
      .Where(e => e.EventArgs.Quote.HasBid)
         .Select(e => e.EventArgs.Quote.Bid);


var avgOfBids = bidsToAverage.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe( b => Console.WriteLine("Avg Bid: {0}", b)
                ); 
0

Source: https://habr.com/ru/post/1744558/


All Articles