有效循环缓冲区?

我希望在 python 中创建一个高效的 循环缓冲器循环缓冲器(目标是获取缓冲区中整数值的平均值)。

这是使用列表收集值的有效方法吗?

def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )

什么会更有效率(为什么) ?

120896 次浏览

从列表的头部弹出导致整个列表被复制,因此效率低下

相反,您应该使用一个固定大小的列表/数组和一个在添加/删除项时在缓冲区中移动的索引

我会使用 collections.deque与一个 maxlen参数

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
...
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

deque的文档中有一个与您想要的类似的 食谱。我认为它是最有效的,这完全基于这样一个事实: 它是由一个非常熟练的工作人员用 C 语言实现的,他们习惯于编写一流的代码。

可以使用 deque class,但是对于问题(平均值)的要求,这是我的解决方案:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14

你也可以看到这个相当老的 巨蟒配方

下面是我自己的 NumPy 数组版本:

#!/usr/bin/env python


import numpy as np


class RingBuffer(object):
def __init__(self, size_max, default_value=0.0, dtype=float):
"""initialization"""
self.size_max = size_max


self._data = np.empty(size_max, dtype=dtype)
self._data.fill(default_value)


self.size = 0


def append(self, value):
"""append an element"""
self._data = np.roll(self._data, 1)
self._data[0] = value


self.size += 1


if self.size == self.size_max:
self.__class__  = RingBufferFull


def get_all(self):
"""return a list of elements from the oldest to the newest"""
return(self._data)


def get_partial(self):
return(self.get_all()[0:self.size])


def __getitem__(self, key):
"""get element"""
return(self._data[key])


def __repr__(self):
"""return string representation"""
s = self._data.__repr__()
s = s + '\t' + str(self.size)
s = s + '\t' + self.get_all()[::-1].__repr__()
s = s + '\t' + self.get_partial()[::-1].__repr__()
return(s)


class RingBufferFull(RingBuffer):
def append(self, value):
"""append an element when buffer is full"""
self._data = np.roll(self._data, 1)
self._data[0] = value

这个函数不需要任何库,它生成一个列表,然后按索引在其中循环。

内存占用非常小(没有库) ,并且运行速度至少是 dequeue 的两倍。确实,计算移动平均值是一件好事,但是要注意,这些项目并没有像上面那样按年龄排序。

class CircularBuffer(object):
def __init__(self, size):
"""initialization"""
self.index= 0
self.size= size
self._data = []


def record(self, value):
"""append an element"""
if len(self._data) == self.size:
self._data[self.index]= value
else:
self._data.append(value)
self.index= (self.index + 1) % self.size


def __getitem__(self, key):
"""get element by index like a regular array"""
return(self._data[key])


def __repr__(self):
"""return string representation"""
return self._data.__repr__() + ' (' + str(len(self._data))+' items)'


def get_all(self):
"""return a list of all the elements"""
return(self._data)

得到平均值,例如:

q= CircularBuffer(1000000);
for i in range(40000):
q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

结果:

capacity= 1000000
stored= 40000
average= 19999


real 0m0.024s
user 0m0.020s
sys  0m0.000s

这大约是与 dequeue 等价的时间的1/3。

Python 的 deque 比较慢,也可以使用 numpy.roll 代替 如何旋转形状为(n,)或(n,1)的数组中的数字?

在这个基准测试中,deque 为448ms,Numpy.roll 为29ms Http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

最初的问题是: “ 有效率”循环缓冲区。 根据所要求的效率,来自 aaronasterling 的答案似乎是绝对正确的。 使用 Python 中编写的专用类,并将时间处理与 Collections.deque 进行比较,可以看到 x5.2倍加速和 deque! 这里有一些非常简单的代码来测试这一点:

class cb:
def __init__(self, size):
self.b = [0]*size
self.i = 0
self.sz = size
def append(self, v):
self.b[self.i] = v
self.i = (self.i + 1) % self.sz


b = cb(1000)
for i in range(10000):
b.append(i)
# called 200 times, this lasts 1.097 second on my laptop


from collections import deque
b = deque( [], 1000 )
for i in range(10000):
b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

要将 deque 转换为 list,只需使用:

my_list = [v for v in my_deque]

然后您将获得对 deque 项的 O (1)随机访问。当然,只有在设置了 deque 一次之后需要对其进行许多随机访问时,这才有价值。

