使用子进程获取实时输出

我正在尝试编写一个包装脚本,用于命令行程序(svnadmin verify),该程序作为进度指示器用来友好的显示具体操作进度。这需要我能够看到封装程序输出的每一行内容,并立即显示出来。

我想我只需要使用 subprocess.Popen执行程序,使用 stdout=PIPE,然后读入的每一行并相应地执行它。但是,当我运行以下代码时,输出似乎在某个地方被缓冲了,导致它出现在两个块中,第1行到332行,然后是333行到439行(输出的最后一行)

from subprocess import Popen, PIPE, STDOUT


p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE,
stderr = STDOUT, shell = True)
for line in p.stdout:
print line.replace('\n', '')

在查看子进程文档后,我发现 Popenbufsize 参数,因此我尝试将 bufsize 设置为1(每行缓冲) 和 0(无缓冲),但这两个值似乎都没有改变行被传递的方式。。

在这一点上,我开始拼命地寻找出路,所以我编写了下面的输出循环。:

while True:
try:
print p.stdout.next().replace('\n', '')
except StopIteration:
break

却得到了同样的结果。

是否有可能获得使用子进程执行的程序的“实时”程序输出?Python 中还有其他向前兼容的选项(不是 exec*)吗?

161097 次浏览

你可以试试这个:

import subprocess
import sys


process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)


while True:
out = process.stdout.read(1)
if out == '' and process.poll() != None:
break
if out != '':
sys.stdout.write(out)
sys.stdout.flush()

如果使用 readline 而不是 read,则在某些情况下将不打印输入消息。用一个需要内联输入的命令尝试一下,然后自己看看。

不久前我也遇到了同样的问题。我的解决方案是放弃对 read方法的迭代,即使您的子进程没有完成执行,它也会立即返回。

我尝试了这个,由于某种原因,一下代码

for line in p.stdout:
...

大量缓存,下面相似的写法却不会

while True:
line = p.stdout.readline()
if not line: break
...

显然,这是一个已知的错误: http://bugs.python.org/issue3907(该问题现在已经“关闭”,截至2018年8月29日)

使用带有非阻塞读行的 预计可以解决这个问题。它源于这样一个事实,即管道是缓冲的,因此应用程序的输出是由管道来缓冲的,因此,在缓冲区填满或进程终止之前,您无法获得该输出。

通过将缓冲区大小设置为1,实际上可以强制进程不对输出进行缓冲。

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
print line,
p.stdout.close()
p.wait()

完整的解决方案:

import contextlib
import subprocess


# Unix, Windows and old Macintosh end-of-line
newlines = ['\n', '\r\n', '\r']
def unbuffered(proc, stream='stdout'):
stream = getattr(proc, stream)
with contextlib.closing(stream):
while True:
out = []
last = stream.read(1)
# Don't loop forever
if last == '' and proc.poll() is not None:
break
while last not in newlines:
# Don't loop forever
if last == '' and proc.poll() is not None:
break
out.append(last)
last = stream.read(1)
out = ''.join(out)
yield out


def example():
cmd = ['ls', '-l', '/']
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
# Make all end-of-lines '\n'
universal_newlines=True,
)
for line in unbuffered(proc):
print line


example()

我使用这个解决方案来获得子进程的实时输出。这个循环将在进程完成时立即停止,而不需要中断语句或可能的无限循环。

sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)


while sub_process.poll() is None:
out = sub_process.stdout.read(1)
sys.stdout.write(out)
sys.stdout.flush()

实时输出问题解决: 我在 Python 中遇到了类似的问题,同时捕获了 C 程序的实时输出。我在 C 代码中加入了 fflush(stdout);。这招对我很管用。这是密码。

C 程序:

#include <stdio.h>
void main()
{
int count = 1;
while (1)
{
printf(" Count  %d\n", count++);
fflush(stdout);
sleep(1);
}
}

Python 程序:

#!/usr/bin/python


import os, sys
import subprocess




procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)


while procExe.poll() is None:
line = procExe.stdout.readline()
print("Print:" + line)

产出:

Print: Count  1
Print: Count  2
Print: Count  3

找到了这个“即插即用”的功能 给你。工作像一个魅力!

import subprocess


def myrun(cmd):
"""from
http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
"""
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout = []
while True:
line = p.stdout.readline()
stdout.append(line)
print line,
if line == '' and p.poll() != None:
break
return ''.join(stdout)

您可以在子流程的输出中对每个字节使用迭代器。这允许从子进程进行内联更新(以’r’结尾的行覆盖以前的输出行) :

from subprocess import PIPE, Popen


command = ["my_command", "-my_arg"]


# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)




