如何逐行读取一个大文件?

我希望遍历整个文件的每一行。一种方法是读取整个文件,将其保存到一个列表中,然后遍历感兴趣的行。这种方法使用大量内存,所以我正在寻找一种替代方法。

到目前为止我的代码:

for each_line in fileinput.input(input_file):
do_something(each_line)


for each_line_again in fileinput.input(input_file):
do_something(each_line_again)

执行这段代码会给出一个错误消息:device active

有什么建议吗?

目的是计算成对字符串的相似性,这意味着对于文件中的每一行,我想计算与其他每一行的Levenshtein距离。

编辑:在这个问题8个月后提出的一个相关问题有许多有用的答案和评论。要更深入地了解python逻辑,请阅读相关问题如何在Python中逐行读取文件?

852232 次浏览

两种内存高效方法按顺序排列(第一种是最好的)-

  1. 使用with - python 2.5及以上版本支持
  2. 如果你真的想控制读取的量,请使用yield

1. 使用with

with是读取大文件的一种漂亮而高效的python方法。1)文件对象在退出with执行块后自动关闭。2)在with块内部处理异常。3)内存for循环逐行遍历f文件对象。在内部它做缓冲IO(优化昂贵的IO操作)和内存管理。

with open("x.txt") as f:
for line in f:
do something with data

2. 使用yield

有时,人们可能希望对每次迭代中读取的量进行更细粒度的控制。在这种情况下,使用iter &# EYZ1。注意,使用这种方法需要在结束时显式地关闭文件。

def readInChunks(fileObj, chunkSize=2048):
"""
Lazy function to read a file piece by piece.
Default chunk size: 2kB.


"""
while True:
data = fileObj.read(chunkSize)
if not data:
break
yield data


f = open('bigFile')
for chunk in readInChunks(f):
do_something(chunk)
f.close()

陷阱和为了完整起见 -下面的方法对于读取大文件来说不太好或不太优雅,但请阅读以获得全面的理解。

在Python中,从文件中读取行最常见的方法是执行以下操作:

for line in open('myfile','r').readlines():
do_something(line)

但是,当这完成时,readlines()函数(同样适用于read()函数)将整个文件加载到内存中,然后遍历它。对于大文件,稍微好一点的方法(上面提到的两种方法是最好的)是使用fileinput模块,如下所示:

import fileinput


for line in fileinput.input(['myfile']):
do_something(line)

fileinput.input()调用按顺序读取行,但在读取后不将它们保存在内存中,甚至只是这样,因为python中的file是可迭代的。

参考文献

  1. Python with statement .

来自fileinput.input()的python文档:

这将遍历sys.argv[1:]中列出的所有文件的行,如果列表为空,则默认为sys.stdin

进一步,函数的定义为:

fileinput.FileInput([files[, inplace[, backup[, mode[, openhook]]]]])

字里行间,这告诉我files可以是一个列表,所以你可以有这样的东西:

for each_line in fileinput.input([input_file, input_file]):
do_something(each_line)

有关更多信息,请参阅在这里

这是python中读取文件的一种可能方式:

f = open(input_file)
for line in f:
do_stuff(line)
f.close()

它不分配一个完整的列表。它在直线上迭代。

正确的、完全python式的读取文件的方法如下:

with open(...) as f:
for line in f:
# Do something with 'line'

with语句处理打开和关闭文件,包括在内部块中引发异常的情况。for line in f将文件对象f视为可迭代对象,它自动使用缓冲I/O和内存管理,因此您不必担心大文件。

应该有一种——最好只有一种——明显的方法来做到这一点。

Katrielalex提供了打开&读取一个文件。

不管你的算法是如何运行的,它会为文件的每一行读取整个文件。这意味着,如果N是文件中的行数,那么读取文件和计算Levenshtein距离的总量将是N*N。由于您关心文件大小,并且不想将其保存在内存中,因此我关心的是结果二次运行时。你的算法属于O(n^2)类算法,通常可以通过专门化来改进。

