如何在Python中廉价地获取大文件的行数?

如何以最节省内存和时间的方式获取大文件的行数?

def file_len(filename):with open(filename) as f:for i, _ in enumerate(f):passreturn i + 1
1068848 次浏览

您可以执行一个子进程并运行wc -l filename

import subprocess
def file_len(fname):p = subprocess.Popen(['wc', '-l', fname], stdout=subprocess.PIPE,stderr=subprocess.PIPE)result, err = p.communicate()if p.returncode != 0:raise IOError(err)return int(result.strip().split()[0])
def file_len(full_path):""" Count number of lines in a file."""f = open(full_path)nr_of_lines = sum(1 for line in f)f.close()return nr_of_lines

你不能得到任何比这更好。

毕竟,任何解决方案都必须读取整个文件,找出您有多少\n,并返回该结果。

你有更好的方法在不读取整个文件的情况下做到这一点吗?不确定……最好的解决方案始终是I/O绑定,你能做的最好的就是确保你不使用不必要的内存,但看起来你已经覆盖了。

对于我来说,这个变体将是最快的:

#!/usr/bin/env python
def main():f = open('filename')lines = 0buf_size = 1024 * 1024read_f = f.read # loop optimization
buf = read_f(buf_size)while buf:lines += buf.count('\n')buf = read_f(buf_size)
print lines
if __name__ == '__main__':main()

原因:缓冲比逐行读取快,string.count也非常快

打开文件的结果是一个迭代器,可以转换为一个序列,其长度为:

with open(filename) as f:return len(list(f))

这比显式循环更简洁,并且避免了enumerate

这个怎么样?

def file_len(fname):counts = itertools.count()with open(fname) as f:for _ in f: counts.next()return counts.next()

为什么不读取前100行和最后100行并估计平均行长度,然后用这些数字除以总文件大小?如果您不需要确切的值,这可能会奏效。

我相信内存映射文件将是最快的解决方案。我尝试了四个函数:OP发布的函数(opcount);对文件中的行进行简单迭代(simplecount);带有内存映射文件(mmap)的readline(mapcount);和Mykola Kharechko提供的缓冲区读取解决方案(bufcount)。

我将每个函数运行了五次,并计算了120万行文本文件的平均运行时间。

Windows XP、Python 2.5、2GB RAM、2 GHz AMD处理器

以下是我的成果:

mapcount : 0.465599966049simplecount : 0.756399965286bufcount : 0.546800041199opcount : 0.718600034714

编辑:Python 2.6的数字:

mapcount : 0.471799945831simplecount : 0.634400033951bufcount : 0.468800067902opcount : 0.602999973297

所以缓冲区读取策略似乎是Windows/Python 2.6中最快的

以下是代码:

from __future__ import with_statementimport timeimport mmapimport randomfrom collections import defaultdict
def mapcount(filename):f = open(filename, "r+")buf = mmap.mmap(f.fileno(), 0)lines = 0readline = buf.readlinewhile readline():lines += 1return lines
def simplecount(filename):lines = 0for line in open(filename):lines += 1return lines
def bufcount(filename):f = open(filename)lines = 0buf_size = 1024 * 1024read_f = f.read # loop optimization
buf = read_f(buf_size)while buf:lines += buf.count('\n')buf = read_f(buf_size)
return lines
def opcount(fname):with open(fname) as f:for i, l in enumerate(f):passreturn i + 1

counts = defaultdict(list)
for i in range(5):for func in [mapcount, simplecount, bufcount, opcount]:start_time = time.time()assert func("big_file.txt") == 1209138counts[func].append(time.time() - start_time)
for key, vals in counts.items():print key.__name__, ":", sum(vals) / float(len(vals))

一条线,可能很快:

num_lines = sum(1 for line in open('myfile.txt'))

为了完成上面的方法,我尝试了一个带有file输入模块的变体:

import fileinput as fidef filecount(fname):for line in fi.input(fname):passreturn fi.lineno()

并将60mil行文件传递给上述所有方法:

mapcount : 6.1331050396simplecount : 4.588793993opcount : 4.42918205261filecount : 43.2780818939bufcount : 0.170812129974