基于 月仙人掌的答案,这里是一个 circularlist类。与他的版本不同的是,这里 c[0]总是给最老的附加元素,c[-1]给最新的附加元素,c[-2]给倒数第二个... 这对应用程序来说更自然。

c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]

班级:

class circularlist(object):
def __init__(self, size, data = []):
"""Initialization"""
self.index = 0
self.size = size
self._data = list(data)[-size:]


def append(self, value):
"""Append an element"""
if len(self._data) == self.size:
self._data[self.index] = value
else:
self._data.append(value)
self.index = (self.index + 1) % self.size


def __getitem__(self, key):
"""Get element by index, relative to the current index"""
if len(self._data) == self.size:
return(self._data[(key + self.index) % self.size])
else:
return(self._data[key])


def __repr__(self):
"""Return string representation"""
return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)

我在做串行编程之前就遇到过这个问题。就在一年前,我也没有找到任何有效的实现,所以我最终编写了 一个作为 C 的延伸,它也可以在 MIT 许可下使用。它非常基本,只能处理8位有符号字符的缓冲区,但是具有灵活的长度,所以如果需要除了字符之外的其他东西,可以使用 Struct 或者其他东西。我看到现在有一个谷歌搜索,有几个选项这些天虽然,所以你可能想看看这些。

这将同一个主体应用于一些用于保存最新文本消息的缓冲区。

import time
import datetime
import sys, getopt


class textbffr(object):
def __init__(self, size_max):
#initialization
self.posn_max = size_max-1
self._data = [""]*(size_max)
self.posn = self.posn_max


def append(self, value):
#append an element
if self.posn == self.posn_max:
self.posn = 0
self._data[self.posn] = value
else:
self.posn += 1
self._data[self.posn] = value


def __getitem__(self, key):
#return stored element
if (key + self.posn+1) > self.posn_max:
return(self._data[key - (self.posn_max-self.posn)])
else:
return(self._data[key + self.posn+1])




def print_bffr(bffr,bffer_max):
for ind in range(0,bffer_max):
stored = bffr[ind]
if stored != "":
print(stored)
print ( '\n' )


def make_time_text(time_value):
return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
+ str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
+ str(time_value.second).zfill(2))




def main(argv):
#Set things up
starttime = datetime.datetime.now()
log_max = 5
status_max = 7
log_bffr = textbffr(log_max)
status_bffr = textbffr(status_max)
scan_count = 1


#Main Loop
# every 10 secounds write a line with the time and the scan count.
while True:


time_text = make_time_text(datetime.datetime.now())
#create next messages and store in buffers
status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")


#print whole buffers so far
print_bffr(log_bffr,log_max)
print_bffr(status_bffr,status_max)


time.sleep(2)
scan_count += 1


if __name__ == '__main__':
main(sys.argv[1:])

虽然这里已经有很多很好的答案,但我找不到任何直接的时间比较提到的选项。因此,请在下面找到我谦卑的比较尝试。

仅出于测试目的,该类可以在基于 list的缓冲区、基于 collections.deque的缓冲区和基于 Numpy.roll的缓冲区之间切换。

注意,为了保持简单,update方法一次只添加一个值。

import numpy
import timeit
import collections




class CircularBuffer(object):
buffer_methods = ('list', 'deque', 'roll')


def __init__(self, buffer_size, buffer_method):
self.content = None
self.size = buffer_size
self.method = buffer_method


def update(self, scalar):
if self.method == self.buffer_methods[0]:
# Use list
try:
self.content.append(scalar)
self.content.pop(0)
except AttributeError:
self.content = [0.] * self.size
elif self.method == self.buffer_methods[1]:
# Use collections.deque
try:
self.content.append(scalar)
except AttributeError:
self.content = collections.deque([0.] * self.size,
maxlen=self.size)
elif self.method == self.buffer_methods[2]:
# Use Numpy.roll
try:
self.content = numpy.roll(self.content, -1)
self.content[-1] = scalar
except IndexError:
self.content = numpy.zeros(self.size, dtype=float)


# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
buffer_method=method)
for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
# We add a convenient number of convenient values (see equality test below)
code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
i, circular_buffer_size)
# Testing
eval(code)
buffer_content = [item for item in cb.content]
assert buffer_content == range(circular_buffer_size)
# Timing
timeit_results.append(
timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
cb.method, timeit_results[-1],
timeit_results[-1] / timeit_iterations * 1e3)

