不久前我也不得不做这件事,当时用的是下面的代码。它会调用 shell 命令(shell out)。很遗憾,完整的脚本我已经没有了。如果你使用的是类 Unix 操作系统,可以使用 `tac` 命令;但在 Mac OS X 上 `tac` 命令不可用,需要改用 `tail -r`。下面的代码片段会检测你所在的平台,并相应地调整命令:
# We need a command to reverse the line order of the file. On Linux this
# is 'tac', on OSX it is 'tail -r'
# 'tac' is not supported on osx, 'tail -r' is not supported on linux.
if sys.platform == "darwin":
    command += "|tail -r"
elif sys.platform.startswith("linux"):
    # Python 2 reports "linux2" while Python 3.3+ reports plain "linux";
    # a prefix test accepts both.
    command += "|tac"
else:
    raise EnvironmentError('Platform %s not supported' % sys.platform)
import os
def reverse_readline(filename, buf_size=8192, encoding=None):
    """Yield the lines of a file from last to first, without line endings.

    The file is read backwards in binary chunks of at most ``buf_size``
    bytes and split on ``b"\\n"``. Only complete lines are decoded, so a
    multi-byte character that straddles a chunk boundary is never decoded
    in halves. A trailing ``"\\r"`` (CRLF files) is stripped from each line.
    Blank lines are yielded as empty strings; a final newline at the end of
    the file does not produce an extra empty line.

    Args:
        filename: Path of the file to read.
        buf_size: Maximum number of bytes to read per chunk; must be >= 1.
        encoding: Encoding used to decode each line. ``None`` uses
            ``locale.getpreferredencoding(False)``. The encoding must
            represent ``"\\n"`` as the single byte 0x0A (true for UTF-8 and
            other ASCII-compatible encodings).

    Yields:
        str: Each line of the file, last line first.

    Raises:
        ValueError: If ``buf_size`` is smaller than 1 (raised when
            iteration starts, because this is a generator).
    """
    if buf_size < 1:
        raise ValueError("buf_size must be a positive integer")
    if encoding is None:
        import locale

        encoding = locale.getpreferredencoding(False)

    def decode(raw_line):
        # Drop the "\r" of a CRLF pair before decoding the completed line.
        if raw_line.endswith(b"\r"):
            raw_line = raw_line[:-1]
        return raw_line.decode(encoding)

    with open(filename, "rb") as fh:
        # Leading part of the chunk read last; it may be an incomplete line
        # whose beginning lives in the next (earlier) chunk.
        segment = None
        fh.seek(0, os.SEEK_END)
        position = fh.tell()
        while position > 0:
            read_size = min(buf_size, position)
            position -= read_size
            fh.seek(position)
            lines = fh.read(read_size).split(b"\n")
            if segment is None:
                # First chunk (end of file): the empty piece after a final
                # newline is not a line of its own.
                if len(lines) > 1 and not lines[-1]:
                    lines.pop()
            else:
                # The previous segment continues this chunk's last piece
                # (which is empty when the chunk ends on a newline).
                lines[-1] += segment
            segment = lines[0]
            for raw_line in reversed(lines[1:]):
                yield decode(raw_line)
        # Don't yield anything if the file was empty.
        if segment is not None:
            yield decode(segment)
import os
def readlines_reverse(filename, encoding=None):
    """Yield the lines of a file from last to first, without line endings.

    The file is walked backwards one byte at a time in binary mode and each
    line is decoded only once it is complete, so multi-byte characters are
    never split. A trailing ``"\\r"`` (CRLF files) is stripped from each
    line. As before, a file that ends with a newline yields an empty string
    first, and an empty file yields a single empty string.

    NOTE: one ``seek``/``read(1)`` per byte is simple but slow on large
    files.

    Args:
        filename: Path of the file to read.
        encoding: Encoding used to decode each line. ``None`` uses
            ``locale.getpreferredencoding(False)``. The encoding must
            represent ``"\\n"`` as the single byte 0x0A.

    Yields:
        str: Each line of the file, last line first.
    """
    if encoding is None:
        import locale

        encoding = locale.getpreferredencoding(False)

    def finish(reversed_bytes):
        # Bytes were collected right-to-left; restore order, then decode.
        raw_line = bytes(reversed_bytes[::-1])
        if raw_line.endswith(b"\r"):
            raw_line = raw_line[:-1]
        return raw_line.decode(encoding)

    with open(filename, "rb") as qfile:
        qfile.seek(0, os.SEEK_END)
        position = qfile.tell()
        reversed_line = bytearray()
        while position > 0:
            position -= 1
            qfile.seek(position)
            next_byte = qfile.read(1)
            if next_byte == b"\n":
                yield finish(reversed_line)
                reversed_line = bytearray()
            else:
                reversed_line += next_byte
        # Whatever precedes the first newline is the first line of the file.
        yield finish(reversed_line)