让我有点惊讶的是,文件输入如此糟糕,而且比所有其他方法都差得多……

这个怎么样?

import syssys.stdin=open('fname','r')data=sys.stdin.readlines()print "counted",len(data),"lines"

为什么以下方法行不通?

import sys
# input comes from STDINfile = sys.stdindata = file.readlines()
# get total number of lines in filelines = len(data)
print lines

在这种情况下,len函数使用输入行作为确定长度的方法。

count = max(enumerate(open(filename)))[0]

这个怎么样?

import fileinputimport sys
counter=0for line in fileinput.input([sys.argv[1]]):counter+=1
fileinput.close()print counter

这是一个使用多重处理库在机器/内核之间分配行计数的python程序。我的测试使用8核windows 64服务器将2000万行文件的计数从26秒提高到7秒。注意:不使用内存映射会使事情变得更慢。

import multiprocessing, sys, time, os, mmapimport logging, logging.handlers
def init_logger(pid):console_format = 'P{0} %(levelname)s %(message)s'.format(pid)logger = logging.getLogger()  # New logger at root levellogger.setLevel( logging.INFO )logger.handlers.append( logging.StreamHandler() )logger.handlers[0].setFormatter( logging.Formatter( console_format, '%d/%m/%y %H:%M:%S' ) )
def getFileLineCount( queues, pid, processes, file1 ):init_logger(pid)logging.info( 'start' )
physical_file = open(file1, "r")#  mmap.mmap(fileno, length[, tagname[, access[, offset]]]
m1 = mmap.mmap( physical_file.fileno(), 0, access=mmap.ACCESS_READ )
#work out file size to divide up line counting
fSize = os.stat(file1).st_sizechunk = (fSize / processes) + 1
lines = 0
#get where I start and stop_seedStart = chunk * (pid)_seekEnd = chunk * (pid+1)seekStart = int(_seedStart)seekEnd = int(_seekEnd)
if seekEnd < int(_seekEnd + 1):seekEnd += 1
if _seedStart < int(seekStart + 1):seekStart += 1
if seekEnd > fSize:seekEnd = fSize
#find where to startif pid > 0:m1.seek( seekStart )#read next linel1 = m1.readline()  # need to use readline with memory mapped filesseekStart = m1.tell()
#tell previous rank my seek start to make their seek end
if pid > 0:queues[pid-1].put( seekStart )if pid < processes-1:seekEnd = queues[pid].get()
m1.seek( seekStart )l1 = m1.readline()
while len(l1) > 0:lines += 1l1 = m1.readline()if m1.tell() > seekEnd or len(l1) == 0:break
logging.info( 'done' )# add up the resultsif pid == 0:for p in range(1,processes):lines += queues[0].get()queues[0].put(lines) # the total lines countedelse:queues[0].put(lines)
m1.close()physical_file.close()
if __name__ == '__main__':init_logger( 'main' )if len(sys.argv) > 1:file_name = sys.argv[1]else:logging.fatal( 'parameters required: file-name [processes]' )exit()
t = time.time()processes = multiprocessing.cpu_count()if len(sys.argv) > 2:processes = int(sys.argv[2])queues=[] # a queue for each processfor pid in range(processes):queues.append( multiprocessing.Queue() )jobs=[]prev_pipe = 0for pid in range(processes):p = multiprocessing.Process( target = getFileLineCount, args=(queues, pid, processes, file_name,) )p.start()jobs.append(p)
jobs[0].join() #wait for counting to finishlines = queues[0].get()
logging.info( 'finished {} Lines:{}'.format( time.time() - t, lines ) )

我修改了缓冲区案例,如下所示:

def CountLines(filename):f = open(filename)try:lines = 1buf_size = 1024 * 1024read_f = f.read # loop optimizationbuf = read_f(buf_size)
# Empty fileif not buf:return 0
while buf:lines += buf.count('\n')buf = read_f(buf_size)
return linesfinally:f.close()

现在还计算空文件和最后一行(不含\n)。

这个版本有一个小的(4-8%)改进,它重用了一个常量缓冲区,所以它应该避免任何内存或GC开销:

lines = 0buffer = bytearray(2048)with open(filename) as f:while f.readinto(buffer) > 0:lines += buffer.count('\n')

您可以使用缓冲区大小,也许会看到一些改进。

同样:

lines = 0with open(path) as f:for line in f:lines += 1

这个单行怎么样:

file_length = len(open('myfile.txt','r').read().split('\n'))

使用此方法在3900行文件上计时需要0.003秒

def c():import times = time.time()file_length = len(open('myfile.txt','r').read().split('\n'))print time.time() - s

我将使用Python的文件对象方法readlines,如下所示:

with open(input_file) as foo:lines = len(foo.readlines())

这将打开文件,在文件中创建行列表,计算列表的长度,将其保存到变量并再次关闭文件。

print open('file.txt', 'r').read().count("\n") + 1
def line_count(path):count = 0with open(path) as lines:for count, l in enumerate(lines, start=1):passreturn count

如果你想在LinuxPython中便宜地获取行数,我推荐这个方法:

import osprint os.popen("wc -l file_path").readline().split()[0]

file_path可以是抽象文件路径,也可以是相对路径。

Kyle的回答

num_lines = sum(1 for line in open('my_file.txt'))

可能是最好的,另一种选择是

num_lines =  len(open('my_file.txt').read().splitlines())

下面是两者性能的比较

In [20]: timeit sum(1 for line in open('Charts.ipynb'))100000 loops, best of 3: 9.79 µs per loop
In [21]: timeit len(open('Charts.ipynb').read().splitlines())100000 loops, best of 3: 12 µs per loop

您可以通过以下方式使用os.path模块:

import osimport subprocessNumber_lines = int( (subprocess.Popen( 'wc -l {0}'.format( Filename ), shell=True, stdout=subprocess.PIPE).stdout).readlines()[0].split()[0] )

,其中Filename是文件的绝对路径。

我不得不在一个类似的问题上发布这个问题,直到我的声誉分数跳了一点(感谢谁撞了我!)。

所有这些解决方案都忽略了一种使其运行得更快的方法,即使用无缓冲(原始)接口、使用字节数组和自己进行缓冲。(这仅适用于Python 3。在Python 2中,默认情况下可以使用也可以不使用原始接口,但在Python 3中,您将默认使用Unicode。)

使用修改版本的计时工具,我相信以下代码比提供的任何解决方案都更快(并且稍微更Pythonic):

def rawcount(filename):f = open(filename, 'rb')lines = 0buf_size = 1024 * 1024read_f = f.raw.read
buf = read_f(buf_size)while buf:lines += buf.count(b'\n')buf = read_f(buf_size)
return lines

使用单独的生成器函数,这会运行得更快:

def _make_gen(reader):b = reader(1024 * 1024)while b:yield bb = reader(1024*1024)
def rawgencount(filename):f = open(filename, 'rb')f_gen = _make_gen(f.raw.read)return sum( buf.count(b'\n') for buf in f_gen )

这可以完全通过使用迭代工具的内联生成器表达式来完成,但它看起来很奇怪:

from itertools import (takewhile,repeat)
def rawincount(filename):f = open(filename, 'rb')bufgen = takewhile(lambda x: x, (f.raw.read(1024*1024) for _ in repeat(None)))return sum( buf.count(b'\n') for buf in bufgen )

以下是我的时间表:

function      average, s  min, s   ratiorawincount        0.0043  0.0041   1.00rawgencount       0.0044  0.0042   1.01rawcount          0.0048  0.0045   1.09bufcount          0.008   0.0068   1.64wccount           0.01    0.0097   2.35itercount         0.014   0.014    3.41opcount           0.02    0.02     4.83kylecount         0.021   0.021    5.05simplecount       0.022   0.022    5.25mapcount          0.037   0.031    7.46

另一种可能:

import subprocess
def num_lines_in_file(fpath):return int(subprocess.check_output('wc -l %s' % fpath, shell=True).strip().split()[0])

这段代码更短更清晰。这可能是最好的方法:

num_lines = open('yourfile.ext').read().count('\n')

这是我用纯python找到的最快的东西。你可以通过设置缓冲区来使用任何你想要的内存量,尽管2**16似乎是我电脑上的一个甜蜜点。

