Statistical Accumulator in Python

Statistical accumulators allow incremental calculations. For example, to calculate the arithmetic mean of a stream of numbers that arrive at arbitrary times, you can create an object that keeps track of the number of items seen so far, n, and their sum, sum. When the mean is requested, the object simply returns sum/n.

An accumulator like this lets you calculate incrementally, in the sense that when a new number arrives you do not need to recompute the entire sum and count.

Similar accumulators can be written for other statistics (see the Boost library for a C++ implementation).

How would you implement accumulators in Python? The code I came up with:

class Accumulator(object):
    """Accumulate the arithmetic mean of a stream of numbers.

    Only the running item count and the running sum are stored, so adding
    an item never requires revisiting earlier ones. This implementation
    does not allow removing items that were already accumulated.
    """

    def __init__(self):
        # Nothing has been accumulated yet: zero items, zero total.
        self._n = 0
        self._sum = 0.0

    def add(self, item):
        """Add ``item`` to the accumulator if it can be converted to a float.

        Items that ``float()`` rejects are silently ignored: unparsable
        strings such as 'NA' (ValueError) as well as unsupported types such
        as None (TypeError).
        """
        try:
            # Guard only the conversion so unrelated errors are not hidden.
            value = float(item)
        except (TypeError, ValueError):
            return
        self._sum += value
        self._n += 1

    @property
    def mean(self):
        """Return the mean of the accumulated items, or None if there are none."""
        if self._n > 0:
            return self._sum / self._n
        # No items accumulated yet (raising an exception would be an
        # alternative design).
        return None

# using the object:
# (print is written in call form so the example runs unchanged on both
# Python 2 and Python 3)

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print(my_accumulator.mean)
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print(my_accumulator.mean)
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print(my_accumulator.mean)
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print(my_accumulator.mean)
# prints 1.5 (notice that it ignored the 'NA')

Interesting design issues arise:

  • How to make the accumulator thread-safe?
  • How to safely remove items?
  • How to architect it so that other statistics can be plugged in (a factory for statistics)?
+3
source share
2 answers

For a more generic, thread-safe, higher-level function, you can use something like the following in combination with a Queue.Queue and some other bits:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    Each value taken from `q` is appended to `storage`, marked as done on
    the queue, and then `f(storage)` is yielded. The generator finishes
    once `q.get` has waited 0.1 seconds without receiving a value.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(vals) // len(vals)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    while True:
        try:
            # Only the queue read is guarded: an Empty raised by `f` must
            # propagate instead of being mistaken for the end of the stream.
            item = q.get(timeout=0.1)
        except Empty:
            return
        storage.append(item)
        q.task_done()
        yield f(storage)

This generator function evades most of its assumed responsibility, delegating it to other objects:

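  • It relies on a Queue.Queue as the source of the values to accumulate.
  • A collections.deque can be passed as storage; among other things, this makes it easy to keep only the last n values (3 in the example).
  • The function itself (mean in the example) is passed in as a parameter. This is not always the most efficient approach, but it applies to many situations.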

, , 0,1 . , - . ; , ( daemon). , , q.get Accumulator.

, .. , ( putting_thread), - . ; Queue.Queue, CloseableQueue, close.

, , ; .

, - , , , .

send. , ,

def meangen():
    """Generator that reports the running mean of the values sent to it.

    Prime it with ``send(None)`` first; every later ``send(value)`` returns
    the mean of all values sent so far, as a float.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    # The priming send stops here; the first real value becomes the total.
    running_total = yield None
    n_seen = 1
    while True:
        # Hand back the current mean and wait for the next value.
        current_mean = running_total / float(n_seen)
        running_total += yield current_mean
        n_seen += 1

yield - send - , send.

, , , :

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    Each value taken from `q` is passed to `g.send`, the queue task is
    marked as done, and the result of `g.send` is yielded. The generator
    finishes once `q.get` has waited 0.1 seconds without receiving a value.

    `g` must already be primed (see the `g.send(None)` call below).

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    while True:
        try:
            # Only the queue read is guarded, so an Empty raised by `g`
            # is not mistaken for the end of the stream.
            item = q.get(timeout=0.1)
        except Empty:
            return
        result = g.send(item)
        # Mark the task done BEFORE yielding: after the yield this line
        # would only run when the consumer resumes the generator, and never
        # if the consumer stops early, leaving q.join() blocked.
        q.task_done()
        yield result
+2

Python, , :

  • .
  • @property - .

, , API , , - :

def add(self, num) # add a number
def compute(self) # compute the value of the accumulator

AccumulatorRegistry, , . :

class Accumulators(object):
    """A collection of named accumulators that are all fed the same numbers.

    Accumulator classes are registered once at class level, through
    `register` or the `register_decorator` class decorator. Each
    `Accumulators` instance then creates its own instance of every class
    registered at the time it is constructed.

    A registered class must be constructible without arguments and provide
    `add(num)` and `compute()`.
    """

    # Maps name -> accumulator class; shared by all Accumulators instances.
    _accumulator_library = {}

    def __init__(self):
        # Maps name -> this instance's own accumulator object.
        self.accumulator_library = {}
        for key, value in Accumulators._accumulator_library.items():
            self.accumulator_library[key] = value()

    @staticmethod
    def register(name, accumulator):
        """Register the class `accumulator` under `name`.

        Registering an existing name replaces the previous entry.
        """
        Accumulators._accumulator_library[name] = accumulator

    def add(self, num):
        """Feed `num` to every accumulator held by this instance."""
        for accumulator in self.accumulator_library.values():
            accumulator.add(num)

    def compute(self, name):
        """Return the current value of the accumulator registered as `name`.

        Raises KeyError if this instance holds no accumulator called `name`.
        """
        return self.accumulator_library[name].compute()

    @staticmethod
    def register_decorator(name):
        """Return a class decorator that registers the class under `name`.

        The decorated class is returned unchanged.
        """
        def _inner(cls):
            Accumulators.register(name, cls)
            return cls
        return _inner


@Accumulators.register_decorator("Mean")
class Mean(object):
    """Running arithmetic mean built from an item count and a running total.

    Decorated with ``Accumulators.register_decorator("Mean")``.
    """

    def __init__(self):
        # Start empty: nothing counted, nothing summed.
        self.total, self.count = 0, 0

    def add(self, num):
        """Count one more item and fold ``num`` into the running total."""
        self.count = self.count + 1
        self.total += num

    def compute(self):
        """Return the total divided by the item count, using float division.

        Raises ZeroDivisionError while no item has been added.
        """
        n_items = float(self.count)
        return self.total / n_items

, , , . Python GIL . , , :

  • , threading.local
  • , , , .
+1

Source: https://habr.com/ru/post/1766106/


All Articles