if __name__ == '__main__':
    # Python 2 calls the line-reading builtin ``raw_input``; Python 3
    # renamed it to ``input``. Pick whichever exists.
    try:
        read_filename = raw_input
    except NameError:
        read_filename = input
    # Read a file name from stdin and print that file's lines last-to-first.
    for qline in readlines_reverse(read_filename()):
        print(qline)
#!/usr/bin/env python2.7
from file_read_backwards import FileReadBackwards

# Iterate over the object returned by the third-party FileReadBackwards
# context manager and print each item.
# NOTE(review): presumably items are the file's lines, last line first --
# confirm against the file_read_backwards documentation.
with FileReadBackwards("/path/to/file", encoding="utf-8") as frb:
    for line in frb:
        # Parenthesised single-argument print works on Python 2 and 3 alike.
        print(line)
with open(filename) as fp:
    for line in fp:
        # ``line`` still carries its newline; the >...< markers make that
        # visible in the output.
        print('>{}<'.format(line))
我想换成:
with open(filename) as fp:
    # A tiny buffer size (4) forces many chunk boundaries, which exercises
    # the line-reassembly logic of reversed_fp_iter.
    for line in reversed_fp_iter(fp, 4):
        # ``line`` still carries its newline; the >...< markers make that
        # visible in the output.
        print('>{}<'.format(line))
下面是一个修改后的答案,它需要一个文件句柄并保持换行:
def reversed_fp_iter(fp, buf_size=8192):
    """Yield the lines of an open file from last to first, keeping newlines.

    The stream is read backwards in chunks of at most ``buf_size`` units and
    split on ``"\\n"`` only, so the result matches
    ``reversed(fp.readlines())``: characters such as form feed are not
    treated as line breaks. Both text and binary handles are accepted; the
    yielded items have the same type as ``fp.read()`` returns.

    ref: https://stackoverflow.com/a/23646049/8776239

    NOTE(review): offsets are computed arithmetically from ``fp.tell()``;
    for text-mode files this presumably only works when one character
    equals one byte -- confirm, or pass a binary handle.

    Args:
        fp: Seekable, readable file object.
        buf_size: Maximum chunk size per read; must be >= 1.

    Yields:
        Each line including its trailing newline (the last line of the
        file has none if the file does not end with one).

    Raises:
        ValueError: If ``buf_size`` is smaller than 1 (raised when
            iteration starts, because this is a generator).
    """
    if buf_size < 1:
        raise ValueError("buf_size must be a positive integer")
    # Holds the possibly incomplete line at the beginning of the chunk that
    # was read last; its start may live in the next (earlier) chunk.
    segment = None
    fp.seek(0, os.SEEK_END)
    position = fp.tell()
    while position > 0:
        read_size = min(buf_size, position)
        position -= read_size
        fp.seek(position)
        chunk = fp.read(read_size)
        if not chunk:
            continue
        # Use a newline of the same type as the data (bytes vs. text).
        newline = b"\n" if isinstance(chunk, bytes) else "\n"
        pieces = chunk.split(newline)
        # Re-attach the newline to every terminated piece; the final piece
        # is unterminated and only kept when non-empty.
        lines = [piece + newline for piece in pieces[:-1]]
        if pieces[-1]:
            lines.append(pieces[-1])
        if segment is not None:
            if chunk.endswith(newline):
                # The chunk ends exactly on a line boundary, so the saved
                # segment is a complete line of its own.
                yield segment
            else:
                lines[-1] += segment
        segment = lines[0]
        for line in reversed(lines[1:]):
            yield line
    # Don't yield None if the file was empty
    if segment is not None:
        yield segment