from functools import partial
buffer=2**16with open(myfile) as f:print sum(x.count('\n') for x in iter(partial(f.read,buffer), ''))

我在这里找到了答案为什么在C++中从标准输入读取行比Python慢得多?并稍微调整了一下。这是一个很好的阅读,可以理解如何快速计数行,尽管wc -l仍然比其他任何东西快75%。

单线解决方案:

import osos.system("wc -l  filename")

我的片段:

>>> os.system('wc -l *.txt')
0 bar.txt1000 command.txt3 test_file.txt1003 total

类似于这个答案的单行bash解决方案,使用现代subprocess.check_output函数:

def line_count(filename):return int(subprocess.check_output(['wc', '-l', filename]).split()[0])

这是我使用的,看起来很干净:

import subprocess
def count_file_lines(file_path):"""Counts the number of lines in a file using wc utility.:param file_path: path to file:return: int, no of lines"""num = subprocess.check_output(['wc', '-l', file_path])num = num.split(' ')return int(num[0])

更新:这比使用纯python稍微快一点,但以内存占用为代价。子进程将在执行您的命令时分叉一个与父进程具有相同内存占用的新进程。

def count_text_file_lines(path):with open(path, 'rt') as file:line_count = sum(1 for _line in file)return line_count

如果文件可以放入内存,则

with open(fname) as f:count = len(f.read().split(b'\n')) - 1

创建一个名为count.py的可执行脚本文件:

#!/usr/bin/python
import syscount = 0for line in sys.stdin:count+=1

然后将文件的内容通过管道传输到python脚本中:cat huge.txt | ./count.py。管道也适用于PowerShell,因此您最终将计算行数。

对我来说,Linux它比简单的解决方案快30%:

count=1with open('huge.txt') as f:count+=1

如果文件中的所有行长度相同(并且仅包含ASCII字符)*,则可以非常便宜地执行以下操作:

fileSize     = os.path.getsize( pathToFile )  # file size in bytesbytesPerLine = someInteger                    # don't forget to account for the newline characternumLines     = fileSize // bytesPerLine

*如果使用像é这样的Unicode字符,我怀疑需要更多的努力来确定一行中的字节数。

简单方法:

1)

>>> f = len(open("myfile.txt").readlines())>>> f
430
>>> f = open("myfile.txt").read().count('\n')>>> f430>>>
num_lines = len(list(open('myfile.txt')))

大文件的替代方案是使用#0

count = 0for line in open(thefilepath).xreadlines(  ): count += 1

对于Python 3,请参阅:Python 3中用什么替代xreadline()?

使用Numba

我们可以使用Numba来JIT(Just in time)将我们的函数编译为机器代码。def numBacount并行(fname)运行速度快2.8倍比deffile_len(fname)的问题。

备注:

在运行基准测试之前,操作系统已经将文件缓存到内存中,因为我在PC上看不到太多磁盘活动。第一次读取文件时的时间会慢得多,使得使用Numba的时间优势微不足道。

第一次调用函数时,JIT编译需要额外的时间。

如果我们做的不仅仅是数行数,这将是有用的。

Cython是另一种选择。

http://numba.pydata.org/

结论

由于计数行将是IO绑定,请使用问题中的deffile_len(fname),除非您想做的不仅仅是计数行。

import timeit
from numba import jit, prangeimport numpy as np
from itertools import (takewhile,repeat)
FILE = '../data/us_confirmed.csv' # 40.6MB, 371755 line fileCR = ord('\n')

# Copied from the question above. Used as a benchmarkdef file_len(fname):with open(fname) as f:for i, l in enumerate(f):passreturn i + 1

# Copied from another answer. Used as a benchmarkdef rawincount(filename):f = open(filename, 'rb')bufgen = takewhile(lambda x: x, (f.read(1024*1024*10) for _ in repeat(None)))return sum( buf.count(b'\n') for buf in bufgen )

# Single thread@jit(nopython=True)def numbacountsingle_chunk(bs):
c = 0for i in range(len(bs)):if bs[i] == CR:c += 1
return c

