获取文本文件的第一行和最后一行的最有效方法是什么?

我有一个文本文件,其中包含每一行的时间戳。我的目标是找到时间范围。所有的时间都是按顺序排列的,所以第一行是最早的时间,最后一行是最晚的时间。我只需要第一行和最后一行。在 python 中获取这些行的最有效方法是什么?

注意: 这些文件的长度相对较大,每个文件大约有1-2百万行,对于几百个文件我必须这样做。

141973 次浏览

输入输出模块的文件

with open(fname, 'rb') as fh:
first = next(fh).decode()


fh.seek(-1024, 2)
last = fh.readlines()[-1].decode()

这里的变量值是1024: 它表示字符串的平均长度。例如,我选择1024。如果你有一个平均行长度的估计值,你可以只使用这个值乘以2。

因为你根本不知道行长度可能的上界,显而易见的解决方案是循环遍历文件:

for line in fh:
pass
last = line

您不需要使用二进制标志,只需使用 open(fname)即可。

ETA : 由于需要处理许多文件,因此可以使用 random.sample创建几十个文件的示例,并在这些文件上运行此代码以确定最后一行的长度。位置偏移的先验大值(比如1MB)。这将帮助您估计整个运行的价值。

可以使用 unix 命令吗?我认为使用 head -1tail -n 1可能是最有效的方法。或者,您可以使用一个简单的 fid.readline()来获得第一行和 fid.readlines()[-1],但是这可能会占用太多的内存。

得到第一行是非常容易的。对于最后一行,假设您知道行长的一个近似上界,奥斯・伊尔索SEEK_END中找到第二行到最后一行的结尾,然后 Readline ()找到最后一行。

下面是 SilentGhost 的一个修改版本,它可以满足您的需要。

with open(fname, 'rb') as fh:
first = next(fh)
offs = -100
while True:
fh.seek(offs, 2)
lines = fh.readlines()
if len(lines)>1:
last = lines[-1]
break
offs *= 2
print first
print last

这里不需要线长度的上界。

To read both the first and final line of a file you could...

  • 打开文件。
  • 读取第一行使用内置的 readline(),..。
  • ... 寻找(移动光标)到文件的末尾,..。
  • ... 后退一步,直到遇到 EOL(换行)和..。
  • 读最后一行。
def readlastline(f):
f.seek(-2, 2)              # Jump to the second last byte.
while f.read(1) != b"\n":  # Until EOL is found ...
f.seek(-2, 1)          # ... jump back, over the read byte plus one more.
return f.read()            # Read all data from this point on.
    

with open(file, "rb") as f:
first = f.readline()
last = readlastline(f)

直接跳转到 第二最后一个字节,以防止尾随的换行符导致返回空行 * 。

每次读取一个字节时,当前偏移量被向前推送一个字节,因此每次向后步进两个字节,超过最近读取的字节和接下来要读取的字节。

The whence parameter passed to fseek(offset, whence=0) indicates that fseek should seek to a position offset bytes relative to...

* 正如大多数应用程序(包括 printecho)的默认行为所预期的那样,在写入的每一行后面加上一行,对缺少换行符的行没有影响。


Efficiency

1-2 million lines each and I have to do this for several hundred files.

我计算了这个方法的时间,并将其与最佳答案进行了比较。

10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs 6.92s.
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95.

数百万行将增加差异 很多更多。

用于计时的精确代码:

with open(file, "rb") as f:
first = f.readline()     # Read and store the first line.
for last in f: pass      # Read all lines, keep final value.

修正案

一个更复杂,更难以阅读,变化处理的意见和问题提出以来。

  • 解析空文件时返回空字符串,由 评论引发。
  • 找不到分隔符时返回所有内容,由 评论引发。
  • 避免相对偏移以支持由 评论引起的 短信模式
  • UTF16/UTF32 hack, noted by 评论.

还增加了对多字节分隔符 readlast(b'X<br>Y', b'<br>', fixed=False)的支持。

请注意,这个变化是 真的慢大文件,因为在文本模式中需要的非相对偏移量。根据您的需要进行修改,或者根本不使用它,因为使用 f.readlines()[-1]打开文本模式下的文件可能更好。

#!/bin/python3


from os import SEEK_END


def readlast(f, sep, fixed=True):
r"""Read the last segment from a file-like object.


:param f: File to read last line from.
:type  f: file-like object
:param sep: Segment separator (delimiter).
:type  sep: bytes, str
:param fixed: Treat data in ``f`` as a chain of fixed size blocks.
:type  fixed: bool
:returns: Last line of file.
:rtype: bytes, str
"""
bs   = len(sep)
step = bs if fixed else 1
if not bs:
raise ValueError("Zero-length separator.")
try:
o = f.seek(0, SEEK_END)
o = f.seek(o-bs-step)    # - Ignore trailing delimiter 'sep'.
while f.read(bs) != sep: # - Until reaching 'sep': Read sep-sized block
o = f.seek(o-step)   #  and then seek to the block to read next.
except (OSError,ValueError): # - Beginning of file reached.
f.seek(0)
return f.read()