我在写这个答案的时候才看到 Murat Yükselen 的答案。两者几乎一样,我想这是件好事。下面的示例还能处理 \r,并且在每一步都会增大缓冲区大小。我还有一些单元测试来支撑这段代码。
def readlines_reversed(f):
    """Yield the lines of a binary file object from last to first.

    ``f`` must be opened in 'rb' mode. Lines are yielded as bytes with
    their line ending (``\\n``, ``\\r`` or ``\\r\\n``) still attached, i.e.
    the same items ``readlines`` would produce, in reverse order. Blocks
    are read from the end of the file; the block size doubles on every
    step, starting at 128 bytes and capped at 4096 bytes.
    """
    carry = b""  # first (possibly partial) line of the block read last
    f.seek(0, 2)
    pos = f.tell()
    block_size, block_limit = 64, 4096
    while pos > 0:
        # Read the next block, moving towards the start of the file.
        block_size = min(block_size * 2, block_limit)
        block_end = pos
        pos = max(0, pos - block_size)
        f.seek(pos)
        block_lines = f.read(block_end - pos).splitlines(True)
        last = block_lines[-1]
        # Join the carry onto this block's last line when that line is
        # unterminated, or when the carry is the "\n" half of a "\r\n" pair
        # that was cut by the block boundary.
        unterminated = not last.endswith((b"\n", b"\r"))
        split_crlf = carry == b"\n" and last.endswith(b"\r")
        if unterminated or split_crlf:
            block_lines[-1] = last + carry
        elif carry:
            block_lines.append(carry)
        # The first line may continue in the next block; hold it back.
        carry = block_lines[0]
        for line in block_lines[:0:-1]:
            yield line
    if carry:
        yield carry
def ceil_division(left_number, right_number):
    """
    Return ``left_number / right_number`` rounded up.

    Floor division rounds towards negative infinity, so flooring the
    negated dividend and negating the result rounds the quotient up.
    """
    negated_floor = -left_number // right_number
    return -negated_floor
split 用于按给定的分隔符从右端拆分字符串,并可选择保留该分隔符:
def split(string, separator, keep_separator):
    """
    Split ``string`` on ``separator``, optionally keeping the separator.

    Without ``keep_separator`` this is exactly ``string.split(separator)``.
    With it, the separator is re-attached to every piece that was followed
    by one, and the final piece is included only when it is non-empty.
    """
    pieces = string.split(separator)
    if not keep_separator:
        return pieces
    # The last piece is the only one not followed by a separator.
    tail = pieces.pop()
    terminated = [piece + separator for piece in pieces]
    if tail:
        terminated.append(tail)
    return terminated
read_batch_from_end 用于从二进制流的末端读取一个批次(batch):
def read_batch_from_end(byte_stream, size, end_position):
    """
    Read the batch of ``byte_stream`` that ends at ``end_position``.

    The batch is ``size`` bytes long unless fewer than ``size`` bytes
    precede ``end_position``; in that case everything from the start of the
    stream up to ``end_position`` is read.
    """
    # Clamp the start of the batch to the beginning of the stream and
    # shrink the read length accordingly.
    offset = max(end_position - size, 0)
    length = end_position - offset
    byte_stream.seek(offset)
    return byte_stream.read(length)
