如何在块上迭代一个列表

我有一个Python脚本,它把一个整数列表作为输入,我需要一次处理四个整数。不幸的是,我无法控制输入,否则我将它作为一个四元素元组列表传入。目前,我以这种方式迭代它:

for i in range(0, len(ints), 4):
# dummy op for example code
foo += ints[i] * ints[i + 1] + ints[i + 2] * ints[i + 3]

不过,它看起来很像“c -think”,这让我怀疑有一种更python的方式来处理这种情况。该列表在迭代后被丢弃,因此不需要保留。也许这样会更好?

while ints:
foo += ints[0] * ints[1] + ints[2] * ints[3]
ints[0:4] = []

还是不太“感觉”;不过,对吧。: - /

相关问题:在Python中如何将列表分割成大小均匀的块?

220300 次浏览

似乎没有一个漂亮的方法来做到这一点。在这里是一个有很多方法的页面,包括:

def split_seq(seq, size):
newseq = []
splitsize = 1.0/size*len(seq)
for i in range(size):
newseq.append(seq[int(round(i*splitsize)):int(round((i+1)*splitsize))])
return newseq

在你的第二种方法中,我将通过这样做进入下一组4人:

ints = ints[4:]

然而,我还没有做过任何绩效评估,所以我不知道哪种方法更有效。

话虽如此,我通常会选择第一种方法。这并不漂亮,但这通常是与外部世界接触的结果。

import itertools
def chunks(iterable,size):
it = iter(iterable)
chunk = tuple(itertools.islice(it,size))
while chunk:
yield chunk
chunk = tuple(itertools.islice(it,size))


# though this will throw ValueError if the length of ints
# isn't a multiple of four:
for x1,x2,x3,x4 in chunks(ints,4):
foo += x1 + x2 + x3 + x4


for chunk in chunks(ints,4):
foo += sum(chunk)

另一种方法:

import itertools
def chunks2(iterable,size,filler=None):
it = itertools.chain(iterable,itertools.repeat(filler,size-1))
chunk = tuple(itertools.islice(it,size))
while len(chunk) == size:
yield chunk
chunk = tuple(itertools.islice(it,size))


# x2, x3 and x4 could get the value 0 if the length is not
# a multiple of 4.
for x1,x2,x3,x4 in chunks2(ints,4,0):
foo += x1 + x2 + x3 + x4
chunk_size = 4
for i in range(0, len(ints), chunk_size):
chunk = ints[i:i+chunk_size]
# process chunk of size <= chunk_size
def chunker(seq, size):
return (seq[pos:pos + size] for pos in range(0, len(seq), size))

适用于任何序列:

text = "I am a very, very helpful text"


for group in chunker(text, 7):
print(repr(group),)
# 'I am a ' 'very, v' 'ery hel' 'pful te' 'xt'


print('|'.join(chunker(text, 10)))
# I am a ver|y, very he|lpful text


animals = ['cat', 'dog', 'rabbit', 'duck', 'bird', 'cow', 'gnu', 'fish']


for group in chunker(animals, 3):
print(group)
# ['cat', 'dog', 'rabbit']
# ['duck', 'bird', 'cow']
# ['gnu', 'fish']

如果列表很大,执行效率最高的方法是使用生成器:

def get_chunk(iterable, chunk_size):
result = []
for item in iterable:
result.append(item)
if len(result) == chunk_size:
yield tuple(result)
result = []
if len(result) > 0:
yield tuple(result)


for x in get_chunk([1,2,3,4,5,6,7,8,9,10], 3):
print x


(1, 2, 3)
(4, 5, 6)
(7, 8, 9)
(10,)

如果列表大小相同,您可以使用zip()将它们组合成4元组列表。例如:

# Four lists of four elements each.


l1 = range(0, 4)
l2 = range(4, 8)
l3 = range(8, 12)
l4 = range(12, 16)


for i1, i2, i3, i4 in zip(l1, l2, l3, l4):
...

下面是zip()函数生成的结果:

>>> print l1
[0, 1, 2, 3]
>>> print l2
[4, 5, 6, 7]
>>> print l3
[8, 9, 10, 11]
>>> print l4
[12, 13, 14, 15]
>>> print zip(l1, l2, l3, l4)
[(0, 4, 8, 12), (1, 5, 9, 13), (2, 6, 10, 14), (3, 7, 11, 15)]

如果列表很大,并且您不想将它们组合成一个更大的列表,请使用itertools.izip(),它将生成一个迭代器,而不是一个列表。

from itertools import izip


for i1, i2, i3, i4 in izip(l1, l2, l3, l4):
...
from itertools import izip_longest


def chunker(iterable, chunksize, filler):
return izip_longest(*[iter(iterable)]*chunksize, fillvalue=filler)

修改自Python的itertools文档中的食谱部分:

from itertools import zip_longest


def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)

例子

grouper('ABCDEFG', 3, 'x')  # --> 'ABC' 'DEF' 'Gxx'

在Python 2上使用izip_longest代替zip_longest

因为还没有人提到它,这里有一个zip()解决方案:

>>> def chunker(iterable, chunksize):
...     return zip(*[iter(iterable)]*chunksize)

只有当你的序列长度总是能被块大小整除时,它才有效,或者如果后面的块不能整除,你就不关心它。

例子:

>>> s = '1234567890'
>>> chunker(s, 3)
[('1', '2', '3'), ('4', '5', '6'), ('7', '8', '9')]
>>> chunker(s, 4)
[('1', '2', '3', '4'), ('5', '6', '7', '8')]
>>> chunker(s, 5)
[('1', '2', '3', '4', '5'), ('6', '7', '8', '9', '0')]

或者使用itertools.izip返回一个迭代器而不是一个列表:

>>> from itertools import izip
>>> def chunker(iterable, chunksize):
...     return izip(*[iter(iterable)]*chunksize)

填充可以使用@ΤΖΩΤΖΙΟΥ回答来修复:

>>> from itertools import chain, izip, repeat
>>> def chunker(iterable, chunksize, fillvalue=None):
...     it   = chain(iterable, repeat(fillvalue, chunksize-1))
...     args = [it] * chunksize
...     return izip(*args)

使用map()而不是zip()修复填充问题在J.F.塞巴斯蒂安的回答:

>>> def chunker(iterable, chunksize):
...   return map(None,*[iter(iterable)]*chunksize)

例子:

>>> s = '1234567890'
>>> chunker(s, 3)
[('1', '2', '3'), ('4', '5', '6'), ('7', '8', '9'), ('0', None, None)]
>>> chunker(s, 4)
[('1', '2', '3', '4'), ('5', '6', '7', '8'), ('9', '0', None, None)]
>>> chunker(s, 5)
[('1', '2', '3', '4', '5'), ('6', '7', '8', '9', '0')]

这个问题的理想解决方案是使用迭代器(而不仅仅是序列)。它还应该是快速的。

这是itertools文档提供的解决方案:

def grouper(n, iterable, fillvalue=None):
#"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return itertools.izip_longest(fillvalue=fillvalue, *args)

在我的mac book air上使用ipython的%timeit,我每次循环得到47.5 us。

然而,这真的不适合我,因为结果被填充为偶数大小的组。没有填充的解决方案稍微复杂一些。最天真的解决方案可能是:

def grouper(size, iterable):
i = iter(iterable)
while True:
out = []
try:
for _ in range(size):
out.append(i.next())
except StopIteration:
yield out
break
        

yield out

简单,但相当慢:每循环693个

我能想到的最好的解决方案是使用islice内循环:

def grouper(size, iterable):
it = iter(iterable)
while True:
group = tuple(itertools.islice(it, None, size))
if not group:
break
yield group

对于同样的数据集,我每循环得到305 us。

由于无法更快地得到一个纯解决方案,我提供了以下解决方案,但有一个重要的警告:如果您的输入数据中有filldata的实例,则可能得到错误的答案。

