滚动或滑动窗口迭代器?

我需要一个滚动窗口(又名滑动窗口)可迭代的序列/迭代器/生成器。(默认的Python迭代可以被认为是一种特殊情况,其中窗口长度为1。)我目前正在使用以下代码。我怎样才能做得更优雅和/或更有效?

def rolling_window(seq, window_size):
it = iter(seq)
win = [it.next() for cnt in xrange(window_size)] # First window
yield win
for e in it: # Subsequent windows
win[:-1] = win[1:]
win[-1] = e
yield win


if __name__=="__main__":
for w in rolling_window(xrange(6), 3):
print w


"""Example output:
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
"""

对于window_size == 2特定的情况(即,在序列中迭代相邻的重叠对),请参见在Python中将列表迭代为pair (current, next)。

161670 次浏览

这似乎是为collections.deque量身定制的,因为你基本上有一个FIFO(添加到一端,从另一端移除)。然而,即使你使用list,你也不应该切片两次;相反,你应该只从列表中pop(0)append()新项。

下面是一个基于deque的优化实现:

from collections import deque


def window(seq, n=2):
it = iter(seq)
win = deque((next(it, None) for _ in xrange(n)), maxlen=n)
yield win
append = win.append
for e in it:
append(e)
yield win

在我的测试中,它在大多数时候都轻松击败了这里发布的其他所有东西,尽管pillmuncher的tee版本在大可迭代对象和小窗口方面击败了它。在较大的窗口上,deque再次以原始速度向前拉。

访问deque中的单个项可能比访问列表或元组更快或更慢。(接近开头的项目更快,如果你使用负号,接近结尾的项目更快。)我在循环体中放入sum(w);这发挥了deque的力量(从一个物品迭代到下一个物品非常快,所以这个循环比第二快的方法(pillmuncher的方法)快了整整20%)。当我将其更改为单独查找并在十个窗口中添加项目时,情况发生了变化,tee方法快了20%。我可以通过对加法中的最后五项使用负索引来恢复一些速度,但tee仍然稍微快一些。总的来说,我估计对于大多数用途来说,任何一种都足够快,如果您需要更多的性能,请配置并选择最好的一种。

在旧版本的Python文档中有一个itertools例子:

from itertools import islice


def window(seq, n=2):
"Returns a sliding window (of width n) over data from the iterable"
"   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
it = iter(seq)
result = tuple(islice(it, n))
if len(result) == n:
yield result
for elem in it:
result = result[1:] + (elem,)
yield result

文档中的那个更简洁一点,我想它使用了itertools来达到更好的效果。


如果你的迭代器是一个简单的list/tuple用指定的窗口大小滑动它的简单方法是:

seq = [0, 1, 2, 3, 4, 5]
window_size = 3


for i in range(len(seq) - window_size + 1):
print(seq[i: i + window_size])

输出:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

我喜欢tee():

from itertools import tee, izip


def window(iterable, size):
iters = tee(iterable, size)
for i in xrange(1, size):
for each in iters[i:]:
next(each, None)
return izip(*iters)


for each in window(xrange(6), 3):
print list(each)

给:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

我使用下面的代码作为一个简单的滑动窗口,它使用生成器来大幅提高可读性。根据我的经验,到目前为止,它的速度足以用于生物信息学序列分析。

我把它包括在这里是因为我还没有看到这种方法被使用过。同样,我对它的比较性能没有任何评价。

def slidingWindow(sequence,winSize,step=1):
"""Returns a generator that will iterate through
the defined chunks of input sequence. Input sequence
must be sliceable."""


# Verify the inputs
if not ((type(winSize) == type(0)) and (type(step) == type(0))):
raise Exception("**ERROR** type(winSize) and type(step) must be int.")
if step > winSize:
raise Exception("**ERROR** step must not be larger than winSize.")
if winSize > len(sequence):
raise Exception("**ERROR** winSize must not be larger than sequence length.")


# Pre-compute number of chunks to emit
numOfChunks = ((len(sequence)-winSize)/step)+1


# Do the work
for i in range(0,numOfChunks*step,step):
yield sequence[i:i+winSize]

只是一个简短的贡献。

