在 Python 中按块(n)迭代一个迭代器?

你能想到一个很好的方法(也许使用 itertools)将迭代器分割成给定大小的块吗?

因此,带有 chunks(l,3)l=[1,2,3,4,5,6,7]成为迭代器 [1,2,3], [4,5,6], [7]

我可以想到一个小程序来做到这一点,但不是一个很好的方式,也许迭代工具。

86316 次浏览

itertools文档的 食谱中的 grouper()配方与您想要的非常接近:

def grouper(n, iterable, fillvalue=None):
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)

但是,它将用一个 fill 值填充最后一个块。

一种不那么通用的解决方案,它只对序列起作用,但按照需要处理最后一个块

[my_list[i:i + chunk_size] for i in range(0, len(my_list), chunk_size)]

最后,一个在一般迭代器上工作的解决方案是

def grouper(n, iterable):
it = iter(iterable)
while True:
chunk = tuple(itertools.islice(it, n))
if not chunk:
return
yield chunk

“简单胜过复杂”- 一个简单的发电机只需要几行就可以完成这项工作,只要把它放在一些公用事业模块中就可以了:

def grouper (iterable, n):
iterable = iter(iterable)
count = 0
group = []
while True:
try:
group.append(next(iterable))
count += 1
if count % n == 0:
yield group
group = []
except StopIteration:
yield group
break

这里有一个返回惰性块的函数,如果需要列表,可以使用 map(list, chunks(...))

from itertools import islice, chain
from collections import deque


def chunks(items, n):
items = iter(items)
for first in items:
chunk = chain((first,), islice(items, n-1))
yield chunk
deque(chunk, 0)


if __name__ == "__main__":
for chunk in map(list, chunks(range(10), 3)):
print chunk


for i, chunk in enumerate(chunks(range(10), 3)):
if i % 2 == 1:
print "chunk #%d: %s" % (i, list(chunk))
else:
print "skipping #%d" % i

我忘了我是从哪里找到这个灵感的。我在 Windows 注册表中对它进行了一些修改,以便与 MSI GUID 一起工作:

def nslice(s, n, truncate=False, reverse=False):
"""Splits s into n-sized chunks, optionally reversing the chunks."""
assert n > 0
while len(s) >= n:
if reverse: yield s[:n][::-1]
else: yield s[:n]
s = s[n:]
if len(s) and not truncate:
yield s

reverse不适用于您的问题,但是我在这个函数中广泛使用它。

>>> [i for i in nslice([1,2,3,4,5,6,7], 3)]
[[1, 2, 3], [4, 5, 6], [7]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True)]
[[1, 2, 3], [4, 5, 6]]
>>> [i for i in nslice([1,2,3,4,5,6,7], 3, truncate=True, reverse=True)]
[[3, 2, 1], [6, 5, 4]]

给你。

def chunksiter(l, chunks):
i,j,n = 0,0,0
rl = []
while n < len(l)/chunks:
rl.append(l[i:j+chunks])
i+=chunks
j+=j+chunks
n+=1
return iter(rl)




def chunksiter2(l, chunks):
i,j,n = 0,0,0
while n < len(l)/chunks:
yield l[i:j+chunks]
i+=chunks
j+=j+chunks
n+=1

例子:

for l in chunksiter([1,2,3,4,5,6,7,8],3):
print(l)


[1, 2, 3]
[4, 5, 6]
[7, 8]


for l in chunksiter2([1,2,3,4,5,6,7,8],3):
print(l)


[1, 2, 3]
[4, 5, 6]
[7, 8]




for l in chunksiter2([1,2,3,4,5,6,7,8],5):
print(l)


[1, 2, 3, 4, 5]
[6, 7, 8]

一个简洁的实施方案是:

chunker = lambda iterable, n: (ifilterfalse(lambda x: x == (), chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=())))

这样做的原因是,[iter(iterable)]*n是一个包含相同迭代器 n 次的列表; 通过压缩从列表中的每个迭代器 是同一个迭代器获取一个项,结果是每个 zip 元素包含一组 n项。

需要使用 izip_longest来完全使用底层迭代器,而不是在到达第一个用尽的迭代器时停止迭代,这会从 iterable中切除任何余数。这导致需要过滤掉填充值。因此,一个略为强有力的实施办法是:

def chunker(iterable, n):
class Filler(object): pass
return (ifilterfalse(lambda x: x is Filler, chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=Filler)))

这保证了填充值永远不会是底层迭代器中的一个项目。使用上面的定义:

iterable = range(1,11)


map(tuple,chunker(iterable, 3))
[(1, 2, 3), (4, 5, 6), (7, 8, 9), (10,)]