def grouper(n, iterable, fillvalue=None):
#"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
# itertools.zip_longest on Python 3
for x in itertools.izip_longest(*args, fillvalue=fillvalue):
if x[-1] is fillvalue:
yield tuple(v for v in x if v is not fillvalue)
else:
yield x

我真的不喜欢这个答案,但它明显更快。每回路124 us

还有另一个答案,它的优点是:

1)容易理解
2)适用于任何可迭代对象,而不仅仅是序列(上面的一些答案会阻塞文件句柄)
3)不立即将数据块加载到内存中
4)不在内存
中创建一个块长度的对同一迭代器的引用列表 5)在列表的末尾没有填充填充值

话虽如此,我还没有计算它的时间,所以它可能比一些更聪明的方法慢,而且考虑到用例,一些优势可能是无关紧要的。

def chunkiter(iterable, size):
def inneriter(first, iterator, size):
yield first
for _ in xrange(size - 1):
yield iterator.next()
it = iter(iterable)
while True:
yield inneriter(it.next(), it, size)


In [2]: i = chunkiter('abcdefgh', 3)
In [3]: for ii in i:
for c in ii:
print c,
print ''
...:
a b c
d e f
g h
< p > # EYZ0 < br > 由于内部循环和外部循环从同一个迭代器中提取值,有几个缺点:
1) continue在外部循环中没有像预期的那样工作——它只是继续到下一项,而不是跳过一个块。然而,这似乎不是一个问题,因为在外循环中没有什么要测试的 2) break在内部循环中没有像预期的那样工作-控件将再次与迭代器中的下一项一起进入内部循环。要跳过整个块,可以将内部迭代器(如上ii)包装在元组中,例如for c in tuple(ii),或者设置一个标志并耗尽迭代器

类似于其他提案,但不完全相同,我喜欢这样做,因为它简单易读:

it = iter([1, 2, 3, 4, 5, 6, 7, 8, 9])
for chunk in zip(it, it, it, it):
print chunk


>>> (1, 2, 3, 4)
>>> (5, 6, 7, 8)

这样你就不会得到最后一部分。如果你想获得(9, None, None, None)作为最后一个块,只需从itertools中使用izip_longest

使用小的函数和东西真的不吸引我;我更喜欢使用切片:

data = [...]
chunk_size = 10000 # or whatever
chunks = [data[i:i+chunk_size] for i in xrange(0,len(data),chunk_size)]
for chunk in chunks:
...

我需要一个解决方案,也将工作与集和生成器。我写不出很短很漂亮的东西,但至少可读性很好。

def chunker(seq, size):
res = []
for el in seq:
res.append(el)
if len(res) == size:
yield res
res = []
if res:
yield res

列表:

>>> list(chunker([i for i in range(10)], 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

设置:

>>> list(chunker(set([i for i in range(10)]), 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

发电机:

>>> list(chunker((i for i in range(10)), 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
def group_by(iterable, size):
"""Group an iterable into lists that don't exceed the size given.


>>> group_by([1,2,3,4,5], 2)
[[1, 2], [3, 4], [5]]


"""
sublist = []


for index, item in enumerate(iterable):
if index > 0 and index % size == 0:
yield sublist
sublist = []


sublist.append(item)


if sublist:
yield sublist

另一种方法是使用双参数形式的iter:

from itertools import islice


def group(it, size):
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())

这可以很容易地适应使用填充(这类似于马库斯Jarderot的答案):

from itertools import islice, chain, repeat


def group_pad(it, size, pad=None):
it = chain(iter(it), repeat(pad))
return iter(lambda: tuple(islice(it, size)), (pad,) * size)

这些甚至可以组合为可选的填充:

_no_pad = object()
def group(it, size, pad=_no_pad):
if pad == _no_pad:
it = iter(it)
sentinel = ()
else:
it = chain(iter(it), repeat(pad))
sentinel = (pad,) * size
return iter(lambda: tuple(islice(it, size)), sentinel)

你可以使用分区函数从funcy库:

from funcy import partition


for a, b, c, d in partition(4, ints):
foo += a * b * c * d

这些函数还有迭代器版本ipartitionichunks,在这种情况下更有效。

您还可以查看他们的实现

一行程序,特别的解决方案,在大小为4 -的块中迭代列表x

for a, b, c, d in zip(x[0::4], x[1::4], x[2::4], x[3::4]):
... do something with a, b, c and d ...

避免所有转换为列表import itertools和:

>>> for k, g in itertools.groupby(xrange(35), lambda x: x/10):
...     list(g)

生产:

...
0 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1 [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
2 [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
3 [30, 31, 32, 33, 34]
>>>

我检查了groupby,它不转换为列表或使用len,所以我(认为)这将延迟每个值的解析,直到它实际使用。不幸的是,没有一个现成的答案(在这个时候)似乎提供了这种变化。

显然,如果你需要依次处理每一项,在g上嵌套一个for循环:

for k,g in itertools.groupby(xrange(35), lambda x: x/10):
for i in g:
# do what you need to do with individual items
# now do what you need to do with the whole group

我对此特别感兴趣的是需要消耗一个生成器,以批量提交最多1000个更改到gmail API:

    messages = a_generator_which_would_not_be_smart_as_a_list
for idx, batch in groupby(messages, lambda x: x/1000):
batch_request = BatchHttpRequest()
for message in batch:
batch_request.add(self.service.users().messages().modify(userId='me', id=message['id'], body=msg_labels))
http = httplib2.Http()
self.credentials.authorize(http)
batch_request.execute(http=http)

NumPy很简单:

ints = array([1, 2, 3, 4, 5, 6, 7, 8])
for int1, int2 in ints.reshape(-1, 2):
print(int1, int2)

输出:

1 2
3 4
5 6
7 8
首先,我把它设计成将字符串分成子字符串来解析包含十六进制的字符串 今天我把它变成复杂的,但仍然简单的生成器
def chunker(iterable, size, reductor, condition):
it = iter(iterable)
def chunk_generator():
return (next(it) for _ in range(size))
chunk = reductor(chunk_generator())
while condition(chunk):
yield chunk
chunk = reductor(chunk_generator())

参数:

明显的

  • iterable是任何包含/生成/迭代输入数据的迭代器/迭代器/生成器,
  • size当然是你想要获取的chunk的大小,

更有趣的

  • reductor是一个可调用对象,它接收生成器迭代chunk的内容 我希望它返回序列或字符串,但我不要求这样 你可以传递这个参数,例如listtuplesetfrozenset
    或者任何更花哨的东西。我将传递这个函数,返回string
    (假设iterable包含/生成/迭代字符串):

    def concatenate(iterable):
    return ''.join(iterable)
    

    # EYZ1 < / p >

  • condition是一个可调用对象,它接收reductor返回的任何内容 决定批准&yield(通过返回任何值为True的值),
    或者拒绝它;

    .结束生成器的工作(通过返回其他内容或引发异常) 当iterable中的元素数不能被size整除时,当it耗尽时,reductor将收到生成器生成的元素少于size 我们将这些元素命名为持续的元素.

    我邀请了两个函数作为这个参数传递:

    • lambda x:x - 持续元素将被生成。

    • lambda x: len(x)==<size> - 持续的元素将被拒绝 用number = size

      替换<size>
    • 李< / ul > < / >

J.F. Sebastian 在这里给出的解决方案:

def chunker(iterable, chunksize):
return zip(*[iter(iterable)]*chunksize)
这很聪明,但有一个缺点-总是返回元组。如何获得字符串代替?< br > 当然你可以写''.join(chunker(...)),但是无论如何临时元组都会被构造

你可以通过编写自己的zip来摆脱临时元组,就像这样:

class IteratorExhausted(Exception):
pass


def translate_StopIteration(iterable, to=IteratorExhausted):
for i in iterable:
yield i
raise to # StopIteration would get ignored because this is generator,
# but custom exception can leave the generator.


def custom_zip(*iterables, reductor=tuple):
iterators = tuple(map(translate_StopIteration, iterables))
while True:
try:
yield reductor(next(i) for i in iterators)
except IteratorExhausted: # when any of iterators get exhausted.
break

然后

def chunker(data, size, reductor=tuple):
return custom_zip(*[iter(data)]*size, reductor=reductor)

使用示例:

>>> for i in chunker('12345', 2):
...     print(repr(i))
...
('1', '2')
('3', '4')
>>> for i in chunker('12345', 2, ''.join):
...     print(repr(i))
...
'12'
'34'

下面是一个支持生成器的无导入chunker:

def chunks(seq, size):
it = iter(seq)
while True:
ret = tuple(next(it) for _ in range(size))
if len(ret) == size:
yield ret
else:
raise StopIteration()

使用示例:

>>> def foo():
...     i = 0
...     while True:
...         i += 1
...         yield i
...
>>> c = chunks(foo(), 3)
>>> c.next()
(1, 2, 3)
>>> c.next()
(4, 5, 6)
>>> list(chunks('abcdefg', 2))
[('a', 'b'), ('c', 'd'), ('e', 'f')]

很容易让itertools.groupby为你工作,以获得一个iterables的iterable,而不需要创建任何临时列表:

groupby(iterable, (lambda x,y: (lambda z: x.next()/y))(count(),100))

不要被嵌套lambda吓跑,外部lambda只运行一次,将count()生成器和常量100放入内部lambda的作用域。

我用它来发送行块到mysql。

for k,v in groupby(bigdata, (lambda x,y: (lambda z: x.next()/y))(count(),100))):
cursor.executemany(sql, v)
def chunker(iterable, n):
"""Yield iterable in chunk sizes.


>>> chunks = chunker('ABCDEF', n=4)
>>> chunks.next()
['A', 'B', 'C', 'D']
>>> chunks.next()
['E', 'F']
"""
it = iter(iterable)
while True:
chunk = []
for i in range(n):
try:
chunk.append(next(it))
except StopIteration:
yield chunk
raise StopIteration
yield chunk


if __name__ == '__main__':
import doctest


doctest.testmod()

我喜欢这种方法。它感觉简单而不神奇,支持所有可迭代类型,并且不需要导入。

def chunk_iter(iterable, chunk_size):
it = iter(iterable)
while True:
chunk = tuple(next(it) for _ in range(chunk_size))
if not chunk:
break
yield chunk

这里非常python化(您也可以内联split_groups函数体)

import itertools
def split_groups(iter_in, group_size):
return ((x for _, x in item) for _, item in itertools.groupby(enumerate(iter_in), key=lambda x: x[0] // group_size))


for x, y, z, w in split_groups(range(16), 4):
foo += x * y + z * w

这个答案拆分字符串列表, f.ex。达到符合pep8线长的要求:

def split(what, target_length=79):
'''splits list of strings into sublists, each
having string length at most 79'''
out = [[]]
while what:
if len("', '".join(out[-1])) + len(what[0]) < target_length:
out[-1].append(what.pop(0))
else:
if not out[-1]: # string longer than target_length
out[-1] = [what.pop(0)]
out.append([])
return out

使用

>>> split(['deferred_income', 'long_term_incentive', 'restricted_stock_deferred', 'shared_receipt_with_poi', 'loan_advances', 'from_messages', 'other', 'director_fees', 'bonus', 'total_stock_value', 'from_poi_to_this_person', 'from_this_person_to_poi', 'restricted_stock', 'salary', 'total_payments', 'exercised_stock_options'], 75)
[['deferred_income', 'long_term_incentive', 'restricted_stock_deferred'], ['shared_receipt_with_poi', 'loan_advances', 'from_messages', 'other'], ['director_fees', 'bonus', 'total_stock_value', 'from_poi_to_this_person'], ['from_this_person_to_poi', 'restricted_stock', 'salary', 'total_payments'], ['exercised_stock_options']]

我从来不想填充我的块,所以这个要求是必要的。我发现在任何可迭代对象上工作的能力也是必需的。鉴于此,我决定扩展公认的答案,https://stackoverflow.com/a/434411/1074659

如果由于需要比较和筛选填充值而不需要填充,则这种方法的性能会受到轻微的影响。然而,对于大块大小,这个实用程序是非常高性能的。

#!/usr/bin/env python3
from itertools import zip_longest




_UNDEFINED = object()




def chunker(iterable, chunksize, fillvalue=_UNDEFINED):
"""
Collect data into chunks and optionally pad it.


Performance worsens as `chunksize` approaches 1.


Inspired by:
https://docs.python.org/3/library/itertools.html#itertools-recipes


"""
args = [iter(iterable)] * chunksize
chunks = zip_longest(*args, fillvalue=fillvalue)
yield from (
filter(lambda val: val is not _UNDEFINED, chunk)
if chunk[-1] is _UNDEFINED
else chunk
for chunk in chunks
) if fillvalue is _UNDEFINED else chunks

如果你不介意使用外部包,你可以使用iteration_utilties 1 iteration_utilities.grouper。它支持所有可迭代对象(不仅仅是序列):

from iteration_utilities import grouper
seq = list(range(20))
for group in grouper(seq, 4):
print(group)

打印:

(0, 1, 2, 3)
(4, 5, 6, 7)
(8, 9, 10, 11)
(12, 13, 14, 15)
(16, 17, 18, 19)

如果长度不是组大小的倍数,它还支持填充(不完整的最后一组)或截断(丢弃不完整的最后一组)最后一个:

from iteration_utilities import grouper
seq = list(range(17))
for group in grouper(seq, 4):
print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)
# (16,)


for group in grouper(seq, 4, fillvalue=None):
print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)
# (16, None, None, None)


for group in grouper(seq, 4, truncate=True):
print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)

基准

我还决定比较上面提到的几种方法的运行时间。这是一个对数-对数图,根据不同大小的列表将“10”个元素分组。对于定性结果:较低意味着更快:

enter image description here

至少在这个基准测试中,iteration_utilities.grouper表现最好。接下来是Craz的方法。

基准测试是用simple_benchmark1创建的。运行这个基准测试的代码是:

import iteration_utilities
import itertools
from itertools import zip_longest


def consume_all(it):
return iteration_utilities.consume(it, None)


import simple_benchmark
b = simple_benchmark.BenchmarkBuilder()


@b.add_function()
def grouper(l, n):
return consume_all(iteration_utilities.grouper(l, n))


def Craz_inner(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)


@b.add_function()
def Craz(iterable, n, fillvalue=None):
return consume_all(Craz_inner(iterable, n, fillvalue))


def nosklo_inner(seq, size):
return (seq[pos:pos + size] for pos in range(0, len(seq), size))


@b.add_function()
def nosklo(seq, size):
return consume_all(nosklo_inner(seq, size))


def SLott_inner(ints, chunk_size):
for i in range(0, len(ints), chunk_size):
yield ints[i:i+chunk_size]


@b.add_function()
def SLott(ints, chunk_size):
return consume_all(SLott_inner(ints, chunk_size))


def MarkusJarderot1_inner(iterable,size):
it = iter(iterable)
chunk = tuple(itertools.islice(it,size))
while chunk:
yield chunk
chunk = tuple(itertools.islice(it,size))


@b.add_function()
def MarkusJarderot1(iterable,size):
return consume_all(MarkusJarderot1_inner(iterable,size))


def MarkusJarderot2_inner(iterable,size,filler=None):
it = itertools.chain(iterable,itertools.repeat(filler,size-1))
chunk = tuple(itertools.islice(it,size))
while len(chunk) == size:
yield chunk
chunk = tuple(itertools.islice(it,size))


@b.add_function()
def MarkusJarderot2(iterable,size):
return consume_all(MarkusJarderot2_inner(iterable,size))


@b.add_arguments()
def argument_provider():
for exp in range(2, 20):
size = 2**exp
yield size, simple_benchmark.MultiArgument([[0] * size, 10])


r = b.run()

1声明:我是iteration_utilitiessimple_benchmark库的作者。

我希望通过将迭代器从列表中删除,我不是简单地复制列表的一部分。生成器可以被切片,它们将自动仍然是一个生成器,而列表将被切片成1000个条目的大块,这是较低的效率。

def iter_group(iterable, batch_size:int):
length = len(iterable)
start = batch_size*-1
end = 0
while(end < length):
start += batch_size
end += batch_size
if type(iterable) == list:
yield (iterable[i] for i in range(start,min(length-1,end)))
else:
yield iterable[start:end]

用法:

items = list(range(1,1251))


for item_group in iter_group(items, 1000):
for item in item_group:
print(item)

除非我遗漏了一些内容,否则没有提到以下使用生成器表达式的简单解决方案。它假设块的大小和数量都是已知的(通常是这样),并且不需要填充:

def chunks(it, n, m):
"""Make an iterator over m first chunks of size n.
"""
it = iter(it)
# Chunks are presented as tuples.
return (tuple(next(it) for _ in range(n)) for _ in range(m))

为什么不使用列表理解

l = [1 , 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
n = 4
filler = 0
fills = len(l) % n
chunks = ((l + [filler] * fills)[x * n:x * n + n] for x in range(int((len(l) + n - 1)/n)))
print(chunks)


[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 0]]

在Python 3.8中,您可以使用walrus操作符和itertools.islice

from itertools import islice


list_ = [i for i in range(10, 100)]


def chunker(it, size):
iterator = iter(it)
while chunk := list(islice(iterator, size)):
print(chunk)
In [2]: chunker(list_, 10)
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
[60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
[70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
[80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
[90, 91, 92, 93, 94, 95, 96, 97, 98, 99]


more-itertools包有分块方法,它可以做到这一点:

import more_itertools
for s in more_itertools.chunked(range(9), 4):
print(s)

打印

[0, 1, 2, 3]
[4, 5, 6, 7]
[8]

chunked返回列表中的项。如果您更喜欢可迭代对象,请使用ichunked

下面是我的go works on lists,iter和range…懒洋洋地:

def chunker(it,size):
rv = []
for i,el in enumerate(it,1) :
rv.append(el)
if i % size == 0 :
yield rv
rv = []
if rv : yield rv

几乎变成了一句俏皮话;(

In [95]: list(chunker(range(9),2) )
Out[95]: [[0, 1], [2, 3], [4, 5], [6, 7], [8]]


In [96]: list(chunker([1,2,3,4,5],2) )
Out[96]: [[1, 2], [3, 4], [5]]


In [97]: list(chunker(iter(range(9)),2) )
Out[97]: [[0, 1], [2, 3], [4, 5], [6, 7], [8]]


In [98]: list(chunker(range(9),25) )
Out[98]: [[0, 1, 2, 3, 4, 5, 6, 7, 8]]


In [99]: list(chunker(range(9),1) )
Out[99]: [[0], [1], [2], [3], [4], [5], [6], [7], [8]]


In [101]: %timeit list(chunker(range(101),2) )
11.3 µs ± 68.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

在我的特殊情况下,我需要填充项目,以重复最后一个元素,直到它达到大小,所以我改变了答案,以满足我的需要。

大小为4的输入输出示例:

Input = [1,2,3,4,5,6,7,8]
Output= [[1,2,3,4], [5,6,7,8]]


Input = [[1,2,3,4,5,6,7]]
Output= [[1,2,3,4], [5,6,7,7]]


Input = [1,2,3,4,5]
Output= [[1,2,3,4], [5,5,5,5]]
def chunker(seq, size):
res = []
for el in seq:
res.append(el)
if len(res) == size:
yield res
res = []
if res:
res = res + (size - len(res)) * [res[-1]]
yield res