由于当前的python文档在itertool示例中没有“window”(即,在http://docs.python.org/library/itertools.html的底部),这里有一个基于 石斑鱼的代码,它是给出的例子之一:

import itertools as it
def window(iterable, size):
shiftedStarts = [it.islice(iterable, s, None) for s in xrange(size)]
return it.izip(*shiftedStarts)

基本上,我们创建了一系列切片迭代器,每个迭代器的起点都在前面一个位置。然后,我们把它们拉在一起。注意,这个函数返回一个生成器(它本身不是直接的生成器)。

就像上面的appendingelement和advingiterator版本一样,性能(即,哪个是最好的)随列表大小和窗口大小而变化。我喜欢这个,因为它是一个两行代码(它也可以是一行代码,但我更喜欢命名概念)。

上面的代码是错误的。如果传递给可迭代的的参数是一个序列,但如果它是一个迭代器,则无效。如果它是一个迭代器,那么在islice调用之间共享相同的迭代器(但不是tee - d),这将严重破坏事情。

下面是一些固定的代码:

import itertools as it
def window(iterable, size):
itrs = it.tee(iterable, size)
shiftedStarts = [it.islice(anItr, s, None) for s, anItr in enumerate(itrs)]
return it.izip(*shiftedStarts)

另外,书里还有一个版本。这个版本不是复制一个迭代器,然后多次向前复制,而是在开始位置向前移动时成对复制每个迭代器。因此,迭代器t既提供了起点为t的“完整”迭代器,也提供了创建迭代器t + 1的基础:

import itertools as it
def window4(iterable, size):
complete_itr, incomplete_itr = it.tee(iterable, 2)
iters = [complete_itr]
for i in xrange(1, size):
incomplete_itr.next()
complete_itr, incomplete_itr = it.tee(incomplete_itr, 2)
iters.append(complete_itr)
return it.izip(*iters)

下面是一个泛化,它增加了对stepfillvalue参数的支持:

from collections import deque
from itertools import islice


def sliding_window(iterable, size=2, step=1, fillvalue=None):
if size < 0 or step < 1:
raise ValueError
it = iter(iterable)
q = deque(islice(it, size), maxlen=size)
if not q:
return  # empty iterable or size == 0
q.extend(fillvalue for _ in range(size - len(q)))  # pad to size
while True:
yield iter(q)  # iter() to avoid accidental outside modifications
try:
q.append(next(it))
except StopIteration: # Python 3.5 pep 479 support
return
q.extend(next(it, fillvalue) for _ in range(step - 1))

它每次生成size个块,每次迭代滚动step个位置,必要时用fillvalue填充每个块。size=4, step=3, fillvalue='*'示例:

 [a b c d]e f g h i j k l m n o p q r s t u v w x y z
a b c[d e f g]h i j k l m n o p q r s t u v w x y z
a b c d e f[g h i j]k l m n o p q r s t u v w x y z
a b c d e f g h i[j k l m]n o p q r s t u v w x y z
a b c d e f g h i j k l[m n o p]q r s t u v w x y z
a b c d e f g h i j k l m n o[p q r s]t u v w x y z
a b c d e f g h i j k l m n o p q r[s t u v]w x y z
a b c d e f g h i j k l m n o p q r s t u[v w x y]z
a b c d e f g h i j k l m n o p q r s t u v w x[y z * *]

有关step参数的用例示例,请参见用python有效地处理一个大的。txt文件

deque窗口的一个轻微修改版本,使其成为一个真正的滚动窗口。因此,它开始只填充一个元素,然后增长到它的最大窗口大小,然后缩小,因为它的左边缘接近结束:

from collections import deque
def window(seq, n=2):
it = iter(seq)
win = deque((next(it, None) for _ in xrange(1)), maxlen=n)
yield win
append = win.append
for e in it:
append(e)
yield win
for _ in xrange(len(win)-1):
win.popleft()
yield win


for wnd in window(range(5), n=3):
print(list(wnd))

这给了

[0]
[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]
>>> n, m = 6, 3
>>> k = n - m+1
>>> print ('{}\n'*(k)).format(*[range(i, i+m) for i in xrange(k)])
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

多个迭代器!

def window(seq, size, step=1):
# initialize iterators
iters = [iter(seq) for i in range(size)]
# stagger iterators (without yielding)
[next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)]
while(True):
yield [next(i) for i in iters]
# next line does nothing for step = 1 (skips iterations for step > 1)
[next(i) for i in iters for j in range(step-1)]