在我的系统中,这个结果是:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)

Python 烹饪书中的解决方案如何,包括在环缓冲区实例满时对其重新分类?

class RingBuffer:
""" class that implements a not-yet-full buffer """
def __init__(self,size_max):
self.max = size_max
self.data = []


class __Full:
""" class that implements a full buffer """
def append(self, x):
""" Append an element overwriting the oldest one. """
self.data[self.cur] = x
self.cur = (self.cur+1) % self.max
def get(self):
""" return list of elements in correct order """
return self.data[self.cur:]+self.data[:self.cur]


def append(self,x):
"""append an element at the end of the buffer"""
self.data.append(x)
if len(self.data) == self.max:
self.cur = 0
# Permanently change self's class from non-full to full
self.__class__ = self.__Full


def get(self):
""" Return a list of elements from the oldest to the newest. """
return self.data


# sample usage
if __name__=='__main__':
x=RingBuffer(5)
x.append(1); x.append(2); x.append(3); x.append(4)
print(x.__class__, x.get())
x.append(5)
print(x.__class__, x.get())
x.append(6)
print(x.data, x.get())
x.append(7); x.append(8); x.append(9); x.append(10)
print(x.data, x.get())

实现中值得注意的设计选择是,因为 物体在某一点经历不可逆的状态转变 它们的生存期ーー从非完全缓冲区到完全缓冲区(以及行为) 在这一点上的变化)ーー我通过改变 self.__class__来建模。 即使在 Python 2.2中,只要两个类具有相同的 插槽(例如,它可以很好地用于两个经典类,例如 RingBuffer 和 __Full)。

在许多语言中,更改实例的类可能很奇怪, 但它是其他表示方式的 Python 替代品 偶尔的、大规模的、不可逆转的、离散的状态变化 巨大的影响行为,如在这个食谱。好事情,Python 支持所有类。

图片来源: Sébastien Keim

来自 Github:

class CircularBuffer:


def __init__(self, size):
"""Store buffer in given storage."""
self.buffer = [None]*size
self.low = 0
self.high = 0
self.size = size
self.count = 0


def isEmpty(self):
"""Determines if buffer is empty."""
return self.count == 0


def isFull(self):
"""Determines if buffer is full."""
return self.count == self.size


def __len__(self):
"""Returns number of elements in buffer."""
return self.count


def add(self, value):
"""Adds value to buffer, overwrite as needed."""
if self.isFull():
self.low = (self.low+1) % self.size
else:
self.count += 1
self.buffer[self.high] = value
self.high = (self.high + 1) % self.size


def remove(self):
"""Removes oldest value from non-empty buffer."""
if self.count == 0:
raise Exception ("Circular Buffer is empty");
value = self.buffer[self.low]
self.low = (self.low + 1) % self.size
self.count -= 1
return value


def __iter__(self):
"""Return elements in the circular buffer in order using iterator."""
idx = self.low
num = self.count
while num > 0:
yield self.buffer[idx]
idx = (idx + 1) % self.size
num -= 1


def __repr__(self):
"""String representation of circular buffer."""
if self.isEmpty():
return 'cb:[]'


return 'cb:[' + ','.join(map(str,self)) + ']'

Https://github.com/heineman/python-data-structures/blob/master/2.%20ubiquitous%20lists/circbuffer.py

我不知道答案。显然,如果您在 NumPy 中工作,您希望子类化 array 或 ndarray (通常) ,这样(至少在循环数组满时)您仍然可以对循环数组使用 NumPy 数组算术操作。唯一需要注意的是,对于跨越多个组件(如移动平均值)的操作,您的窗口不能大于缓冲区中的累积值。

另外,正如所有评论者提到的,不要使用滚动,因为那会违背效率的目的。如果需要一个不断增长的数组,那么只需在每次需要调整大小时将其大小增加一倍(这与周期性数组实现不同)。

在(咳咳) RING BUFFER 的精神/效率中,有一个库你们都没有安装 pip: “ Pyring”。另外,我不明白为什么你们总是低效率地将环缓冲区标记为循环缓冲区阵列或者你们一直在称呼它。这就像把卫生棉条叫做女性卫生巾一样。简直是荒谬的滑稽剧。.