我怀疑您已经知道这里内存与运行时的权衡,但是您可能想要研究是否有一种并行计算多个Levenshtein距离的有效方法。如果是这样的话,在这里分享你的解决方案会很有趣。

你的文件有多少行,在什么样的机器上(mem &CPU功率)你的算法必须运行,容忍运行时间是多少?

代码如下所示:

with f_outer as open(input_file, 'r'):
for line_outer in f_outer:
with f_inner as open(input_file, 'r'):
for line_inner in f_inner:
compute_distance(line_outer, line_inner)

但问题是你如何存储距离(矩阵?),你能获得一个优势准备例如outer_line处理,或缓存一些中间结果以供重用。

我强烈建议不要使用默认的文件加载,因为它非常慢。你应该研究一下numpy函数和IOpro函数(例如numpy.loadtxt())。

http://docs.scipy.org/doc/numpy/user/basics.io.genfromtxt.html

< a href = " https://store.continuum。io / cshop / iopro / " rel = " https://store.continuum.io/cshop/iopro/ nofollow”> < / >

然后你可以把你的成对操作分解成几个块:

import numpy as np
import math


lines_total = n
similarity = np.zeros(n,n)
lines_per_chunk = m
n_chunks = math.ceil(float(n)/m)
for i in xrange(n_chunks):
for j in xrange(n_chunks):
chunk_i = (function of your choice to read lines i*lines_per_chunk to (i+1)*lines_per_chunk)
chunk_j = (function of your choice to read lines j*lines_per_chunk to (j+1)*lines_per_chunk)
similarity[i*lines_per_chunk:(i+1)*lines_per_chunk,
j*lines_per_chunk:(j+1)*lines_per_chunk] = fast_operation(chunk_i, chunk_j)

以块方式加载数据,然后对其进行矩阵操作,几乎总是比一个元素一个元素地加载数据快得多!!

去除换行符:

with open(file_path, 'rU') as f:
for line_terminated in f:
line = line_terminated.rstrip('\n')
...

使用通用换行支持,所有文本文件行似乎都以'\n'结束,无论文件中的终止符是'\r''\n'还是'\r\n'

指定通用换行符支持:

  • Unix上的Python 2 - open(file_path, mode='rU') -需要[谢谢@Dave]
  • Windows上的Python 2 - open(file_path, mode='rU') -可选
  • Python 3 - open(file_path, newline=None) -可选

newline参数仅在Python 3中支持,默认为Nonemode参数在所有情况下默认为'r'U在Python 3中已弃用。在Windows上的python2中,一些其他机制似乎将\r\n转换为\n

文档:

保存本机行终止符:

with open(file_path, 'rb') as f:
with line_native_terminated in f:
...

二进制模式仍然可以用in将文件解析成行。每一行都有它在文件中的终止符。

感谢@katrielalex's answer, Python's open() doc,和iPython实验。

#Using a text file for the example
with open("yourFile.txt","r") as f:
text = f.readlines()
for line in text:
print line
  • 打开文件以读取(r)
  • 读取整个文件,并将每行保存为列表(文本)
  • 遍历列表打印每行。

例如,如果您希望检查长度大于10的特定行,则使用现有的可用内容。

for line in text:
if len(line) > 10:
print line

一些关于我来自哪里的背景信息。代码片段在最后。

当我可以的时候,我更喜欢使用像H2O这样的开源工具来进行超高性能的并行CSV文件读取,但这个工具的功能集有限。最后,我写了很多代码来创建数据科学管道,然后才将其提供给H2O集群进行监督学习。

我一直在从UCI repo读取8GB的HIGGS数据集,甚至40GB的CSV文件,用于数据科学目的,通过使用多处理库的池对象和映射函数添加大量并行性,从而显著加快了读取速度。例如,使用最近邻搜索的聚类以及DBSCAN和Markov聚类算法需要一些并行编程技巧来绕过一些严重的内存和挂钟时间问题。