之后,我们可以定义函数来读取字节流的反向顺序,如
import functools
import itertools
import os
from operator import methodcaller, sub
def reverse_binary_stream(byte_stream, batch_size=None,
                          lines_separator=None,
                          keep_lines_separator=True):
    """
    Yield the lines of a binary stream from last to first.

    The stream is read from its end in batches of ``batch_size`` bytes.
    The first piece of each batch may be an incomplete line, so it is held
    back and joined with the tail of the next (earlier) batch.

    Args:
        byte_stream: Seekable binary stream.
        batch_size: Bytes per batch; ``None`` reads the whole stream as a
            single batch.
        lines_separator: Bytes separator to split on. ``None`` splits with
            ``bytes.splitlines``, i.e. on ``\\r``, ``\\n`` and ``\\r\\n``.
        keep_lines_separator: Whether yielded lines keep their separator.

    Yields:
        bytes: Lines of the stream in reverse order. Nothing is yielded
        for an empty stream.

    NOTE(review): a multi-byte custom separator that is cut by a batch
    boundary does not appear to be recognised -- confirm before relying on
    separators longer than one byte.
    """
    if lines_separator is None:
        lines_separator = (b'\r', b'\n', b'\r\n')
        lines_splitter = methodcaller('splitlines', keep_lines_separator)
        splitter_emits_trailing_empty = False
    else:
        lines_splitter = functools.partial(split,
                                           separator=lines_separator,
                                           keep_separator=keep_lines_separator)
        # ``split`` without keep_separator is a plain ``bytes.split``, which
        # leaves an empty piece after a trailing separator.
        splitter_emits_trailing_empty = not keep_lines_separator
    stream_size = byte_stream.seek(0, os.SEEK_END)
    if batch_size is None:
        batch_size = stream_size or 1
    batches_count = ceil_division(stream_size, batch_size)
    # End positions of the batches: stream_size, stream_size - batch_size, ...
    remaining_bytes_indicator = itertools.islice(
        itertools.accumulate(itertools.chain([stream_size],
                                             itertools.repeat(batch_size)),
                             sub),
        batches_count)
    try:
        remaining_bytes_count = next(remaining_bytes_indicator)
    except StopIteration:
        # Empty stream: no batches at all.
        return

    def read_batch(position):
        result = read_batch_from_end(byte_stream,
                                     size=batch_size,
                                     end_position=position)
        # A batch that starts with a separator may begin in the middle of a
        # multi-byte line ending (e.g. the "\n" of "\r\n"); extend it
        # backwards until it no longer does or the stream is exhausted.
        while result.startswith(lines_separator):
            try:
                position = next(remaining_bytes_indicator)
            except StopIteration:
                break
            result = (read_batch_from_end(byte_stream,
                                          size=batch_size,
                                          end_position=position)
                      + result)
        return result

    batch = read_batch(remaining_bytes_count)
    segment, *lines = lines_splitter(batch)
    yield from lines[::-1]
    for remaining_bytes_count in remaining_bytes_indicator:
        batch = read_batch(remaining_bytes_count)
        lines = lines_splitter(batch)
        if (batch.endswith(lines_separator)
                and not splitter_emits_trailing_empty):
            # The batch ends on a line boundary: the held-back segment is a
            # complete line of its own.
            yield segment
        else:
            # Either the batch ends mid-line, or the splitter left an empty
            # piece after the trailing separator; in both cases the
            # held-back segment belongs to the last piece. Yielding it
            # separately in the second case would add a spurious empty line.
            lines[-1] += segment
        segment, *lines = lines
        yield from lines[::-1]
    yield segment
最后,可以定义一个反转文本文件的函数,如下所示:
import codecs
def reverse_file(file, batch_size=None,
                 lines_separator=None,
                 keep_lines_separator=True):
    """
    Yield the lines of an open text file from last to first.

    The work is delegated to ``reverse_binary_stream`` on ``file.buffer``:
    a given ``lines_separator`` is encoded with the file's encoding first,
    and every bytes line coming back is decoded with that same encoding.
    """
    encoding = file.encoding
    encoded_separator = lines_separator
    if encoded_separator is not None:
        encoded_separator = encoded_separator.encode(encoding)
    decode_line = functools.partial(codecs.decode, encoding=encoding)
    reversed_byte_lines = reverse_binary_stream(
        file.buffer,
        batch_size=batch_size,
        lines_separator=encoded_separator,
        keep_lines_separator=keep_lines_separator)
    for byte_line in reversed_byte_lines:
        yield decode_line(byte_line)