# read each byte of subprocess
while subprocess.poll() is None:
for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else {}, b''):
c = c.decode('ascii')
sys.stdout.write(c)
sys.stdout.flush()


if subprocess.returncode != 0:
raise Exception("The subprocess did not terminate correctly.")

这是我经常用到的基本框架。它使实现超时变得容易,并且能够处理不可避免的挂起过程。

import subprocess
import threading
import Queue


def t_read_stdout(process, queue):
"""Read from stdout"""


for output in iter(process.stdout.readline, b''):
queue.put(output)


return


process = subprocess.Popen(['dir'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
cwd='C:\\',
shell=True)


queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()


while process.poll() is None or not queue.empty():
try:
output = queue.get(timeout=.5)


except Queue.Empty:
continue


if not output:
continue


print(output),


t_stdout.join()

您可以将子进程输出直接指向流:

subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)

根据用例的不同,您可能还需要禁用子流程本身中的缓冲。

如果子进程将是一个 Python 进程,那么您可以在调用之前完成:

os.environ["PYTHONUNBUFFERED"] = "1"

或者在 env参数中将其传递给 Popen

否则,如果你在 Linux/Unix 上,你可以使用 stdbuf工具,例如:

cmd = ["stdbuf", "-oL"] + cmd

关于 stdbuf或其他选项,请参阅 给你

(同样的答案请参阅 给你。)

凯文 · 麦卡锡在 Python 中使用异步对 stdin 和 stdout 进行流处理博客文章展示了如何使用异步实现:

import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec




async def _read_stream(stream, callback):
while True:
line = await stream.readline()
if line:
callback(line)
else:
break




async def run(command):
process = await create_subprocess_exec(
*command, stdout=PIPE, stderr=PIPE
)


await asyncio.wait(
[
_read_stream(
process.stdout,
lambda x: print(
"STDOUT: {}".format(x.decode("UTF8"))
),
),
_read_stream(
process.stderr,
lambda x: print(
"STDERR: {}".format(x.decode("UTF8"))
),
),
]
)


await process.wait()




async def main():
await run("docker build -t my-docker-image:latest .")




if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

(该解决方案已经在 Python 2.7.15中进行了测试)
只需要在每行读/写之后使用 sys.stdout.rush () :

while proc.poll() is None:
line = proc.stdout.readline()
sys.stdout.write(line)
# or print(line.strip()), you still need to force the flush.
sys.stdout.flush()

在 Python 3.x 中,进程可能挂起,因为输出是字节数组而不是字符串。确保将其解码为字符串。

从 Python 3.6开始,您可以使用 打开构造函数中的参数 encoding来完成这项工作:

process = subprocess.Popen(
'my_command',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
encoding='utf-8',
errors='replace'
)


while True:
realtime_output = process.stdout.readline()


if realtime_output == '' and process.poll() is not None:
break


if realtime_output:
print(realtime_output.strip(), flush=True)

请注意,此代码 重定向stderrstdout处理输出错误

很少有答案建议使用 python 3.x 或者 pthon 2.x,下面的代码对两者都适用。

 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
stdout = []
while True:
line = p.stdout.readline()
if not isinstance(line, (str)):
line = line.decode('utf-8')
stdout.append(line)
print (line)
if (line == '' and p.poll() != None):
break

如果您只想将日志实时转发到控制台

下面的代码对两者都适用

 p = subprocess.Popen(cmd,
shell=True,
cwd=work_dir,
bufsize=1,
stdin=subprocess.PIPE,
stderr=sys.stderr,
stdout=sys.stdout)
def run_command(command):
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.strip())
rc = process.poll()
return rc

以下是对我有效的方法:

import subprocess
import sys


def run_cmd_print_output_to_console_and_log_to_file(cmd, log_file_path):
make_file_if_not_exist(log_file_path)
logfile = open(log_file_path, 'w')


proc=subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell = True)
for line in proc.stdout:
sys.stdout.write(line.decode("utf-8") )
print(line.decode("utf-8").strip(), file=logfile, flush=True)
proc.wait()


logfile.close()

又一个答案! 我有以下要求:

  • 运行一些命令并将输出打印到 stdout,就像用户运行它一样
  • 向用户显示来自命令的任何提示。例如,pip uninstall numpy将用 ... Proceed (Y/n)?提示(它不以换行结束)
  • 将输出(用户看到的)捕获为字符串

这对我来说很有用(只在 Python 3.10的 Windows 环境下测试过) :

def run(*args: list[str]) -> str:
proc = subprocess.Popen(
*args,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)


result = ""


while proc.poll() is None:
output = proc.stdout.read(1)


if output:
sys.stdout.write(output)
sys.stdout.flush()
result += output


return result