我通常喜欢先使用gnu工具将文件按行分解成各个部分,然后在python程序中并行地查找和读取它们。我通常使用1000多个部分文件。使用这些技巧对处理速度和内存限制有极大的帮助。

熊猫数据框架。Read_csv是单线程的,所以你可以通过运行map()并行执行来使pandas更快。您可以使用htop来查看普通的顺序熊猫数据框架。Read_csv,一个核上100%的CPU是pd的实际瓶颈。Read_csv,而不是磁盘。

我应该补充一句,我使用的是快速显卡总线上的SSD,而不是SATA6总线上的旋转HD,外加16个CPU内核。

另外,我发现在一些应用程序中非常有效的另一种技术是在一个大文件中并行读取CSV文件,以不同的偏移量开始每个worker到文件中,而不是预先将一个大文件分割成许多部分文件。在每个并行工作线程中使用python的文件seek()和tell()在大文件中不同的字节偏移的开始字节和结束字节位置读取大文本文件,所有这些都是同时并发的。您可以在字节上执行regex findall,并返回换行的计数。这是部分和。最后,当工人完成工作后map函数返回时,对部分和求和得到全局和。

下面是一些使用并行字节偏移技巧的示例基准测试:

我使用2个文件:HIGGS.csv是8gb。它来自UCI机器学习库。all_bin .csv是40.4 GB,来自我当前的项目。 我使用两个程序:Linux附带的GNU wc程序,以及我开发的纯python fastread.py程序
HP-Z820:/mnt/fastssd/fast_file_reader$ ls -l /mnt/fastssd/nzv/HIGGS.csv
-rw-rw-r-- 1 8035497980 Jan 24 16:00 /mnt/fastssd/nzv/HIGGS.csv


HP-Z820:/mnt/fastssd$ ls -l all_bin.csv
-rw-rw-r-- 1 40412077758 Feb  2 09:00 all_bin.csv


ga@ga-HP-Z820:/mnt/fastssd$ time python fastread.py --fileName="all_bin.csv" --numProcesses=32 --balanceFactor=2
2367496


real    0m8.920s
user    1m30.056s
sys 2m38.744s


In [1]: 40412077758. / 8.92
Out[1]: 4530501990.807175

这大约是4.5 GB/s,或者45 GB/s的文件读取速度。那不是旋转硬盘,我的朋友。那其实是三星Pro 950 SSD。

下面是gnu wc(一个纯C编译程序)对同一文件进行行计数的速度基准。

最酷的是,在这种情况下,你可以看到我的纯python程序基本上与gnu wc编译的C程序的速度相匹配。Python是解释的,而C是编译的,所以这是一个非常有趣的速度壮举,我想你会同意的。当然,wc真的需要改变为一个并行程序,然后它真的会打败我的python程序。但是就今天的情况来看,gnu wc只是一个顺序程序。你做你能做的,而python今天可以并行。Cython编译可能会帮助我(在其他时间)。此外,内存映射文件还没有探索。

HP-Z820:/mnt/fastssd$ time wc -l all_bin.csv
2367496 all_bin.csv


real    0m8.807s
user    0m1.168s
sys 0m7.636s




HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000


real    0m2.257s
user    0m12.088s
sys 0m20.512s


HP-Z820:/mnt/fastssd/fast_file_reader$ time wc -l HIGGS.csv
11000000 HIGGS.csv


real    0m1.820s
user    0m0.364s
sys 0m1.456s

结论:与C程序相比,纯python程序的速度更好。但是,在C程序上使用纯python程序还不够好,至少在行计数的目的上是这样。一般来说,该技术可以用于其他文件处理,所以这段python代码仍然很好。

问:只编译一次正则表达式并将其传递给所有工作人员是否会提高速度?答:Regex预编译在这个应用程序中没有帮助。我认为原因是进程序列化和为所有worker创建的开销占主导地位。