from timeit import Timer

# Benchmark: time both solutions on files of increasing size. Each
# measurement runs the statement ``number`` times and is repeated
# ``repeats_count`` times; the minimum is reported.
repeats_count = 7
number = 1
# Setup code executed before every timing run; "{}" is the file name.
create_setup = ('from collections import deque\n'
                'from __main__ import reverse_file, reverse_readline\n'
                'file = open("{}")').format
# reverse_readline expects a path and opens the file itself, so it is given
# ``file.name``; passing the open file object would make its ``open()``
# call raise TypeError.
srohde_solution = ('with file:\n'
                   '    deque(reverse_readline(file.name,\n'
                   '                           buf_size=8192),'
                   '          maxlen=0)')
azat_ibrakov_solution = ('with file:\n'
                         '    deque(reverse_file(file,\n'
                         '                       lines_separator="\\n",\n'
                         '                       keep_lines_separator=False,\n'
                         '                       batch_size=8192), maxlen=0)')
solutions = (('srohde', srohde_solution),
             ('Azat Ibrakov', azat_ibrakov_solution))
benchmark_files = (('empty file', 'empty.txt'),
                   ('tiny file (1MB)', 'tiny.txt'),
                   ('small file (10MB)', 'small.txt'),
                   ('large file (50MB)', 'large.txt'))
for description, file_name in benchmark_files:
    for author, solution in solutions:
        timer = Timer(solution, create_setup(file_name))
        print('reversing {} by "{}"'.format(description, author),
              min(timer.repeat(repeats_count, number)))
注意:我使用 collections.deque 类来耗尽(exhaust)生成器。
输出
对于 Windows 10上的 PyPy 3.5:
reversing empty file by "srohde" 8.31e-05
reversing empty file by "Azat Ibrakov" 0.00016090000000000028
reversing tiny file (1MB) by "srohde" 0.160081
reversing tiny file (1MB) by "Azat Ibrakov" 0.09594989999999998
reversing small file (10MB) by "srohde" 8.8891863
reversing small file (10MB) by "Azat Ibrakov" 5.323388100000001
reversing large file (50MB) by "srohde" 186.5338368
reversing large file (50MB) by "Azat Ibrakov" 99.07450229999998
对于 Windows 10上的 CPython 3.5:
reversing empty file by "srohde" 3.600000000000001e-05
reversing empty file by "Azat Ibrakov" 4.519999999999958e-05
reversing tiny file (1MB) by "srohde" 0.01965560000000001
reversing tiny file (1MB) by "Azat Ibrakov" 0.019207699999999994
reversing small file (10MB) by "srohde" 3.1341862999999996
reversing small file (10MB) by "Azat Ibrakov" 3.0872588000000007
reversing large file (50MB) by "srohde" 82.01206720000002
reversing large file (50MB) by "Azat Ibrakov" 82.16775059999998
import io
from lz.reversal import reverse
...
with open('path/to/file') as file:
    # Pass the open file to the third-party ``reverse`` helper, using the
    # standard library's default buffer size as the batch size.
    reversed_lines = reverse(file, batch_size=io.DEFAULT_BUFFER_SIZE)
    for line in reversed_lines:
        print(line)
from collections import deque
# Load every line of the file into a deque, reverse it and print the lines.
# The whole file is held in memory, so this suits small files only.
# The "U" mode flag is omitted: it was removed in Python 3.11, and text
# mode already applies universal newlines on Python 3.
with open("test.txt") as fs:
    fr = deque(fs)
fr.reverse()  # reverse in-place, returns None
for li in fr:
    print(li)