def test_readlast():
from io import BytesIO, StringIO
    

# Text mode.
f = StringIO("first\nlast\n")
assert readlast(f, "\n") == "last\n"
    

# Bytes.
f = BytesIO(b'first|last')
assert readlast(f, b'|') == b'last'
    

# Bytes, UTF-8.
f = BytesIO("X\nY\n".encode("utf-8"))
assert readlast(f, b'\n').decode() == "Y\n"
    

# Bytes, UTF-16.
f = BytesIO("X\nY\n".encode("utf-16"))
assert readlast(f, b'\n\x00').decode('utf-16') == "Y\n"
  

# Bytes, UTF-32.
f = BytesIO("X\nY\n".encode("utf-32"))
assert readlast(f, b'\n\x00\x00\x00').decode('utf-32') == "Y\n"
    

# Multichar delimiter.
f = StringIO("X<br>Y")
assert readlast(f, "<br>", fixed=False) == "Y"
    

# Make sure you use the correct delimiters.
seps = { 'utf8': b'\n', 'utf16': b'\n\x00', 'utf32': b'\n\x00\x00\x00' }
assert "\n".encode('utf8' )     == seps['utf8']
assert "\n".encode('utf16')[2:] == seps['utf16']
assert "\n".encode('utf32')[4:] == seps['utf32']
    

# Edge cases.
edges = (
# Text , Match
(""    , ""  ), # Empty file, empty string.
("X"   , "X" ), # No delimiter, full content.
("\n"  , "\n"),
("\n\n", "\n"),
# UTF16/32 encoded U+270A (b"\n\x00\n'\n\x00"/utf16)
(b'\n\xe2\x9c\x8a\n'.decode(), b'\xe2\x9c\x8a\n'.decode()),
)
for txt, match in edges:
for enc,sep in seps.items():
assert readlast(BytesIO(txt.encode(enc)), sep).decode(enc) == match


if __name__ == "__main__":
import sys
for path in sys.argv[1:]:
with open(path) as f:
print(f.readline()    , end="")
print(readlast(f,"\n"), end="")

首先以读模式打开文件。然后使用 readlines ()方法逐行读取。存储在列表中的所有行。现在您可以使用列表切片来获取文件的第一行和最后一行。

    a=open('file.txt','rb')
lines = a.readlines()
if lines:
first_line = lines[:1]
last_line = lines[-1]
w=open(file.txt, 'r')
print ('first line is : ',w.readline())
for line in w:
x= line
print ('last line is : ',x)
w.close()

for循环遍历这些行,x获得最后一次迭代的最后一行。

with open("myfile.txt") as f:
lines = f.readlines()
first_row = lines[0]
print first_row
last_row = lines[-1]
print last_row

这是我的解决方案,也与 Python 3兼容。它也管理边界案例,但它缺少 utf-16的支持:

def tail(filepath):
"""
@author Marco Sulla (marcosullaroma@gmail.com)
@date May 31, 2016
"""


try:
filepath.is_file
fp = str(filepath)
except AttributeError:
fp = filepath


with open(fp, "rb") as f:
size = os.stat(fp).st_size
start_pos = 0 if size - 1 < 0 else size - 1


if start_pos != 0:
f.seek(start_pos)
char = f.read(1)


if char == b"\n":
start_pos -= 1
f.seek(start_pos)


if start_pos == 0:
f.seek(start_pos)
else:
char = ""


for pos in range(start_pos, -1, -1):
f.seek(pos)


char = f.read(1)


if char == b"\n":
break


return f.readline()

它的灵感来自 Trasp 的回答又是帕克的评论

下面是@Trasp 答案的一个扩展,它具有额外的逻辑,用于处理只有一行的文件的角落大小写。如果您反复想要读取正在不断更新的文件的最后一行,那么处理这种情况可能是有用的。如果没有这个选项,如果您尝试抓取刚刚创建并且只有一行的文件的最后一行,则将引发 IOError: [Errno 22] Invalid argument

def tail(filepath):
with open(filepath, "rb") as f:
first = f.readline()      # Read the first line.
f.seek(-2, 2)             # Jump to the second last byte.
while f.read(1) != b"\n": # Until EOL is found...
try:
f.seek(-2, 1)     # ...jump back the read byte plus one more.
except IOError:
f.seek(-1, 1)
if f.tell() == 0:
break
last = f.readline()       # Read last line.
return last

没有人提到使用反向:

f=open(file,"r")
r=reversed(f.readlines())
last_line_of_file = r.next()
with open(filename, "rb") as f:#Needs to be in binary mode for the seek from the end to work
first = f.readline()
if f.read(1) == '':
return first
f.seek(-2, 2)  # Jump to the second last byte.
while f.read(1) != b"\n":  # Until EOL is found...
f.seek(-2, 1)  # ...jump back the read byte plus one more.
last = f.readline()  # Read last line.
return last

上述答案是上述答案的修改版本,它处理文件中只有一行的情况