还有一件事。 并行CSV文件读取甚至有帮助吗?磁盘是瓶颈,还是CPU?他们说,stackoverflow上许多所谓的顶级答案都包含了常见的开发智慧,即您只需要一个线程就可以读取一个文件,这是您所能做到的最好的。他们确定吗?< / p >

让我们来看看:

HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000


real    0m2.256s
user    0m10.696s
sys 0m19.952s


HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=1
11000000


real    0m17.380s
user    0m11.124s
sys 0m6.272s

哦,是的,是的。并行文件读取工作得很好。好吧,这就对了!

Ps.如果你们中的一些人想知道,当使用单个工作进程时,如果balanceFactor是2会怎样?嗯,这太可怕了:

HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=2
11000000


real    1m37.077s
user    0m12.432s
sys 1m24.700s

fastread.py python程序的关键部分:

fileBytes = stat(fileName).st_size  # Read quickly from OS how many bytes are in a text file
startByte, endByte = PartitionDataToWorkers(workers=numProcesses, items=fileBytes, balanceFactor=balanceFactor)
p = Pool(numProcesses)
partialSum = p.starmap(ReadFileSegment, zip(startByte, endByte, repeat(fileName))) # startByte is already a list. fileName is made into a same-length list of duplicates values.
globalSum = sum(partialSum)
print(globalSum)




def ReadFileSegment(startByte, endByte, fileName, searchChar='\n'):  # counts number of searchChar appearing in the byte range
with open(fileName, 'r') as f:
f.seek(startByte-1)  # seek is initially at byte 0 and then moves forward the specified amount, so seek(5) points at the 6th byte.
bytes = f.read(endByte - startByte + 1)
cnt = len(re.findall(searchChar, bytes)) # findall with implicit compiling runs just as fast here as re.compile once + re.finditer many times.
return cnt

PartitionDataToWorkers的def只是普通的顺序代码。我省略了它,以防其他人想要练习并行编程是什么样的。我免费提供了比较困难的部分:经过测试和工作的并行代码,以帮助您学习。

感谢:开源的H2O项目,由Arno和Cliff以及H2O工作人员为他们伟大的软件和教学视频,这为我提供了这个纯python高性能并行字节偏移阅读器的灵感,如上所示。H2O使用java进行并行文件读取,可由python和R程序调用,并且速度非常快,比地球上任何读取大型CSV文件的程序都快。

逐行读取大文件的最佳方法是使用python 列举函数

with open(file_name, "rU") as read_file:
for i, row in enumerate(read_file, 1):
#do something
#i in line of that line
#row containts all data of that line

需要经常从上一个位置读取一个大文件?

我已经创建了一个脚本,用于每天多次切割Apache access.log文件。 所以我需要将位置光标设置在上次执行时解析的最后一行上。 为此,我使用了file.seek()file.seek()方法,允许将光标存储在文件中

我的代码:

ENCODING = "utf8"
CURRENT_FILE_DIR = os.path.dirname(os.path.abspath(__file__))


# This file is used to store the last cursor position
cursor_position = os.path.join(CURRENT_FILE_DIR, "access_cursor_position.log")


# Log file with new lines
log_file_to_cut = os.path.join(CURRENT_FILE_DIR, "access.log")
cut_file = os.path.join(CURRENT_FILE_DIR, "cut_access", "cut.log")


# Set in from_line
from_position = 0
try:
with open(cursor_position, "r", encoding=ENCODING) as f:
from_position = int(f.read())
except Exception as e:
pass


# We read log_file_to_cut to put new lines in cut_file
with open(log_file_to_cut, "r", encoding=ENCODING) as f:
with open(cut_file, "w", encoding=ENCODING) as fw:
# We set cursor to the last position used (during last run of script)
f.seek(from_position)
for line in f:
fw.write("%s" % (line))


# We save the last position of cursor for next usage
with open(cursor_position, "w", encoding=ENCODING) as fw:
fw.write(str(f.tell()))