from __future__ import annotations
from io import StringIO, SEEK_END
from pathlib import Path
from typing import Iterator, TextIO
def grep_backwards(
    fh: TextIO,
    match_substr: str,
    line_ending: str = "\n",
    strip_eol: bool = False,
    step: int = 10,
) -> Iterator[str]:
    """
    Scan a file line by line from the end, imitating the Unix command line
    tools ``grep`` (when passed ``match_substr``) or ``tac`` (when
    ``match_substr`` is the empty string ``""``, i.e. matching all lines).

    Matching is done against each line *including* its line ending, so
    ``match_substr="\\n"`` selects every terminated line. With
    ``match_substr=""`` and ``strip_eol=False`` the result equals
    ``reversed(fh.readlines())``.

    Args:
      fh            : The file handle to read from (must be seekable).
                      NOTE(review): positions are computed arithmetically,
                      which presumably requires one character per seek unit
                      (e.g. ASCII text or ``StringIO``) -- confirm for
                      multi-byte encodings.
      match_substr  : Substring to match at. If given as the empty string,
                      gives a reverse line iterator rather than a reverse
                      matching line iterator.
      line_ending   : The line ending to split lines on (default: newline).
                      A multi-character line ending that is cut by a chunk
                      boundary is not detected.
      strip_eol     : Whether to strip (``True``) or keep (default:
                      ``False``) line endings on the yielded strings.
      step          : Number of characters to load into the chunk buffer
                      (i.e. chunk size); must be >= 1.

    Raises:
      ValueError: If ``step`` is smaller than 1 (raised when iteration
                  starts, because this is a generator).
    """
    if step < 1:
        raise ValueError("step must be a positive integer")
    # Store the end of file (EOF) position as we are advancing backwards from there
    file_end_pos = fh.seek(0, SEEK_END)  # cursor has moved to EOF
    # Keep a reversed string line buffer as we are writing right-to-left
    revlinebuf = StringIO()
    # Keep a [left-to-right] string buffer as we read left-to-right, one chunk at a time
    chunk_buf = StringIO()
    # Initialise 'last chunk start' at position after the EOF (unreachable by ``read``)
    last_chunk_start = file_end_pos + 1
    line_offset = 0  # number of line endings seen so far, counted from EOF
    # In the worst case, seek all the way back to the start (position 0)
    while last_chunk_start > 0:
        # Ensure that read(size=step) will read at least 1 character
        # e.g. when step=4, last_chunk_start=3, reduce step to 3 --> chunk=[0,1,2]
        if step > last_chunk_start:
            step = last_chunk_start
        chunk_start = last_chunk_start - step
        fh.seek(chunk_start)
        # Read in the chunk for the current step
        chunk_buf.write(fh.read(step))
        while chunk := chunk_buf.getvalue():
            # Keep reading intra-chunk lines RTL, leaving any leftovers in revlinebuf
            lhs, eol_match, rhs = chunk.rpartition(line_ending)
            if eol_match:
                # Everything right of the rightmost line ending completes the
                # line collected so far (possibly spanning earlier chunks).
                revlinebuf.write(rhs[::-1])
                completed_line = revlinebuf.getvalue()[::-1]  # may be empty
                # Clear the reversed line buffer
                revlinebuf.seek(0)
                revlinebuf.truncate()
                if line_offset == 0:
                    # Text after the final line ending: either the
                    # unterminated last line, or nothing at all when the
                    # file ends with a line ending. Judge this from the whole
                    # collected line, not just ``rhs``, so a line ending that
                    # sits on a chunk boundary does not hide the last line.
                    if completed_line and match_substr in completed_line:
                        yield completed_line
                elif match_substr in (completed_line + line_ending):
                    yield completed_line if strip_eol else completed_line + line_ending
                line_offset += 1
            else:
                # No line ending in the remaining chunk: add all of it, in
                # reverse, onto the reversed line buffer
                revlinebuf.write(chunk[::-1])
            # The LHS of the rightmost line ending (if any) may contain another
            # line ending so truncate the chunk to that and re-iterate (when
            # nothing matched, ``lhs`` is empty and this clears chunk_buf)
            chunk_buf.seek(len(lhs))
            chunk_buf.truncate()
        last_chunk_start = chunk_start
    # Whatever is left over is the first line of the file.
    first_line = revlinebuf.getvalue()[::-1]
    if line_offset == 0:
        # No line ending anywhere: the file is empty or one unterminated line.
        if first_line and match_substr in first_line:
            yield first_line
    elif match_substr in (first_line + line_ending):
        # At least one line ending was seen, so the first line (even an empty
        # one) is always terminated by a line ending.
        yield first_line if strip_eol else first_line + line_ending
    # Falling off the end finishes the generator; raising StopIteration here
    # would be converted into RuntimeError (PEP 479).