def numbacountsingle(filename):f = open(filename, "rb")total = 0while True:chunk = f.read(1024*1024*10)lines = numbacountsingle_chunk(chunk)total += linesif not chunk:break
return total

# Multi thread@jit(nopython=True, parallel=True)def numbacountparallel_chunk(bs):
c = 0for i in prange(len(bs)):if bs[i] == CR:c += 1
return c

def numbacountparallel(filename):f = open(filename, "rb")total = 0while True:chunk = f.read(1024*1024*10)lines = numbacountparallel_chunk(np.frombuffer(chunk, dtype=np.uint8))total += linesif not chunk:break
return total
print('numbacountparallel')print(numbacountparallel(FILE)) # This allows Numba to compile and cache the function without adding to the time.print(timeit.Timer(lambda: numbacountparallel(FILE)).timeit(number=100))
print('\nnumbacountsingle')print(numbacountsingle(FILE))print(timeit.Timer(lambda: numbacountsingle(FILE)).timeit(number=100))
print('\nfile_len')print(file_len(FILE))print(timeit.Timer(lambda: rawincount(FILE)).timeit(number=100))
print('\nrawincount')print(rawincount(FILE))print(timeit.Timer(lambda: rawincount(FILE)).timeit(number=100))

对每个函数进行100次调用的时间(秒)

numbacountparallel3717552.8007332000000003
numbacountsingle3717553.1508585999999994
file_len3717556.7945494
rawincount3717556.815438

这是对其他一些答案的元评论。

  • 行读取和缓冲\n计数技术不会为每个文件返回相同的答案,因为某些文本文件的最后一行末尾没有换行符。您可以通过检查最后一个非空缓冲区的最后一个字节并在不是b'\n'时添加1来解决此问题。

  • 在Python 3中,以文本模式和二进制模式打开文件会产生不同的结果,因为文本模式默认将CR、LF和CRLF识别为行尾(将它们全部转换为'\n'),而在二进制模式下,如果您计数b'\n',则仅计算LF和CRLF。无论您是按行读取还是读取固定大小的缓冲区,这都适用。经典的Mac OS使用CR作为行尾;我不知道这些文件现在有多常见。

  • 缓冲区读取方法使用与文件大小无关的有限数量的RAM,而行读取方法在最坏的情况下可以一次将整个文件读取到RAM中(特别是如果文件使用CR行结尾)。在最坏的情况下,它可能会使用比文件大小更多的RAM,因为动态调整行缓冲区的大小和(如果您以文本模式打开)Unicode解码和存储的开销。

  • 您可以通过预分配字节数组并使用readinto而不是read来提高缓冲方法的内存使用情况,甚至可能提高速度。现有的答案之一(很少投票)可以做到这一点,但它有错误(它重复计算一些字节)。

  • 顶部缓冲区读取答案使用一个大缓冲区(1 MiB)。使用较小的缓冲区实际上可以更快,因为操作系统可以提前读取。如果你一次读取32K或64K,操作系统可能会在你要求之前开始将下一个32K/64K读取到缓存中,并且每次内核之旅几乎都会立即返回。如果你一次读取1 MiB,操作系统不太可能推测读取一整兆字节。它可能会预读较少的数量,但你仍然会花费大量时间坐在内核中等待磁盘返回其余数据。

已经有很多答案了,但不幸的是,其中大多数只是一个几乎无法优化的问题上的小经济体。

我参与了几个项目,其中行数是软件的核心功能,并且尽可能快地处理大量文件至关重要。

行数的主要瓶颈是I/O访问,因为您需要读取每行以检测行返回字符,根本没有办法。第二个潜在的瓶颈是内存管理:一次加载的越多,处理的速度就越快,但与第一个瓶颈相比,这个瓶颈可以忽略不计。