next(it)在序列结束时引发StopIteration,出于某种我无法理解的很酷的原因,yield语句在这里将它排除在外,函数返回,忽略未形成完整窗口的剩余值。

不管怎样,这是目前为止最少行数的解决方案,它唯一的要求是seq实现__iter____getitem__,并且不依赖于itertoolscollections,除了@dansalmo的解决方案:)

def rolling_window(list, degree):
for i in range(len(list)-degree+1):
yield [list[i+o] for o in range(degree)]

这是一个滚动平均函数

为什么不

def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
next(b, None)
return zip(a, b)

在Python 医生中有文档。 您可以轻松地将其扩展到更宽的窗口。< / p >

如何使用以下方法:

mylist = [1, 2, 3, 4, 5, 6, 7]


def sliding_window(l, window_size=2):
if window_size > len(l):
raise ValueError("Window size must be smaller or equal to the number of elements in the list.")


t = []
for i in xrange(0, window_size):
t.append(l[i:])


return zip(*t)


print sliding_window(mylist, 3)

输出:

[(1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7)]
def GetShiftingWindows(thelist, size):
return [ thelist[x:x+size] for x in range( len(thelist) - size + 1 ) ]


>> a = [1, 2, 3, 4, 5]
>> GetShiftingWindows(a, 3)
[ [1, 2, 3], [2, 3, 4], [3, 4, 5] ]

为了演示如何组合itertools食谱,我使用consume recipe将pairwise recipe尽可能直接地扩展回window recipe:

def consume(iterator, n):
"Advance the iterator n-steps ahead. If n is none, consume entirely."
# Use functions that consume iterators at C speed.
if n is None:
# feed the entire iterator into a zero-length deque
collections.deque(iterator, maxlen=0)
else:
# advance to the empty slice starting at position n
next(islice(iterator, n, n), None)


def window(iterable, n=2):
"s -> (s0, ...,s(n-1)), (s1, ...,sn), (s2, ..., s(n+1)), ..."
iters = tee(iterable, n)
# Could use enumerate(islice(iters, 1, None), 1) to avoid consume(it, 0), but that's
# slower for larger window sizes, while saving only small fixed "noop" cost
for i, it in enumerate(iters):
consume(it, i)
return zip(*iters)

window配方与pairwise相同,它只是将第二个tee-ed迭代器上的单个元素“consume”替换为n - 1迭代器上逐渐增加的consume。使用consume而不是将每个迭代器包装在islice中略微更快(对于足够大的可迭代对象),因为你只在consume阶段支付islice包装开销,而不是在提取每个窗口值的过程中(因此它受到n的限制,而不是iterable中的项数)。

在性能方面,与其他一些解决方案相比,这是相当不错的(并且比我测试的任何其他解决方案都要好)。在Python 3.5.0, Linux x86-64上测试,使用ipython %timeit魔术。

kindall是deque解决方案,通过使用islice而不是自制生成器表达式来调整性能/正确性,并测试产生的长度,以便当可迭代对象比窗口短时不会产生结果,以及根据位置而不是通过关键字传递dequemaxlen(对于较小的输入会产生惊人的差异):

>>> %timeit -r5 deque(windowkindall(range(10), 3), 0)
100000 loops, best of 5: 1.87 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 3), 0)
10000 loops, best of 5: 72.6 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 30), 0)
1000 loops, best of 5: 71.6 μs per loop

与之前的改编类解决方案相同,但每个yield win都更改为yield tuple(win),因此存储生成器的结果可以工作,而所有存储的结果实际上都是最新结果的视图(在这种情况下,所有其他合理的解决方案都是安全的),并将tuple=tuple添加到函数定义中,将tuple的使用从LEGB中的B移动到L:

>>> %timeit -r5 deque(windowkindalltupled(range(10), 3), 0)
100000 loops, best of 5: 3.05 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 3), 0)
10000 loops, best of 5: 207 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 30), 0)
1000 loops, best of 5: 348 μs per loop

上面所示的基于abc0的解决方案:

>>> %timeit -r5 deque(windowconsume(range(10), 3), 0)
100000 loops, best of 5: 3.92 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 3), 0)
10000 loops, best of 5: 42.8 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 30), 0)
1000 loops, best of 5: 232 μs per loop

consume相同,但内联consumeelse大小写以避免函数调用和n is None测试以减少运行时,特别是对于设置开销是工作中有意义的一部分的小输入:

>>> %timeit -r5 deque(windowinlineconsume(range(10), 3), 0)
100000 loops, best of 5: 3.57 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 3), 0)
10000 loops, best of 5: 40.9 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 30), 0)
1000 loops, best of 5: 211 μs per loop

(附注:pairwise的一个变体,反复使用带有默认参数2的tee来创建嵌套的tee对象,因此任何给定的迭代器只前进一次,而不是独立地消耗越来越多的次数,类似于MrDrFenner的回答,类似于非内联的consume,并且在所有测试中比内联的consume慢,所以为了简洁起见,我省略了这些结果)。

如你所见,如果你不关心调用者需要存储结果的可能性,我的优化版本的kindall's解决方案在大多数情况下胜出,除了在“大可迭代,小窗口大小的情况下”(其中内联的consume胜出);它会随着可迭代对象大小的增加而快速退化,而不会随着窗口大小的增加而完全退化(其他解决方案会随着可迭代对象大小的增加而退化得更慢,但也会随着窗口大小的增加而退化)。它甚至可以通过包装map(tuple, ...)来适应“需要元组”的情况,它比将元组放入函数中运行得稍微慢一些,但这是微不足道的(花费1-5%的时间),并让您在可以容忍重复返回相同值时保持运行更快的灵活性。

如果你需要防止返回值被存储,内联的consume可以在最小的输入大小之外的所有输入上胜出(非内联的consume略慢,但伸缩相似)。deque &基于元组的解决方案只对最小的输入获胜,因为更小的设置成本,收益也很小;当可迭代对象变长时,它会严重退化。

为了记录,我使用的kindall的解决方案的改编版本yields tuples是:

def windowkindalltupled(iterable, n=2, tuple=tuple):
it = iter(iterable)
win = deque(islice(it, n), n)
if len(win) < n:
return
append = win.append
yield tuple(win)
for e in it:
append(e)
yield tuple(win)

在函数定义行中删除tuple的缓存,并在每个yield中使用tuple,以获得更快但不太安全的版本。

这是一个老问题,但对于那些仍然感兴趣的人来说,在 page中有一个使用生成器的窗口滑动器的伟大实现(by Adrian Rosebrock)。

它是OpenCV的一个实现,但是你可以很容易地将它用于任何其他目的。对于渴望的人,我将粘贴代码在这里,但为了更好地理解它,我建议访问原始页面。

def sliding_window(image, stepSize, windowSize):
# slide a window across the image
for y in xrange(0, image.shape[0], stepSize):
for x in xrange(0, image.shape[1], stepSize):
# yield the current window
yield (x, y, image[y:y + windowSize[1], x:x + windowSize[0]])

你可以在迭代生成器时检查窗口的.shape,以丢弃那些不符合你的要求的

干杯

有一个库可以完全满足你的需要:

import more_itertools
list(more_itertools.windowed([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],n=3, step=3))


Out: [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12), (13, 14, 15)]

修改了DiPaolo的回答以允许任意填充和可变步长

import itertools
def window(seq, n=2,step=1,fill=None,keep=0):
"Returns a sliding window (of width n) over data from the iterable"
"   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
it = iter(seq)
result = tuple(itertools.islice(it, n))
if len(result) == n:
yield result
while True:
#         for elem in it:
elem = tuple( next(it, fill) for _ in range(step))
result = result[step:] + elem
if elem[-1] is fill:
if keep:
yield result
break
yield result
#Importing the numpy library
import numpy as np
arr = np.arange(6) #Sequence
window_size = 3
np.lib.stride_tricks.as_strided(arr, shape= (len(arr) - window_size +1, window_size),
strides = arr.strides*2)


