Python中的统计累加器 [英] Statistical accumulator in Python

查看:172
本文介绍了Python中的统计累加器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

统计累加器允许执行累加计算.例如,为了计算任意次数给定的数字流的算术平均值,可以使一个对象跟踪当前给定的项目数n及其和sum.当请求平均值时,对象仅返回sum/n.

像这样的累加器使您可以递增计算,即在给定新数字时,您无需重新计算整个总和和计数.

可以为其他统计信息编写类似的累加器(请参见增强库(用于C ++实现).

您将如何在Python中实现累加器? 我想出的代码是:

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow to remove items
    already accumulated, but it could easily be modified to do
    so. also, other statistics could be accumulated.
    """
    def __init__(self):
     # upon initialization, the numnber of items currently
     # accumulated (_n) and the total sum of the items acumulated
     # (_sum) are set to zero because nothing has been accumulated
     # yet.
     self._n = 0
     self._sum = 0.0

    def add(self, item):
     # the 'add' is used to add an item to this accumulator
     try:
        # try to convert the item to a float. If you are
        # successful, add the float to the current sum and
        # increase the number of accumulated items
        self._sum += float(item)
        self._n += 1
     except ValueError:
        # if you fail to convert the item to a float, simply
        # ignore the exception (pass on it and do nothing)
        pass

    @property
    def mean(self):
     # the property 'mean' returns the current mean accumulated in
     # the object
     if self._n > 0:
        # if you have more than zero items accumulated, then return
        # their artithmetic average
        return self._sum / self._n
     else:
        # if you have no items accumulated, return None (you could
        # also raise an exception)
        return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')

出现有趣的设计问题:

  1. 如何制作蓄能器 线程安全的?
  2. 如何安全删除 项目?
  3. 如何以某种方式进行架构设计 允许其他统计 轻松插入(用于统计的工厂)

解决方案

对于通用的线程安全高级功能,可以将以下内容与Queue.Queue类和其他一些位结合使用:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(it) / len(it)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

此生成器功能通过将其委托给其他实体来逃避其声称的大多数责任:

  • 它依靠Queue.Queue以线程安全的方式提供其源元素
  • 可以将collections.deque对象作为storage参数的值传递.除其他外,这提供了一种仅使用最后一个n(在这种情况下为3)值的便捷方法
  • 函数本身(在本例中为mean)作为参数传递.在某些情况下,这将导致代码效率低于最佳状态,但很容易应用于各种情况.

请注意,如果您的生产者线程为每个值花费的时间超过0.1秒,则有可能导致累加器超时.通过传递更长的超时或完全删除timeout参数,可以很容易地解决此问题.在后一种情况下,该函数将在队列末尾无限期地阻塞;在子线程(通常是daemon线程)中使用它的情况下,这种用法更有意义.当然,您也可以参数化作为Accumulator的第四个参数传递给q.get的参数.

如果您要传达队列末尾的信息,即生产者线程(此处为putting_thread)不再有其他值,则可以传递并检查哨兵值或使用其他方法. 此线程中有更多信息;我选择编写一个名为 CloseableQueue 的Queue.Queue子类,该子类提供了close方法.

您可以通过多种其他方式自定义此功能的行为,例如,通过限制队列大小;这只是用法的一个例子.

编辑

如上所述,由于重新计算的必要性,这使效率降低了,而且,我认为,它并不能真正回答您的问题.

生成器函数也可以通过其send方法接受值.因此,您可以编写均值生成器函数,例如

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    sum = yield(None)
    count = 1
    while True:
        sum += yield(sum / float(count))
        count += 1

在这里,yield表达式都将值(send的参数)带入函数中,同时将计算出的值作为send的返回值传递出去.

您可以将对该函数的调用返回的生成器传递给更可优化的累加器生成器函数,如下所示:

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass

An statistical accumulator allows one to perform incremental calculations. For instance, for computing the arithmetic mean of a stream of numbers given at arbitrary times one could make an object which keeps track of the current number of items given, n and their sum, sum. When one requests the mean, the object simply returns sum/n.

An accumulator like this allows you to compute incrementally in the sense that, when given a new number, you don't need to recompute the entire sum and count.

Similar accumulators can be written for other statistics (cf. boost library for a C++ implementation).

How would you implement accumulators in Python? The code I came up with is:

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow to remove items
    already accumulated, but it could easily be modified to do
    so. also, other statistics could be accumulated.
    """
    def __init__(self):
     # upon initialization, the numnber of items currently
     # accumulated (_n) and the total sum of the items acumulated
     # (_sum) are set to zero because nothing has been accumulated
     # yet.
     self._n = 0
     self._sum = 0.0

    def add(self, item):
     # the 'add' is used to add an item to this accumulator
     try:
        # try to convert the item to a float. If you are
        # successful, add the float to the current sum and
        # increase the number of accumulated items
        self._sum += float(item)
        self._n += 1
     except ValueError:
        # if you fail to convert the item to a float, simply
        # ignore the exception (pass on it and do nothing)
        pass

    @property
    def mean(self):
     # the property 'mean' returns the current mean accumulated in
     # the object
     if self._n > 0:
        # if you have more than zero items accumulated, then return
        # their artithmetic average
        return self._sum / self._n
     else:
        # if you have no items accumulated, return None (you could
        # also raise an exception)
        return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')

Interesting design questions arise:

  1. How to make the accumulator thread-safe?
  2. How to safely remove items?
  3. How to architect in a way that allows other statistics to be plugged in easily (a factory for statistics)

解决方案

For a generalized, threadsafe higher-level function, you could use something like the following in combination with the Queue.Queue class and some other bits:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(it) / len(it)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

This generator function evades most of its purported responsibility by delegating it to other entities:

  • It relies on Queue.Queue to supply its source elements in a thread-safe manner
  • A collections.deque object can be passed in as the value of the storage parameter; this provides, among other things, a convenient way to only use the last n (in this case 3) values
  • The function itself (in this case mean) is passed as a parameter. This will result in less-than-optimally efficient code in some cases, but is readily applied to all sorts of situations.

Note that there is a possibility of the accumulator timing out if your producer thread takes longer than 0.1 seconds per value. This is easily remedied by passing a longer timeout or by removing the timeout parameter entirely. In the latter case the function will block indefinitely at the end of the queue; this usage makes more sense in a case where it's being used in a sub thread (usually a daemon thread). Of course you can also parametrize the arguments that are passed to q.get as a fourth argument to Accumulator.

If you want to communicate end of queue, i.e. that there are no more values to come, from the producer thread (here putting_thread), you can pass and check for a sentinel value or use some other method. There is more info in this thread; I opted to write a subclass of Queue.Queue called CloseableQueue that provides a close method.

There are various other ways you could customize the behaviour of such a function, for example by limiting the queue size; this is just an example of usage.

edit

As mentioned above, this loses some efficiency because of the necessity of recalculation and also, I think, doesn't really answer your question.

A generator function can also accept values through its send method. So you can write a mean generator function like

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    sum = yield(None)
    count = 1
    while True:
        sum += yield(sum / float(count))
        count += 1

Here the yield expression is both bringing values —the arguments to send— into the function, while simultaneously passing the calculated values out as the return value of send.

You can pass the generator returned by a call to that function to a more optimizable accumulator generator function like this one:

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass

这篇关于Python中的统计累加器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