下面用一些测试来演示它的工作方式。3 个测试输入文件都是从 0 数到 100(步长为 9),逐行写入 'Hi 0'、'Hi 9'、'Hi 18'……,并且:
……在数字 27 那一行后面多加一个换行(双换行)
……(第 2 个文件)文件结尾不带换行
……(第 3 个文件)文件结尾带两个换行
# Build the test text: lines 'Hi 0', 'Hi 9', ... counting to 100 in steps
# of 9, where the line for 27 is followed by a double newline.
str_out = "".join(
    f"Hi {i}\n\n" if i == 27 else f"Hi {i}\n" for i in range(0, 100, 9)
)
example_file = Path("example.txt")
no_eof_nl_file = Path("no_eof_nl.txt")  # no end of file newline
double_eof_nl_file = Path("double_eof_nl.txt")  # double end of file newline
file_list = [example_file, no_eof_nl_file, double_eof_nl_file]
# One content variant per file, in the same order as ``file_list``.
file_contents = [str_out, str_out.rstrip("\n"), str_out + "\n"]
for each_path, each_content in zip(file_list, file_contents):
    with open(each_path, "w") as f_out:
        f_out.write(each_content)
labels = [
    "EOF_NL ",
    "NO_EOF_NL ",
    "DBL_EOF_NL",
]
# Report script: runs grep_backwards over each test file with three
# different ``match_substr`` values and prints the results, one labelled
# line per file.
# NOTE(review): indentation was lost in this copy, so whether each bare
# ``print()`` sits inside or after the preceding loop cannot be told from
# here -- confirm against the original before running.
print("------------------------------------------------------------")
print()
# Section 1: empty match string, i.e. every line is selected.
print(f"match_substr = ''")
# Compare the full reverse iteration with reversed(readlines()).
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
lines_rev_from_iterator = list(grep_backwards(fh=fh, match_substr=""))
with open(each_file, "r") as fh:
lines_rev_from_readline = list(reversed(fh.readlines()))
print(label, f"{lines_rev_from_iterator == lines_rev_from_readline=}")
print()
# Show only the first item produced for each file.
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
reverse_iterator = grep_backwards(fh=fh, match_substr="")
first_match = next(reverse_iterator)
print(label, f"{first_match=}")
print()
# Show every item produced for each file.
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
all_matches = list(grep_backwards(fh=fh, match_substr=""))
print(label, f"{all_matches=}")
print()
print()
print("------------------------------------------------------------")
print()
# Section 2: match the substring 'Hi 9'.
print(f"match_substr = 'Hi 9'")
# First match per file.
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
reverse_iterator = grep_backwards(fh=fh, match_substr="Hi 9")
first_match = next(reverse_iterator)
print(label, f"{first_match=}")
print()
# All matches per file.
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
all_matches = list(grep_backwards(fh=fh, match_substr="Hi 9"))
print(label, f"{all_matches=}")
print()
print("------------------------------------------------------------")
print()
# Section 3: match the newline character itself.
print(f"match_substr = '\\n'")
# First pass prints the number of matches per file, second pass the matches.
for len_flag in (True, False):
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
lines_rev_from_iterator = list(grep_backwards(fh=fh, match_substr="\n"))
if len_flag:
print(label, f"{len(lines_rev_from_iterator)=}")
else:
print(label, f"{lines_rev_from_iterator=}")
print()
# First match per file.
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
reverse_iterator = grep_backwards(fh=fh, match_substr="\n")
first_match = next(reverse_iterator)
print(label, f"{first_match=}")
print()
# All matches per file.
for label, each_file in zip(labels, file_list):
with open(each_file, "r") as fh:
all_matches = list(grep_backwards(fh=fh, match_substr="\n"))
print(label, f"{all_matches=}")
print()
print("------------------------------------------------------------")