因此,有3种主要方法可以减少行数函数的流转时长,除了微小的优化,如禁用gc收集和其他微观管理技巧:

  1. 硬件解决方案:主要和最明显的方法是非程序化的:购买非常快的SSD/闪存硬盘驱动器。到目前为止,这就是您获得最大速度提升的方式。

  2. 数据准备解决方案:如果您生成或可以修改处理文件的方式,或者可以预处理它们,首先将行返回转换为unix样式(\n),因为与Windows或MacOS样式相比,这将节省1个字符(节省不多,但很容易获得),其次也是最重要的是,您可以编写固定长度的行。如果您需要可变长度,您可以随时填充较小的行。这样,您可以立即从总文件大小中计算行数,访问速度要快得多。通常,解决问题的最佳解决方案是对问题进行预处理,以便它更适合您的最终目的。

  3. 并行化+硬件解决方案:如果您可以购买多个硬盘(如果可能的话还有SSD闪存磁盘),那么您甚至可以通过利用并行化来超越一个磁盘的速度,方法是在磁盘之间以平衡的方式存储文件(最简单的是按总大小平衡),然后从所有这些磁盘并行读取。然后,您可以期望与您拥有的磁盘数量成比例地获得乘数提升。如果购买多个磁盘不是您的选择,那么并行化可能无济于事(除非您的磁盘具有多个读取标头,例如一些专业级磁盘,但即使如此,磁盘的内部缓存存储器和PCB电路也可能成为瓶颈,并阻止您完全并行使用所有磁头,此外,您必须为您将使用的此硬盘驱动器设计特定代码,因为您需要知道确切的集群映射,以便将文件存储在不同磁头下的集群上,并且之后可以使用不同的磁头读取它们)。实际上,众所周知,顺序读取几乎总是比随机读取快,并且单个磁盘上的并行化将具有比顺序读取更类似于随机读取的性能(例如,您可以使用CrystemDiskMark在这两个方面测试您的硬盘驱动器速度)。

如果这些都不是一个选择,那么你只能依靠微观管理技巧来提高你的行数功能的速度几个百分点,但不要指望任何真正重要的东西。相反,你可以期望你花在调整上的时间与你将看到的速度改进的回报不成比例。

灌流图分析之后,必须推荐缓冲读取解决方案

def buf_count_newlines_gen(fname):def _make_gen(reader):while True:b = reader(2 ** 16)if not b: breakyield b
with open(fname, "rb") as f:count = sum(buf.count(b"\n") for buf in _make_gen(f.raw.read))return count

它速度快,内存效率高。大多数其他解决方案大约慢20倍。

在此处输入图片描述


重现情节的代码:

import mmapimport subprocessfrom functools import partial
import perfplot

def setup(n):fname = "t.txt"with open(fname, "w") as f:for i in range(n):f.write(str(i) + "\n")return fname

def for_enumerate(fname):i = 0with open(fname) as f:for i, _ in enumerate(f):passreturn i + 1

def sum1(fname):return sum(1 for _ in open(fname))

def mmap_count(fname):with open(fname, "r+") as f:buf = mmap.mmap(f.fileno(), 0)
lines = 0while buf.readline():lines += 1return lines

def for_open(fname):lines = 0for _ in open(fname):lines += 1return lines

def buf_count_newlines(fname):lines = 0buf_size = 2 ** 16with open(fname) as f:buf = f.read(buf_size)while buf:lines += buf.count("\n")buf = f.read(buf_size)return lines

def buf_count_newlines_gen(fname):def _make_gen(reader):b = reader(2 ** 16)while b:yield bb = reader(2 ** 16)
with open(fname, "rb") as f:count = sum(buf.count(b"\n") for buf in _make_gen(f.raw.read))return count

def wc_l(fname):return int(subprocess.check_output(["wc", "-l", fname]).split()[0])

def sum_partial(fname):with open(fname) as f:count = sum(x.count("\n") for x in iter(partial(f.read, 2 ** 16), ""))return count

def read_count(fname):return open(fname).read().count("\n")

b = perfplot.bench(setup=setup,kernels=[for_enumerate,sum1,mmap_count,for_open,wc_l,buf_count_newlines,buf_count_newlines_gen,sum_partial,read_count,],n_range=[2 ** k for k in range(27)],xlabel="num lines",)b.save("out.png")b.show()

我使用的最简单和最短的方法是:

f = open("my_file.txt", "r")len(f.readlines())

我发现你只能。

f = open("data.txt")linecout = len(f.readlines())

会给你一个答案