Reading a binary file and looping over each byte

In Python, how do I read in a binary file and loop over each byte of that file?


Python >= 3.8

Thanks to the walrus operator (:=), the solution is quite short. We read bytes objects from the file and assign them to the variable byte.

with open("myfile", "rb") as f:
    while (byte := f.read(1)):
        # Do stuff with byte.

Python >= 3

In older Python 3 versions, we have to use a slightly more verbose way:

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != b"":
        # Do stuff with byte.
        byte = f.read(1)

Or, as benhoyt says, skip the not-equal check and take advantage of the fact that b"" evaluates to false. This makes the code compatible between 2.6 and 3.x without any changes. It would also save you from changing the condition if you go from byte mode to text mode or the reverse.

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte:
        # Do stuff with byte.
        byte = f.read(1)

Python >= 2.5

In Python 2 it is slightly different. Here we don't get bytes objects, but raw characters:

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)

Note that the with statement is not available in Python versions below 2.5. To use it in v2.5 you'll need to import it:

from __future__ import with_statement

In 2.6 this is not needed.

Python 2.4 and earlier

f = open("myfile", "rb")
try:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)
finally:
    f.close()

If the file is not too big, holding it in memory is not a problem:

with open("filename", "rb") as f:
    bytes_read = f.read()
for b in bytes_read:
    process_byte(b)

where process_byte represents some operation you want to perform on the passed-in byte.
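
As a concrete illustration of such an operation (my own example, not from the original answer), a hypothetical process_byte that builds a histogram of byte values might look like this:

from collections import Counter

byte_counts = Counter()

def process_byte(b):
    # In Python 3, iterating over a bytes object yields ints in range(256).
    byte_counts[b] += 1

with open("filename", "rb") as f:
    for b in f.read():
        process_byte(b)

print(byte_counts.most_common(5))  # the five most frequent byte values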

If you want to process a chunk at a time:

with open("filename", "rb") as f:
    bytes_read = f.read(CHUNKSIZE)
    while bytes_read:
        for b in bytes_read:
            process_byte(b)
        bytes_read = f.read(CHUNKSIZE)

The with statement is available in Python 2.5 and later.

This generator yields bytes from a file, reading the file in chunks:

def bytes_from_file(filename, chunksize=8192):
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(chunksize)
            if chunk:
                for b in chunk:
                    yield b
            else:
                break


# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

See the Python documentation for information on iterators and generators.

Summing up all the excellent points of chrispy, Skurmedel, Ben Hoyt, and Peter Hansen, this would be the optimal solution for processing a binary file one byte at a time:

with open("myfile", "rb") as f:
    while True:
        byte = f.read(1)
        if not byte:
            break
        do_stuff_with(ord(byte))

For Python 2.6 and above, because:

  • Python buffers internally - no need to read in chunks
  • DRY principle - the read line is not repeated
  • The with statement ensures a clean file close
  • 'byte' evaluates to false when there are no more bytes (not when a byte is zero); see the short check below
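
A minimal check of that last point (the assertions are just an illustration):

assert b"\x00"   # a single zero byte is still truthy
assert not b""   # only the empty bytestring (returned at EOF) is falsy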

Or use J. F. Sebastian's solution for improved speed:

from functools import partial


with open(filename, 'rb') as file:
    for byte in iter(partial(file.read, 1), b''):
        # Do stuff with byte

Or if you want it as a generator function, as codeape demonstrated:

def bytes_from_file(filename):
    with open(filename, "rb") as f:
        while True:
            byte = f.read(1)
            if not byte:
                break
            yield(ord(byte))


# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

To read a file one byte at a time (ignoring the buffering), you could use the two-argument iter(callable, sentinel) built-in function:

with open(filename, 'rb') as file:
    for byte in iter(lambda: file.read(1), b''):
        # Do stuff with byte

It calls file.read(1) until it returns nothing, b'' (the empty bytestring). Memory doesn't grow without bound for large files. You could pass buffering=0 to open() to disable buffering; it guarantees that only one byte is read per iteration (slow).
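
For illustration, an unbuffered variant might look like the sketch below (buffering=0 is only valid in binary mode; the filename is a placeholder):

with open(filename, 'rb', buffering=0) as file:
    for byte in iter(lambda: file.read(1), b''):
        pass  # Do stuff with byte; each read(1) now goes straight to the OS.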

The with-statement closes the file automatically, including the case when the code inside it raises an exception.

Despite the internal buffering that happens by default, it is still inefficient to process one byte at a time. For example, here's the blackhole.py utility that eats everything it is given:

#!/usr/bin/env python3
"""Discard all input. `cat > /dev/null` analog."""
import sys
from functools import partial
from collections import deque


chunksize = int(sys.argv[1]) if len(sys.argv) > 1 else (1 << 15)
deque(iter(partial(sys.stdin.detach().read, chunksize), b''), maxlen=0)

Example:

$ dd if=/dev/zero bs=1M count=1000 | python3 blackhole.py

It processes ~1.5 GB/s when chunksize == 32768 on my machine and only ~7.5 MB/s when chunksize == 1. That is, it is 200 times slower to read one byte at a time. Take it into account if you can rewrite your processing to use more than one byte at a time and if you need performance.
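
As a rough sketch of that kind of rewrite (the task of counting zero bytes is an example I chose, not something from the original answer), operate on whole chunks with bytes methods instead of looping in Python:

# Slow: one Python-level loop iteration per byte.
def count_zeros_bytewise(filename):
    total = 0
    with open(filename, 'rb') as f:
        for byte in iter(lambda: f.read(1), b''):
            if byte == b'\x00':
                total += 1
    return total


# Much faster: let bytes.count() scan each chunk in C (Python 3).
def count_zeros_chunkwise(filename, chunksize=1 << 15):
    total = 0
    with open(filename, 'rb') as f:
        for chunk in iter(lambda: f.read(chunksize), b''):
            total += chunk.count(0)  # count occurrences of byte value 0
    return total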

mmap allows you to treat a file as a bytearray and a file object simultaneously. It can serve as an alternative to loading the whole file in memory if you need access to both interfaces. In particular, you can iterate over a memory-mapped file one byte at a time using just a plain for-loop:

from mmap import ACCESS_READ, mmap


with open(filename, 'rb', 0) as f, mmap(f.fileno(), 0, access=ACCESS_READ) as s:
    for byte in s:  # length is equal to the current file size
        # Do stuff with byte

mmap supports slice notation. For example, mm[i:i+len] returns len bytes from the file starting at position i. The context manager protocol is not supported before Python 3.2; you need to call mm.close() explicitly in that case. Iterating over each byte using mmap consumes more memory than file.read(1), but mmap is an order of magnitude faster.
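
A small sketch of the slice access described above (the filename and offsets are placeholders; this assumes Python 3, where iterating bytes yields ints):

from mmap import ACCESS_READ, mmap

with open(filename, 'rb', 0) as f, mmap(f.fileno(), 0, access=ACCESS_READ) as mm:
    header = mm[:4]              # first four bytes, returned as a bytes object
    window = mm[100:100 + 16]    # sixteen bytes starting at offset 100
    for byte in window:          # iterating a slice yields ints, like any bytes
        pass  # Do stuff with byte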

If you have a lot of binary data to read, you might want to consider the struct module. It is documented as converting "between C and Python types", but of course, bytes are bytes, and whether those were created as C types does not matter. For example, if your binary data contains two 2-byte integers and one 4-byte integer, you can read them as follows (example adapted from the struct documentation):

>>> import struct
>>> struct.unpack('>hhl', b'\x00\x01\x00\x02\x00\x00\x00\x03')
(1, 2, 3)

You might find this more convenient, faster, or both, than explicitly looping over the contents of the file.
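
For reading many fixed-size records from a file, a possible sketch using struct.Struct and iter_unpack (Python 3.4+); the record layout of two 2-byte ints plus one 4-byte int and the filename are assumptions for illustration:

import struct

record = struct.Struct('>hhl')  # big-endian: 2 + 2 + 4 = 8 bytes per record

with open('records.bin', 'rb') as f:
    data = f.read()

# iter_unpack walks the buffer record by record; it requires len(data)
# to be an exact multiple of record.size.
for a, b, c in record.iter_unpack(data):
    pass  # Do stuff with the three integers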

If you are looking for something speedy, here's a method I've been using that has worked for years:

from array import array


with open(path, 'rb') as file:
    data = array('B', file.read())  # buffer the file


# evaluate its data
for byte in data:
    v = byte       # int value
    c = chr(byte)

If you want to iterate over characters instead of ints, you can simply use data = file.read(), which should be a bytes() object in py3.
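
For reference, a quick sketch (assuming Python 3; the sample data is made up) of what iteration yields and how to get characters back via chr() or decode():

data = b'ABC'

for byte in data:                  # iterating bytes yields ints: 65, 66, 67
    print(byte, chr(byte))         # chr() turns each int back into a character

for ch in data.decode('latin-1'):  # or decode once to get a str of characters
    print(ch)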

Reading a binary file in Python and looping over each byte

New in Python 3.5 is the pathlib module, which has a convenience method specifically to read in a file as bytes, allowing us to iterate over the bytes. I consider this a decent (if quick and dirty) answer:

import pathlib


for byte in pathlib.Path(path).read_bytes():
    print(byte)

Interesting that this is the only answer to mention pathlib.

In Python 2, you would probably do this (as Vinay Sajip also suggests):

with open(path, 'rb') as file:
    for byte in file.read():
        print(byte)

In case the file may be too large to iterate over in memory, you would chunk it up, idiomatically, using the iter function with the callable, sentinel signature; here is the Python 2 version:

with open(path, 'rb') as file:
    callable = lambda: file.read(1024)
    sentinel = bytes()  # or b''
    for chunk in iter(callable, sentinel):
        for byte in chunk:
            print(byte)

(Several other answers mention this, but few offer a sensible read size.)

Best practice for large files or buffered/interactive reading

Let's create a function to do this, including idiomatic usages of the Python 3.5+ standard library:

from pathlib import Path
from functools import partial
from io import DEFAULT_BUFFER_SIZE


def file_byte_iterator(path):
    """given a path, return an iterator over the file
    that lazily loads the file
    """
    path = Path(path)
    with path.open('rb') as file:
        reader = partial(file.read1, DEFAULT_BUFFER_SIZE)
        file_iterator = iter(reader, bytes())
        for chunk in file_iterator:
            yield from chunk

Note that we use file.read1. file.read blocks until it gets all of the bytes requested of it, or EOF. file.read1 allows us to avoid blocking, and it can return more quickly because of this. No other answers mention this.
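
To make the read1 point concrete, here is a self-contained sketch; the TrickleIO class is something I made up to mimic a slow raw stream and is not part of the original answer:

import io

class TrickleIO(io.RawIOBase):
    """Raw stream that hands out at most 4 bytes per low-level read."""
    def __init__(self, data):
        self._data, self._pos = data, 0
    def readable(self):
        return True
    def readinto(self, buf):
        chunk = self._data[self._pos:self._pos + min(4, len(buf))]
        buf[:len(chunk)] = chunk
        self._pos += len(chunk)
        return len(chunk)

buffered = io.BufferedReader(TrickleIO(b'x' * 100), buffer_size=16)
print(len(buffered.read1(64)))  # at most one raw read, so far fewer than 64 bytes
print(len(buffered.read(64)))   # keeps reading the raw stream until 64 bytes or EOF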

Demonstration of the use of the best practice:

Let's make a file with a megabyte (actually a mebibyte) of pseudorandom data:

import random
import pathlib
path = 'pseudorandom_bytes'
pathobj = pathlib.Path(path)


pathobj.write_bytes(
    bytes(random.randint(0, 255) for _ in range(2**20)))

Now let's iterate over it and materialize it in memory:

>>> l = list(file_byte_iterator(path))
>>> len(l)
1048576

We can inspect any part of the data, for example, the last 100 and the first 100 bytes:

>>> l[-100:]
[208, 5, 156, 186, 58, 107, 24, 12, 75, 15, 1, 252, 216, 183, 235, 6, 136, 50, 222, 218, 7, 65, 234, 129, 240, 195, 165, 215, 245, 201, 222, 95, 87, 71, 232, 235, 36, 224, 190, 185, 12, 40, 131, 54, 79, 93, 210, 6, 154, 184, 82, 222, 80, 141, 117, 110, 254, 82, 29, 166, 91, 42, 232, 72, 231, 235, 33, 180, 238, 29, 61, 250, 38, 86, 120, 38, 49, 141, 17, 190, 191, 107, 95, 223, 222, 162, 116, 153, 232, 85, 100, 97, 41, 61, 219, 233, 237, 55, 246, 181]
>>> l[:100]
[28, 172, 79, 126, 36, 99, 103, 191, 146, 225, 24, 48, 113, 187, 48, 185, 31, 142, 216, 187, 27, 146, 215, 61, 111, 218, 171, 4, 160, 250, 110, 51, 128, 106, 3, 10, 116, 123, 128, 31, 73, 152, 58, 49, 184, 223, 17, 176, 166, 195, 6, 35, 206, 206, 39, 231, 89, 249, 21, 112, 168, 4, 88, 169, 215, 132, 255, 168, 129, 127, 60, 252, 244, 160, 80, 155, 246, 147, 234, 227, 157, 137, 101, 84, 115, 103, 77, 44, 84, 134, 140, 77, 224, 176, 242, 254, 171, 115, 193, 29]

Don't iterate by lines for binary files

Don't do the following: this pulls a chunk of arbitrary size until it gets to a newline character. That's too slow when the chunks are too small, and possibly too large as well:

with open(path, 'rb') as file:
    for chunk in file:  # text newline iteration - not for bytes
        yield from chunk

The above is only good for what are semantically human-readable text files (like plain text, code, markup, markdown, etc.; essentially anything ascii, utf, latin, etc. encoded), which you should open without the 'b' flag.
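
To illustrate why, here is a made-up example: line iteration over binary data produces "lines" whose sizes depend only on where 0x0A bytes happen to fall:

import io

blob = b'A' * 1000000 + b'\n' + b'\n\n'

for i, line in enumerate(io.BytesIO(blob)):
    print(i, len(line))
# -> one "line" of 1,000,001 bytes, then two 1-byte "lines"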

Python 3, read all of the file at once:

with open("filename", "rb") as binary_file:
    # Read the whole file at once
    data = binary_file.read()
    print(data)

You can iterate over whatever you want using the data variable.

After trying all the above and using the answer from @Aaron Hall, I was getting memory errors for a ~90 MB file on a computer running Windows 10, 8 GB RAM and 32-bit Python 3.5. I was recommended by a colleague to use numpy instead, and it works wonders.

By far the fastest way to read an entire binary file (that I have tested) is:

import numpy as np


file = "binary_file.bin"
data = np.fromfile(file, 'u1')

Reference.

Multitudes faster than any other methods so far. Hope it helps someone!
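
The speed comes from keeping the work vectorized inside numpy instead of looping over the array in Python. A sketch (the histogram and zero-count tasks are just assumed examples):

import numpy as np

data = np.fromfile("binary_file.bin", dtype='u1')  # one uint8 per byte

# Vectorized operations stay in C: e.g. a histogram of all 256 byte values...
counts = np.bincount(data, minlength=256)
# ...or the number of zero bytes, without a Python-level loop.
num_zeros = int((data == 0).sum())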

This post is not itself a direct answer to the question. Rather, it's a data-driven, extensible benchmark that can be used to compare many of the answers to this question (as well as variations of them that utilize new features added in later, more modern versions of Python), and should therefore help determine which has the best performance.

In a few cases I've modified the code from the referenced answer to make it compatible with the benchmark framework.

First, here are the results for the currently latest versions of Python 2 & 3:

Fastest to slowest execution speeds with 32-bit Python 2.7.16
numpy version 1.16.5
Test file size: 1,024 KiB
100 executions, best of 3 repetitions


1                  Tcll (array.array) :   3.8943 secs, rel speed   1.00x,   0.00% slower (262.95 KiB/sec)
2  Vinay Sajip (read all into memory) :   4.1164 secs, rel speed   1.06x,   5.71% slower (248.76 KiB/sec)
3            codeape + iter + partial :   4.1616 secs, rel speed   1.07x,   6.87% slower (246.06 KiB/sec)
4                             codeape :   4.1889 secs, rel speed   1.08x,   7.57% slower (244.46 KiB/sec)
5               Vinay Sajip (chunked) :   4.1977 secs, rel speed   1.08x,   7.79% slower (243.94 KiB/sec)
6           Aaron Hall (Py 2 version) :   4.2417 secs, rel speed   1.09x,   8.92% slower (241.41 KiB/sec)
7                     gerrit (struct) :   4.2561 secs, rel speed   1.09x,   9.29% slower (240.59 KiB/sec)
8                     Rick M. (numpy) :   8.1398 secs, rel speed   2.09x, 109.02% slower (125.80 KiB/sec)
9                           Skurmedel :  31.3264 secs, rel speed   8.04x, 704.42% slower ( 32.69 KiB/sec)


Benchmark runtime (min:sec) - 03:26

Fastest to slowest execution speeds with 32-bit Python 3.8.0
numpy version 1.17.4
Test file size: 1,024 KiB
100 executions, best of 3 repetitions


1  Vinay Sajip + "yield from" + "walrus operator" :   3.5235 secs, rel speed   1.00x,   0.00% slower (290.62 KiB/sec)
2                       Aaron Hall + "yield from" :   3.5284 secs, rel speed   1.00x,   0.14% slower (290.22 KiB/sec)
3         codeape + iter + partial + "yield from" :   3.5303 secs, rel speed   1.00x,   0.19% slower (290.06 KiB/sec)
4                      Vinay Sajip + "yield from" :   3.5312 secs, rel speed   1.00x,   0.22% slower (289.99 KiB/sec)
5      codeape + "yield from" + "walrus operator" :   3.5370 secs, rel speed   1.00x,   0.38% slower (289.51 KiB/sec)
6                          codeape + "yield from" :   3.5390 secs, rel speed   1.00x,   0.44% slower (289.35 KiB/sec)
7                                      jfs (mmap) :   4.0612 secs, rel speed   1.15x,  15.26% slower (252.14 KiB/sec)
8              Vinay Sajip (read all into memory) :   4.5948 secs, rel speed   1.30x,  30.40% slower (222.86 KiB/sec)
9                        codeape + iter + partial :   4.5994 secs, rel speed   1.31x,  30.54% slower (222.64 KiB/sec)
10                                        codeape :   4.5995 secs, rel speed   1.31x,  30.54% slower (222.63 KiB/sec)
11                          Vinay Sajip (chunked) :   4.6110 secs, rel speed   1.31x,  30.87% slower (222.08 KiB/sec)
12                      Aaron Hall (Py 2 version) :   4.6292 secs, rel speed   1.31x,  31.38% slower (221.20 KiB/sec)
13                             Tcll (array.array) :   4.8627 secs, rel speed   1.38x,  38.01% slower (210.58 KiB/sec)
14                                gerrit (struct) :   5.0816 secs, rel speed   1.44x,  44.22% slower (201.51 KiB/sec)
15                 Rick M. (numpy) + "yield from" :  11.8084 secs, rel speed   3.35x, 235.13% slower ( 86.72 KiB/sec)
16                                      Skurmedel :  11.8806 secs, rel speed   3.37x, 237.18% slower ( 86.19 KiB/sec)
17                                Rick M. (numpy) :  13.3860 secs, rel speed   3.80x, 279.91% slower ( 76.50 KiB/sec)


Benchmark runtime (min:sec) - 04:47

I also ran it with a much larger 10 MiB test file (which took nearly an hour to run) and got performance results comparable to those shown above.

Here's the code used to do the benchmarking:

from __future__ import print_function
import array
import atexit
from collections import deque, namedtuple
import io
from mmap import ACCESS_READ, mmap
import numpy as np
from operator import attrgetter
import os
import random
import struct
import sys
import tempfile
from textwrap import dedent
import time
import timeit
import traceback


try:
    xrange
except NameError:  # Python 3
    xrange = range




class KiB(int):
    """ KibiBytes - multiples of the byte units for quantities of information. """
    def __new__(self, value=0):
        return 1024*value




BIG_TEST_FILE = 1  # MiBs or 0 for a small file.
SML_TEST_FILE = KiB(64)
EXECUTIONS = 100  # Number of times each "algorithm" is executed per timing run.
TIMINGS = 3  # Number of timing runs.
CHUNK_SIZE = KiB(8)
if BIG_TEST_FILE:
    FILE_SIZE = KiB(1024) * BIG_TEST_FILE
else:
    FILE_SIZE = SML_TEST_FILE  # For quicker testing.


# Common setup for all algorithms -- prefixed to each algorithm's setup.
COMMON_SETUP = dedent("""
    # Make accessible in algorithms.
    from __main__ import array, deque, get_buffer_size, mmap, np, struct
    from __main__ import ACCESS_READ, CHUNK_SIZE, FILE_SIZE, TEMP_FILENAME
    from functools import partial
    try:
        xrange
    except NameError:  # Python 3
        xrange = range
    """)




def get_buffer_size(path):
    """ Determine optimal buffer size for reading files. """
    st = os.stat(path)
    try:
        bufsize = st.st_blksize  # Available on some Unix systems (like Linux)
    except AttributeError:
        bufsize = io.DEFAULT_BUFFER_SIZE
    return bufsize


# Utility primarily for use when embedding additional algorithms into benchmark.
VERIFY_NUM_READ = """
# Verify generator reads correct number of bytes (assumes values are correct).
bytes_read = sum(1 for _ in file_byte_iterator(TEMP_FILENAME))
assert bytes_read == FILE_SIZE, \
    'Wrong number of bytes generated: got {:,} instead of {:,}'.format(
        bytes_read, FILE_SIZE)
"""


TIMING = namedtuple('TIMING', 'label, exec_time')


class Algorithm(namedtuple('CodeFragments', 'setup, test')):

    # Default timeit "stmt" code fragment.
    _TEST = """
        #for b in file_byte_iterator(TEMP_FILENAME):  # Loop over every byte.
        #    pass  # Do stuff with byte...
        deque(file_byte_iterator(TEMP_FILENAME), maxlen=0)  # Data sink.
    """

    # Must overload __new__ because (named)tuples are immutable.
    def __new__(cls, setup, test=None):
        """ Dedent (unindent) code fragment string arguments.
        Args:
          `setup` -- Code fragment that defines things used by `test` code.
                     In this case it should define a generator function named
                     `file_byte_iterator()` that will be passed the name of a test file
                     of binary data. This code is not timed.
          `test` -- Code fragment that uses things defined in `setup` code.
                    Defaults to _TEST. This is the code that's timed.
        """
        test = cls._TEST if test is None else test  # Use default unless one is provided.

        # Uncomment to replace all performance tests with one that verifies the correct
        # number of bytes values are being generated by the file_byte_iterator function.
        #test = VERIFY_NUM_READ

        return tuple.__new__(cls, (dedent(setup), dedent(test)))




algorithms = {

    'Aaron Hall (Py 2 version)': Algorithm("""
        def file_byte_iterator(path):
            with open(path, "rb") as file:
                callable = partial(file.read, 1024)
                sentinel = bytes() # or b''
                for chunk in iter(callable, sentinel):
                    for byte in chunk:
                        yield byte
        """),

    "codeape": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                while True:
                    chunk = f.read(chunksize)
                    if chunk:
                        for b in chunk:
                            yield b
                    else:
                        break
        """),

    "codeape + iter + partial": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                for chunk in iter(partial(f.read, chunksize), b''):
                    for b in chunk:
                        yield b
        """),

    "gerrit (struct)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                fmt = '{}B'.format(FILE_SIZE)  # Reads entire file at once.
                for b in struct.unpack(fmt, f.read()):
                    yield b
        """),

    'Rick M. (numpy)': Algorithm("""
        def file_byte_iterator(filename):
            for byte in np.fromfile(filename, 'u1'):
                yield byte
        """),

    "Skurmedel": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                byte = f.read(1)
                while byte:
                    yield byte
                    byte = f.read(1)
        """),

    "Tcll (array.array)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                arr = array.array('B')
                arr.fromfile(f, FILE_SIZE)  # Reads entire file at once.
                for b in arr:
                    yield b
        """),

    "Vinay Sajip (read all into memory)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                bytes_read = f.read()  # Reads entire file at once.
                for b in bytes_read:
                    yield b
        """),

    "Vinay Sajip (chunked)": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                chunk = f.read(chunksize)
                while chunk:
                    for b in chunk:
                        yield b
                    chunk = f.read(chunksize)
        """),

}  # End algorithms


#
# Versions of algorithms that will only work in certain releases (or better) of Python.
#
if sys.version_info >= (3, 3):
    algorithms.update({

        'codeape + iter + partial + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    for chunk in iter(partial(f.read, chunksize), b''):
                        yield from chunk
            """),

        'codeape + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while True:
                        chunk = f.read(chunksize)
                        if chunk:
                            yield from chunk
                        else:
                            break
            """),

        "jfs (mmap)": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f, \
                        mmap(f.fileno(), 0, access=ACCESS_READ) as s:
                    yield from s
            """),

        'Rick M. (numpy) + "yield from"': Algorithm("""
            def file_byte_iterator(filename):
            #    data = np.fromfile(filename, 'u1')
                yield from np.fromfile(filename, 'u1')
            """),

        'Vinay Sajip + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    chunk = f.read(chunksize)
                    while chunk:
                        yield from chunk  # Added in Py 3.3
                        chunk = f.read(chunksize)
            """),

    })  # End Python 3.3 update.


if sys.version_info >= (3, 5):
    algorithms.update({

        'Aaron Hall + "yield from"': Algorithm("""
            from pathlib import Path

            def file_byte_iterator(path):
                ''' Given a path, return an iterator over the file
                    that lazily loads the file.
                '''
                path = Path(path)
                bufsize = get_buffer_size(path)

                with path.open('rb') as file:
                    reader = partial(file.read1, bufsize)
                    for chunk in iter(reader, bytes()):
                        yield from chunk
            """),

    })  # End Python 3.5 update.


if sys.version_info >= (3, 8, 0):
    algorithms.update({

        'Vinay Sajip + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk  # Added in Py 3.3
            """),

        'codeape + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk
            """),

    })  # End Python 3.8.0 update.




#### Main ####


def main():
    global TEMP_FILENAME

    def cleanup():
        """ Clean up after testing is completed. """
        try:
            os.remove(TEMP_FILENAME)  # Delete the temporary file.
        except Exception:
            pass

    atexit.register(cleanup)

    # Create a named temporary binary file of pseudo-random bytes for testing.
    fd, TEMP_FILENAME = tempfile.mkstemp('.bin')
    with os.fdopen(fd, 'wb') as file:
        os.write(fd, bytearray(random.randrange(256) for _ in range(FILE_SIZE)))

    # Execute and time each algorithm, gather results.
    start_time = time.time()  # To determine how long testing itself takes.

    timings = []
    for label in algorithms:
        try:
            timing = TIMING(label,
                            min(timeit.repeat(algorithms[label].test,
                                              setup=COMMON_SETUP + algorithms[label].setup,
                                              repeat=TIMINGS, number=EXECUTIONS)))
        except Exception as exc:
            print('{} occurred timing the algorithm: "{}"\n  {}'.format(
                    type(exc).__name__, label, exc))
            traceback.print_exc(file=sys.stdout)  # Redirect to stdout.
            sys.exit(1)
        timings.append(timing)

    # Report results.
    print('Fastest to slowest execution speeds with {}-bit Python {}.{}.{}'.format(
            64 if sys.maxsize > 2**32 else 32, *sys.version_info[:3]))
    print('  numpy version {}'.format(np.version.full_version))
    print('  Test file size: {:,} KiB'.format(FILE_SIZE // KiB(1)))
    print('  {:,d} executions, best of {:d} repetitions'.format(EXECUTIONS, TIMINGS))
    print()

    longest = max(len(timing.label) for timing in timings)  # Len of longest identifier.
    ranked = sorted(timings, key=attrgetter('exec_time'))  # Sort so fastest is first.
    fastest = ranked[0].exec_time
    for rank, timing in enumerate(ranked, 1):
        print('{:<2d} {:>{width}} : {:8.4f} secs, rel speed {:6.2f}x, {:6.2f}% slower '
              '({:6.2f} KiB/sec)'.format(
                    rank,
                    timing.label, timing.exec_time, round(timing.exec_time/fastest, 2),
                    round((timing.exec_time/fastest - 1) * 100, 2),
                    (FILE_SIZE/timing.exec_time) / KiB(1),  # per sec.
                    width=longest))
    print()
    mins, secs = divmod(time.time()-start_time, 60)
    print('Benchmark runtime (min:sec) - {:02d}:{:02d}'.format(int(mins),
                                                               int(round(secs))))


main()

Here is an example of reading network-endian data using Numpy's fromfile:

import numpy as np


dtheader = np.dtype([('Start Name', 'b', (4,)),
                     ('Message Type', np.int32, (1,)),
                     ('Instance', np.int32, (1,)),
                     ('NumItems', np.int32, (1,)),
                     ('Length', np.int32, (1,)),
                     ('ComplexArray', np.int32, (1,))])
dtheader = dtheader.newbyteorder('>')


headerinfo = np.fromfile(iqfile, dtype=dtheader, count=1)


print(headerinfo['Start Name'])

I hope this helps. The catch is that fromfile doesn't recognize EOF by itself, so there is no built-in way to break out of a loop gracefully for files of arbitrary size.
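
One possible workaround (a sketch under my own assumptions, reusing the dtheader dtype from above; not from the original answer) is to pass count= and treat a short or empty result as end-of-file:

import numpy as np

records_per_read = 1000  # arbitrary chunk size, in records

with open('binary_file.bin', 'rb') as f:
    while True:
        block = np.fromfile(f, dtype=dtheader, count=records_per_read)
        if block.size == 0:                    # nothing returned: end of file
            break
        for rec in block:
            pass                               # Do stuff with each record
        if block.size < records_per_read:      # short read: last partial block
            break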

For large sizes, I don't think using a generator is a bad idea. This answer is for reading something like a file; although @codeape has a similar answer, I think dropping the inner loop makes more sense.

import io


def read_chunk(file_object, chunk_size=125):
    while True:
        file = file_object.read(chunk_size)
        if not file:
            break
        yield file


# sample use
buffer = io.BytesIO()
file = open('myfile', 'rb')
for chunk in read_chunk(file):
    buffer.write(chunk)
buffer.seek(0)
# save the file or do whatever you want here

You can still use it as you would a normal list, not that I think this is of much use, but:

file_list = list(read_chunk(file, chunk_size=10000))
for i in file_list:
    # do something

And then to get the index of each chunk:

for index, chunk in enumerate(read_chunk(file, chunk_size=10000)):
    # use the index as a number index
    # you can try and get the size of each chunk with this
    length = len(chunk)

Just a reminder: mind the size of the file; chunk_size is always in bytes.