逐行读取子进程标准输出

我的python脚本使用subprocess调用一个非常吵闹的linux实用程序。我想将所有输出存储到一个日志文件中,并将其中一些显示给用户。我认为下面的代码可以工作,但是直到实用程序产生大量输出,输出才显示在我的应用程序中。

#fake_utility.py, just generates lots of output over time
import time
i = 0
while True:
print hex(i)*512
i += 1
time.sleep(0.5)


#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
for line in proc.stdout:
#the real code does filtering here
print "test:", line.rstrip()

我真正想要的行为是让过滤器脚本打印从子进程接收到的每一行。有点像tee做的,但用的是python代码。

我错过了什么?这可能吗?


更新:

如果将sys.stdout.flush()添加到fake_utility.py中,代码在python 3.1中具有所需的行为。我使用的是python 2.6。你可能会认为使用proc.stdout.xreadlines()可以和py3k一样工作,但事实并非如此。


更新2:

下面是最小的工作代码。

#fake_utility.py, just generates lots of output over time
import sys, time
for i in range(10):
print i
sys.stdout.flush()
time.sleep(0.5)


#display out put line by line
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
#works in python 3.0+
#for line in proc.stdout:
for line in iter(proc.stdout.readline,''):
print line.rstrip()
485896 次浏览

我认为问题出在语句for line in proc.stdout上,它在遍历整个输入之前读取整个输入。解决方案是使用readline()代替:

#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
while True:
line = proc.stdout.readline()
if not line:
break
#the real code does filtering here
print "test:", line.rstrip()

当然,您仍然需要处理子进程的“缓冲”。

注意:带有迭代器的根据文档解决方案应该等效于使用readline(),除了预读缓冲区,但(或正因为如此)提议的更改确实为我产生了不同的结果(Windows XP上的Python 2.5)。

实际上,如果您对迭代器进行了排序,那么缓冲现在可能是您的问题。你可以告诉子进程中的python不要缓冲它的输出。

proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)

就变成了

proc = subprocess.Popen(['python','-u', 'fake_utility.py'],stdout=subprocess.PIPE)

当从python内部调用python时,我需要这个。

你想把这些额外的参数传递给subprocess.Popen:

bufsize=1, universal_newlines=True

然后可以像示例中那样进行迭代。(使用Python 3.5测试)

有点晚了,但很惊讶没有看到我认为最简单的解决方案:

import io
import subprocess


proc = subprocess.Popen(["prog", "arg"], stdout=subprocess.PIPE)
for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"):  # or another encoding
# do something with line

(这需要Python 3。)

以下对Rômulo的回答的修改适用于Python 2和3(2.7.12和3.6.1):

import os
import subprocess


process = subprocess.Popen(command, stdout=subprocess.PIPE)
while True:
line = process.stdout.readline()
if line != '':
os.write(1, line)
else:
break

我尝试用python3和它工作,

当你使用popen来生成新线程时,你告诉操作系统PIPE子进程的stdout,这样父进程就可以读取它,在这里,stderr被复制到父进程的stderr

output_reader中,我们读取子进程的stdout的每一行,方法是将它包装在iterator中,每当有新行准备好时,该iterator就会逐行填充子进程的输出。

def output_reader(proc):
for line in iter(proc.stdout.readline, b''):
print('got line: {0}'.format(line.decode('utf-8')), end='')




def main():
proc = subprocess.Popen(['python', 'fake_utility.py'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)


t = threading.Thread(target=output_reader, args=(proc,))
t.start()


try:
time.sleep(0.2)
import time
i = 0
    

while True:
print (hex(i)*512)
i += 1
time.sleep(0.5)
finally:
proc.terminate()
try:
proc.wait(timeout=0.2)
print('== subprocess exited with rc =', proc.returncode)
except subprocess.TimeoutExpired:
print('subprocess did not terminate in time')
t.join()

你也可以读行w/o循环。适用于python3.6。

import os
import subprocess


process = subprocess.Popen(command, stdout=subprocess.PIPE)
list_of_byte_strings = process.stdout.readlines()

一个允许同时对stdoutstderr进行逐行实时迭代的函数

如果你需要同时获取stdoutstderr的输出流,你可以使用下面的函数。

该函数使用Queues将两个Popen管道合并到一个迭代器中。

这里我们创建了函数read_popen_pipes():

from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor




def enqueue_output(file, queue):
for line in iter(file.readline, ''):
queue.put(line)
file.close()




def read_popen_pipes(p):


with ThreadPoolExecutor(2) as pool:
q_stdout, q_stderr = Queue(), Queue()


pool.submit(enqueue_output, p.stdout, q_stdout)
pool.submit(enqueue_output, p.stderr, q_stderr)


while True:


if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
break


out_line = err_line = ''


try:
out_line = q_stdout.get_nowait()
except Empty:
pass
try:
err_line = q_stderr.get_nowait()
except Empty:
pass


yield (out_line, err_line)

read_popen_pipes()正在使用:

import subprocess as sp




with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:


for out_line, err_line in read_popen_pipes(p):


# Do stuff with each line, e.g.:
print(out_line, end='')
print(err_line, end='')


return p.poll() # return status-code

python 3.5将方法run()call()添加到subprocess模块中,两者都返回一个CompletedProcess对象。这样你就可以使用proc.stdout.splitlines():

proc = subprocess.run( comman, shell=True, capture_output=True, text=True, check=True )
for line in proc.stdout.splitlines():
print "stdout:", line

另见如何在Python中使用子进程运行方法执行Shell命令

我有一个问题与Popen的参数列表更新服务器,下面的代码解决了这一点。

import getpass
from subprocess import Popen, PIPE


username = 'user1'
ip = '127.0.0.1'


print ('What is the password?')
password = getpass.getpass()
cmd1 = f"""sshpass -p {password} ssh {username}@{ip}"""
cmd2 = f"""echo {password} | sudo -S apt update"""
cmd3 = " && "
cmd4 = f"""echo {password} | sudo -S apt upgrade -y"""
cmd5 = " && "
cmd6 = "exit"
commands = [cmd1, cmd2, cmd3, cmd4, cmd5, cmd6]


command = " ".join(commands)


cmd = command.split()


with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
for line in p.stdout:
print(line, end='')

要在本地计算机上运行更新,下面的代码示例将执行此操作。

import getpass
from subprocess import Popen, PIPE


print ('What is the password?')
password = getpass.getpass()


cmd1_local = f"""apt update"""
cmd2_local = f"""apt upgrade -y"""
commands = [cmd1_local, cmd2_local]


with Popen(['echo', password], stdout=PIPE) as auth:
for cmd in commands:
cmd = cmd.split()
with Popen(['sudo','-S'] + cmd, stdin=auth.stdout, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
for line in p.stdout:
print(line, end='')