"""Example output:


[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

”“”

让我们让它变懒!

from itertools import islice, tee


def window(iterable, size):
iterators = tee(iterable, size)
iterators = [islice(iterator, i, None) for i, iterator in enumerate(iterators)]
yield from zip(*iterators)


list(window(range(5), 3))
# [(0, 1, 2), (1, 2, 3), (2, 3, 4)]

这里有一行。我对它进行了计时,它与顶部答案的性能相当,并且随着更大的seq逐渐变得更好,len(seq) = 20时慢20%,len(seq) = 10000时慢7%

zip(*[seq[i:(len(seq) - n - 1 + i)] for i in range(n)])

尝试我的部分,简单,一行,使用islice的python方式。但是,可能不是最佳效率。

from itertools import islice
array = range(0, 10)
window_size = 4
map(lambda i: list(islice(array, i, i + window_size)), range(0, len(array) - window_size + 1))
# output = [[0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4, 5, 6, 7], [5, 6, 7, 8], [6, 7, 8, 9]]
< p >解释: 使用islice of window_size创建窗口,并在所有数组上使用map迭代此操作

除了我想到的解决方案,我还测试了一些解决方案。我发现我想出的是最快的,所以我想分享我的python3实现。

import itertools
import sys


def windowed(l, stride):
return zip(*[itertools.islice(l, i, sys.maxsize) for i in range(stride)])

深度学习中滑动窗口数据的优化函数

def SlidingWindow(X, window_length, stride):
indexer = np.arange(window_length)[None, :] + stride*np.arange(int(len(X)/stride)-window_length+4)[:, None]
return X.take(indexer)

应用于多维数组

import numpy as np
def SlidingWindow(X, window_length, stride1):
stride=  X.shape[1]*stride1
window_length = window_length*X.shape[1]
indexer = np.arange(window_length)[None, :] + stride1*np.arange(int(len(X)/stride1)-window_length-1)[:, None]
return X.take(indexer)

我的两个版本的window实现

from typing import Sized, Iterable


def window(seq: Sized, n: int, strid: int = 1, drop_last: bool = False):
for i in range(0, len(seq), strid):
res = seq[i:i + n]
if drop_last and len(res) < n:
break
yield res




def window2(seq: Iterable, n: int, strid: int = 1, drop_last: bool = False):
it = iter(seq)
result = []
step = 0
for i, ele in enumerate(it):
result.append(ele)
result = result[-n:]
if len(result) == n:
if step % strid == 0:
yield result
step += 1
if not drop_last:
yield result


另一种从列表生成固定长度窗口的简单方法

from collections import deque


def window(ls,window_size=3):
window = deque(maxlen=window_size)


for element in ls:
        

if len(window)==window_size:
yield list(window)
window.append(element)


ls = [0,1,2,3,4,5]


for w in window(ls):
print(w)

我最终使用的解决方案(保持简单):

def sliding_window(items, size):
return [items[start:end] for start, end
in zip(range(0, len(items) - size + 1), range(size, len(items) + 1))]

不用说,items序列需要是可切片的。使用索引并不理想,但考虑到其他选项,这似乎是最不坏的选择……这也可以很容易地更改为生成器:只需将[...]替换为(...)

toolz/cytoolz包有一个滑动窗口函数。

>>> from cytoolz import sliding_window
>>> list(sliding_window(3, range(6))) # returns [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]

在Python 3.10中,我们有itertools.pairwise(iterable)函数来滑动包含两个元素的窗口:

下面是医生:

返回从输入可迭代对象中获取的连续重叠对。

输出迭代器中2元组的数量将比输入的数量少1。如果输入可迭代对象的值少于两个,则返回空值。

大致相当于:

def pairwise(iterable):
# pairwise('ABCDEFG') --> AB BC CD DE EF FG
a, b = tee(iterable)
next(b, None)
return zip(a, b)

我很惊讶没有一个单一的解决方案使用列表理解。

奇怪的是,当n > len(l)返回语义正确的[]时,下面的句柄自动返回。

>>> l = [0, 1, 2, 3, 4]


>>> n = 2
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1], [1, 2], [2, 3], [3, 4]]
>>>
>>> n = 3
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>>
>>> n = 4
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2, 3], [1, 2, 3, 4]]
>>>
>>> n = 5
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2, 3, 4]]
>>>
>>> n = 10 # n > len(l)
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> []