map(tuple,chunker(iterable, 2))
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]


map(tuple,chunker(iterable, 4))
[(1, 2, 3, 4), (5, 6, 7, 8), (9, 10)]

这个实现几乎可以完成您想要的任务,但是它存在一些问题:

def chunks(it, step):
start = 0
while True:
end = start+step
yield islice(it, start, end)
start = end

(区别在于,因为 islice不会引发 StopIteration 或其他任何超过 it末尾的调用,这将永远产生; 还有一个稍微棘手的问题,即在迭代这个生成器之前必须消耗 islice结果)。

生成功能性的移动窗口:

izip(count(0, step), count(step, step))

这就变成了:

(it[start:end] for (start,end) in izip(count(0, step), count(step, step)))

但是,这仍然创建了一个无限的迭代器。所以,你需要花点时间(或者其他更好的东西)来限制它:

chunk = lambda it, step: takewhile((lambda x: len(x) > 0), (it[start:end] for (start,end) in izip(count(0, step), count(step, step))))


g = chunk(range(1,11), 3)


tuple(g)
([1, 2, 3], [4, 5, 6], [7, 8, 9], [10])

尽管 OP 要求函数以列表或元组的形式返回块,但如果需要返回迭代器,那么可以修改 斯文 · 马尔纳的解决方案:

def grouper_it(n, iterable):
it = iter(iterable)
while True:
chunk_it = itertools.islice(it, n)
try:
first_el = next(chunk_it)
except StopIteration:
return
yield itertools.chain((first_el,), chunk_it)

一些基准: http://pastebin.com/YkKFvm8b

只有当函数遍历每个块中的元素时,效率才会稍微高一些。

我今天在做一件事,想出了一个我认为是简单的解决方案。它类似于 Jsbueno’s的答案,但是我相信当 iterable的长度被 n整除时,他会得到空的 group。当 iterable耗尽时,我的答案会做一个简单的检查。

def chunk(iterable, chunk_size):
"""Generates lists of `chunk_size` elements from `iterable`.
    

    

>>> list(chunk((2, 3, 5, 7), 3))
[[2, 3, 5], [7]]
>>> list(chunk((2, 3, 5, 7), 2))
[[2, 3], [5, 7]]
"""
iterable = iter(iterable)
while True:
chunk = []
try:
for _ in range(chunk_size):
chunk.append(next(iterable))
yield chunk
except StopIteration:
if chunk:
yield chunk
break

这将适用于任何迭代。它返回发电机的生成器(为了充分的灵活性)。我现在意识到它基本上和@reclosedevs 解决方案一样,只是没有那些花里胡哨的东西。当 StopIteration向上传播时,不需要 try...except,这正是我们想要的。

当迭代器为空时,需要使用 next(iterable)调用来引发 StopIteration,因为如果您允许,islice将永远生成空生成器。

这样更好,因为它只有两行长,而且容易理解。

def grouper(iterable, n):
while True:
yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))

请注意,next(iterable)被放入一个元组中。否则,如果 next(iterable)本身是可迭代的,那么 itertools.chain将使它变平。感谢杰里米 · 布朗指出了这个问题。

自从 python 3.8以来,有一个使用 :=运算符的更简单的解决方案:

def grouper(iterator: Iterator, n: int) -> Iterator[list]:
while chunk := list(itertools.islice(iterator, n)):
yield chunk

然后这么说:

>>> list(grouper(iter('ABCDEFG'), 3))
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]

注意: 可以将 iter放在 grouper函数中,以取 Iterable代替 Iterator

高尔夫密码版:

def grouper(iterable, n):
for i in range(0, len(iterable), n):
yield iterable[i:i+n]

用法:

>>> list(grouper('ABCDEFG', 3))
['ABC', 'DEF', 'G']

这个函数接受不需要是 Sized的迭代器,因此它也接受迭代器。它支持无限可迭代,如果选择的块大小小于1,它将出错(即使给出 size = = 1实际上是无用的)。

类型注释当然是可选的,如果您愿意,可以删除参数中的 /(这使得 iterable只是位置的)。

T = TypeVar("T")




def chunk(iterable: Iterable[T], /, size: int) -> Generator[list[T], None, None]:
"""Yield chunks of a given size from an iterable."""
if size < 1:
raise ValueError("Cannot make chunks smaller than 1 item.")


def chunker():
current_chunk = []
for item in iterable:
current_chunk.append(item)


if len(current_chunk) == size:
yield current_chunk


current_chunk = []


if current_chunk:
yield current_chunk


# Chunker generator is returned instead of yielding directly so that the size check
#  can raise immediately instead of waiting for the first next() call.
return chunker()