How can I read large text files line by line, without loading them into memory?

I want to read a large file (5GB) line by line, without loading its entire contents into memory. I can't use readlines(), since it creates a very large list in memory.

You are better off using an iterator instead.
Related: fileinput — Iterate over lines from multiple input streams.

From the docs:

import fileinput
for line in fileinput.input("filename", encoding="utf-8"):
    process(line)

This will avoid copying the whole file into memory at once.

All you need to do is use the file object as an iterator.

for line in open("log.txt"):
    do_something_with(line)

Even better, in recent versions of Python, use a context manager:

with open("log.txt") as fileobject:
    for line in fileobject:
        do_something_with(line)

This will also automatically close the file.

Use a for loop on a file object to read it line by line. Use with open(...) as a context manager to ensure that the file is closed after reading:

with open("log.txt") as infile:
    for line in infile:
        print(line)
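
Note that each line keeps its trailing newline, so print() above adds an extra blank line between rows; a minimal variant that strips it first:

with open("log.txt") as infile:
    for line in infile:
        # strip the trailing newline before printing or further processing
        print(line.rstrip("\n"))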

Old school approach:

fh = open(file_name, 'rt')
line = fh.readline()
while line:
    # do stuff with line
    line = fh.readline()
fh.close()
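
On Python 3.8+, a similar loop can be written with the walrus operator, which avoids repeating the readline() call; a minimal sketch:

with open(file_name, 'rt') as fh:
    # ":=" assigns and tests in one step; the loop ends when readline() returns ''
    while line := fh.readline():
        # do stuff with line
        pass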

I couldn't believe that it could be as easy as @john-la-rooy's answer made it seem. So, I recreated the cp command using line-by-line reading and writing. It's crazy fast.

#!/usr/bin/env python3.6


import sys


with open(sys.argv[2], 'w') as outfile:
    with open(sys.argv[1]) as infile:
        for line in infile:
            outfile.write(line)

How about this? Divide your file into chunks and then read it line by line, because when you read a file, your operating system will cache the next line. If you read the file line by line, you don't make efficient use of the cached information.

Instead, divide the file into chunks, load a whole chunk into memory, and then process it.

import os

def chunks(fh, size=1024):
    # Yield (start, length) pairs describing chunks that end on a line boundary.
    while True:
        startat = fh.tell()
        print(startat)                      # file object's current position from the start
        fh.seek(size, 1)                    # offset from the current position --> whence=1
        data = fh.readline()                # read on to the end of the current line
        yield startat, fh.tell() - startat  # doesn't store the whole file in memory
        if not data:
            break

if os.path.isfile(fname):
    try:
        fh = open(fname, 'rb')
    except IOError as e:                    # file --> permission denied
        print("I/O error({0}): {1}".format(e.errno, e.strerror))
    except Exception as e1:                 # handle other exceptions such as attribute errors
        print("Unexpected error: {0}".format(e1))
    for ele in chunks(fh):
        fh.seek(ele[0])                     # startat
        data = fh.read(ele[1])              # read the whole chunk
        print(data)

Thank you! I have recently converted to Python 3 and have been frustrated by using readlines(0) to read large files. This solved the problem. But to get each line, I had to do a couple of extra steps. A 'b' preceded each line, which I guess was because it was in binary format. Using 'decode(utf-8)' changed it to ascii.

Then I had to remove an '=\n' in the middle of each line.

Then I split the lines at the newline.

import binascii

b_data = fh.read(ele[1])  # endat: this is one chunk of ascii data in binary format
a_data = (binascii.b2a_qp(b_data)).decode('utf-8')  # data chunk in 'split' ascii format
data_chunk = a_data.replace('=\n', '').strip()  # splitting characters removed
data_list = data_chunk.split('\n')  # list containing the lines in the chunk
#print(data_list,'\n')
#time.sleep(1)
for j in range(len(data_list)):  # iterate through data_list to get each item
    i += 1
    line_of_data = data_list[j]
    print(line_of_data)

The code above goes just above the 'print data' line in Arohi's code.

The blaze project has come a long way over the last 6 years. It has a simple API covering a useful subset of pandas features.

dask.dataframe takes care of chunking internally, supports many parallelizable operations, and allows you to export slices back to pandas easily for in-memory operations.

import dask.dataframe as dd


df = dd.read_csv('filename.csv')
df.head(10)  # return first 10 rows
df.tail(10)  # return last 10 rows


# iterate rows
for idx, row in df.iterrows():
    ...


# group by my_field and return mean
df.groupby(df.my_field).value.mean().compute()


# slice by column
df[df.my_field=='XYZ'].compute()

Please try this:

with open('filename', 'r', buffering=100000) as f:
    for line in f:
        print(line)

In case you don't have line breaks in the file, you can do this:

with open('large_text.txt') as f:
    while True:
        c = f.read(1024)
        if not c:
            break
        print(c, end='')

Here is the code for loading text files of any size without causing memory issues. It supports gigabyte-sized files.

https://gist.github.com/iyvinjose/e6c1cb2821abd5f01fd1b9065cbc759d

Download the file data_loading_utils.py and import it into your code.

Usage:

import data_loading_utils

file_name = 'file_name.ext'
CHUNK_SIZE = 1000000


def process_lines(data, eof, file_name):

    # check if end of file reached
    if not eof:
        # process data, data is one single line of the file
        pass
    else:
        # end of file reached
        pass


data_loading_utils.read_lines_from_file_as_data_chunks(file_name, chunk_size=CHUNK_SIZE, callback=process_lines)

process_lines is the callback function. It will be called for all the lines, with the parameter data representing one single line of the file at a time.

You can configure the variable CHUNK_SIZE depending on your machine's hardware configuration.
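
The module itself lives in the gist above and is not reproduced here; purely as an illustration of the callback interface (the real data_loading_utils may differ), a chunked reader of that shape could look roughly like this:

def read_lines_from_file_as_data_chunks(file_name, chunk_size, callback):
    # Hypothetical sketch: read the file in chunks, split the buffered text on
    # newlines, and hand each complete line to the callback.
    with open(file_name, 'r') as f:
        buffer = ''
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            buffer += chunk
            lines = buffer.split('\n')
            buffer = lines.pop()   # keep the trailing partial line for the next chunk
            for line in lines:
                callback(line, eof=False, file_name=file_name)
        if buffer:
            callback(buffer, eof=False, file_name=file_name)
        callback(None, eof=True, file_name=file_name)   # signal end of file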

This might come in handy when you want to work in parallel and read only chunks of the data, but keep it clean with new lines (a usage sketch follows the function below).

def readInChunks(fileObj, chunkSize=1024):
    while True:
        data = fileObj.read(chunkSize)
        if not data:
            break
        # extend the chunk up to the next newline so lines are never split
        while data[-1:] != '\n':
            more = fileObj.read(1)
            if not more:          # end of file without a trailing newline
                break
            data += more
        yield data
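
For example (a minimal usage sketch; 'large_text.txt' and process() are placeholders):

with open('large_text.txt') as f:
    for chunk in readInChunks(f, chunkSize=1024 * 1024):
        # each chunk ends on a line boundary, so splitlines() yields whole lines
        for line in chunk.splitlines():
            process(line)   # replace with your own per-line handling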

This is the best solution I found, and I tried it on a 330 MB file.

lineno = 500
line_length = 8
with open('catfour.txt', 'r') as file:
    file.seek(lineno * (line_length + 2))
    print(file.readline(), end='')

Where line_length is the number of characters in a single line. For example, "abcd" has a line length of 4.

I have added 2 to the line length to skip the '\n' character and move to the next character.
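
This only works when every line has exactly the same on-disk length; purely as an illustration (the helper name is made up), the same seek arithmetic can be wrapped so any line can be fetched by index, with sep_length=2 matching the answer above:

def read_line_at(path, lineno, line_length, sep_length=2):
    # Random access into a file whose lines all occupy line_length characters
    # plus sep_length separator characters on disk.
    with open(path, 'r') as f:
        f.seek(lineno * (line_length + sep_length))
        return f.readline().rstrip('\n')

# e.g. fetch line 500 of 'catfour.txt', matching the example above
print(read_line_at('catfour.txt', 500, 8))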

I realise this has been answered quite some time ago, but here is a way of doing it in parallel without killing your memory overhead (which would be the case if you tried to push each line into the pool). Obviously swap the readJSON_line2 function out for something sensible; it's just there to illustrate the point!

The speedup will depend on the file size and what you are doing with each line, but for the worst case of a small file that is just read with the JSON reader, I'm seeing performance similar to the single-threaded version with the setup below.

Hopefully useful to someone out there:

def readJSON_line2(linesIn):
    #Function for reading a chunk of json lines
    '''
    Note, this function is nonsensical. A user would never use the approach suggested
    for reading in a JSON file,
    its role is to evaluate the MT approach for full line by line processing to both
    increase speed and reduce memory overhead
    '''
    import json

    linesRtn = []
    for lineIn in linesIn:

        if lineIn.strip() != "":
            lineRtn = json.loads(lineIn)
        else:
            lineRtn = ""

        linesRtn.append(lineRtn)

    return linesRtn


# -------------------------------------------------------------------
if __name__ == "__main__":
    import multiprocessing as mp

    path1 = "C:\\user\\Documents\\"
    file1 = "someBigJson.json"

    nCPUs = mp.cpu_count()   # number of worker processes
    pool = mp.Pool(nCPUs)    # pool of worker processes for the chunks

    nBuffer = 20*nCPUs  # How many chunks are queued up (so cpus aren't waiting on processes spawning)
    nChunk = 1000  # How many lines are in each chunk
    #Both of the above will require balancing speed against memory overhead

    iJob = 0   #Tracker for SMP jobs submitted into pool
    iiJob = 0  #Tracker for SMP jobs extracted back out of pool

    jobs = []    #SMP job holder
    MTres3 = []  #Final result holder
    chunk = []
    iBuffer = 0  # Buffer line count
    with open(path1+file1) as f:
        for line in f:

            #Send to the chunk
            if len(chunk) < nChunk:
                chunk.append(line)
            else:
                #Chunk full
                #Don't forget to add the current line to chunk
                chunk.append(line)

                #Then add the chunk to the buffer (submit to SMP pool)
                jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
                iJob += 1
                iBuffer += 1
                #Clear the chunk for the next batch of entries
                chunk = []

            #Buffer is full, any more chunks submitted would cause undue memory overhead
            #(Partially) empty the buffer
            if iBuffer >= nBuffer:
                temp1 = jobs[iiJob].get()
                for rtnLine1 in temp1:
                    MTres3.append(rtnLine1)
                iBuffer -= 1
                iiJob += 1

    #Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
    if chunk:
        jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
        iJob += 1
        iBuffer += 1

    #And gather up the last of the buffer, including the final chunk
    while iiJob < iJob:
        temp1 = jobs[iiJob].get()
        for rtnLine1 in temp1:
            MTres3.append(rtnLine1)
        iiJob += 1

    #Cleanup
    del chunk, jobs, temp1
    pool.close()