在进程运行时不断打印子进程输出

要从我的python脚本启动程序,我使用以下方法:

def execute(command):
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = process.communicate()[0]
exitCode = process.returncode


if (exitCode == 0):
return output
else:
raise ProcessException(command, exitCode, output)

因此,当我启动像Process.execute("mvn clean install")这样的进程时,我的程序会一直等待,直到进程完成,只有在那时我才能得到程序的完整输出。这是恼人的,如果我正在运行一个进程,需要一段时间才能完成。

我能让我的程序一行一行地写进程输出吗,在循环结束之前轮询进程输出什么的?

我发现文章可能是相关的。

298667 次浏览

只要命令输出了行,就可以使用iter来处理它们:下面是一个完整的例子,展示了一个典型的用例(感谢@jfs的帮助):

from __future__ import print_function # Only Python 2.x
import subprocess


def execute(cmd):
popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
for stdout_line in iter(popen.stdout.readline, ""):
yield stdout_line
popen.stdout.close()
return_code = popen.wait()
if return_code:
raise subprocess.CalledProcessError(return_code, cmd)


# Example
for path in execute(["locate", "a"]):
print(path, end="")

好吧,我设法解决它没有线程(任何建议为什么使用线程会更好,感谢)使用这个问题在子进程运行时拦截子进程的标准输出的一个片段

def execute(command):
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)


# Poll process for new output until finished
while True:
nextline = process.stdout.readline()
if nextline == '' and process.poll() is not None:
break
sys.stdout.write(nextline)
sys.stdout.flush()


output = process.communicate()[0]
exitCode = process.returncode


if (exitCode == 0):
return output
else:
raise ProcessException(command, exitCode, output)

对于试图回答这个问题并从Python脚本中获取标准输出的人来说,请注意Python会缓冲它的标准输出,因此可能需要一段时间才能看到标准输出。

这可以通过在目标脚本中的每个标准输出写入后添加以下内容来纠正:

sys.stdout.flush()

@tokland

尝试了您的代码,并纠正了3.4和Windows dir。CMD是一个简单的dir命令,保存为CMD -file

import subprocess
c = "dir.cmd"


def execute(command):
popen = subprocess.Popen(command, stdout=subprocess.PIPE,bufsize=1)
lines_iterator = iter(popen.stdout.readline, b"")
while popen.poll() is None:
for line in lines_iterator:
nline = line.rstrip()
print(nline.decode("latin"), end = "\r\n",flush =True) # yield line


execute(c)

在Python 3中,只要子进程的stdout缓冲区被刷新,就逐行打印子进程的输出:

from subprocess import Popen, PIPE, CalledProcessError


with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
for line in p.stdout:
print(line, end='') # process line here


if p.returncode != 0:
raise CalledProcessError(p.returncode, p.args)

注意:你不需要p.poll()——当到达eof时循环结束。你不需要iter(p.stdout.readline, '')——预读错误在Python 3中已经修复。

另见Python:从subprocess. communication()读取流输入

如果有人想同时使用线程从stdoutstderr中读取,这是我想到的:

import threading
import subprocess
import Queue


class AsyncLineReader(threading.Thread):
def __init__(self, fd, outputQueue):
threading.Thread.__init__(self)


assert isinstance(outputQueue, Queue.Queue)
assert callable(fd.readline)


self.fd = fd
self.outputQueue = outputQueue


def run(self):
map(self.outputQueue.put, iter(self.fd.readline, ''))


def eof(self):
return not self.is_alive() and self.outputQueue.empty()


@classmethod
def getForFd(cls, fd, start=True):
queue = Queue.Queue()
reader = cls(fd, queue)


if start:
reader.start()


return reader, queue




process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdoutReader, stdoutQueue) = AsyncLineReader.getForFd(process.stdout)
(stderrReader, stderrQueue) = AsyncLineReader.getForFd(process.stderr)


# Keep checking queues until there is no more output.
while not stdoutReader.eof() or not stderrReader.eof():
# Process all available lines from the stdout Queue.
while not stdoutQueue.empty():
line = stdoutQueue.get()
print 'Received stdout: ' + repr(line)


# Do stuff with stdout line.


# Process all available lines from the stderr Queue.
while not stderrQueue.empty():
line = stderrQueue.get()
print 'Received stderr: ' + repr(line)


# Do stuff with stderr line.


# Sleep for a short time to avoid excessive CPU use while waiting for data.
sleep(0.05)


print "Waiting for async readers to finish..."
stdoutReader.join()
stderrReader.join()


# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()


print "Waiting for process to exit..."
returnCode = process.wait()


if returnCode != 0:
raise subprocess.CalledProcessError(returnCode, command)

我只是想分享这个,因为我在这个问题上试图做类似的事情,但没有一个答案能解决我的问题。希望它能帮助到某些人!

请注意,在我的用例中,外部进程杀死了我们Popen()的进程。

这个PoC不断地从进程中读取输出,并可以在需要时访问。只有最后一个结果被保留,所有其他输出都被丢弃,因此防止了PIPE的内存不足:

import subprocess
import time
import threading
import Queue




class FlushPipe(object):
def __init__(self):
self.command = ['python', './print_date.py']
self.process = None
self.process_output = Queue.LifoQueue(0)
self.capture_output = threading.Thread(target=self.output_reader)


def output_reader(self):
for line in iter(self.process.stdout.readline, b''):
self.process_output.put_nowait(line)


def start_process(self):
self.process = subprocess.Popen(self.command,
stdout=subprocess.PIPE)
self.capture_output.start()


def get_output_for_processing(self):
line = self.process_output.get()
print ">>>" + line




if __name__ == "__main__":
flush_pipe = FlushPipe()
flush_pipe.start_process()


now = time.time()
while time.time() - now < 10:
flush_pipe.get_output_for_processing()
time.sleep(2.5)


flush_pipe.capture_output.join(timeout=0.001)
flush_pipe.process.kill()

print_date.py

#!/usr/bin/env python
import time


if __name__ == "__main__":
while True:
print str(time.time())
time.sleep(0.01)

输出:你可以清楚地看到只有大约2.5s的输出间隔。

>>>1520535158.51
>>>1520535161.01
>>>1520535163.51
>>>1520535166.01

这至少在Python3.4中是有效的

import subprocess


process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
for line in process.stdout:
print(line.decode().strip())

在Python中>= 3.5使用subprocess.run为我工作:

import subprocess


cmd = 'echo foo; sleep 1; echo foo; sleep 2; echo foo'
subprocess.run(cmd, shell=True)

(在没有shell=True的情况下,在执行期间获得输出也可以工作) https://docs.python.org/3/library/subprocess.html#subprocess.run < / p >

要回答最初的问题,IMO的最佳方法是直接将子进程stdout重定向到程序的stdout(可选地,对stderr也可以这样做,如下例所示)

p = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
p.communicate()

在Python 3.6中,我使用了这个:

import subprocess


cmd = "command"
output = subprocess.call(cmd, shell=True)
print(process)

这里没有一个答案能满足我所有的需求。

  1. 没有用于标准输出的线程(也没有队列等)
  2. 非阻塞,因为我需要检查其他事情正在进行
  3. 使用PIPE,因为我需要做很多事情,例如流输出,写入日志文件,并返回输出的字符串副本。

一点背景知识:我使用ThreadPoolExecutor来管理一个线程池,每个线程启动一个子进程并并发地运行它们。(在Python2.7中,但这应该在更新的3中工作。X也是)。我不希望只使用线程来收集输出,因为我希望有尽可能多的可用线程用于其他事情(一个20个进程的池将使用40个线程来运行;1用于进程线程,1用于stdout…如果你想要stderr我猜)

我在这里剥离了很多异常等,所以这是在生产中工作的代码上的基于。希望我没有在复制粘贴过程中破坏它。同时,非常欢迎反馈!

import time
import fcntl
import subprocess
import time


proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)


# Make stdout non-blocking when using read/readline
proc_stdout = proc.stdout
fl = fcntl.fcntl(proc_stdout, fcntl.F_GETFL)
fcntl.fcntl(proc_stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)


def handle_stdout(proc_stream, my_buffer, echo_streams=True, log_file=None):
"""A little inline function to handle the stdout business. """
# fcntl makes readline non-blocking so it raises an IOError when empty
try:
for s in iter(proc_stream.readline, ''):   # replace '' with b'' for Python 3
my_buffer.append(s)


if echo_streams:
sys.stdout.write(s)


if log_file:
log_file.write(s)
except IOError:
pass


# The main loop while subprocess is running
stdout_parts = []
while proc.poll() is None:
handle_stdout(proc_stdout, stdout_parts)


# ...Check for other things here...
# For example, check a multiprocessor.Value('b') to proc.kill()


time.sleep(0.01)


# Not sure if this is needed, but run it again just to be sure we got it all?
handle_stdout(proc_stdout, stdout_parts)


stdout_str = "".join(stdout_parts)  # Just to demo

我确信这里有额外的开销,但在我的情况下这不是一个问题。从功能上讲,它满足了我的需要。我唯一没有解决的问题是,为什么这对于日志消息来说是完美的,但我看到一些print消息在稍后和同时显示。

当你只想print输出时,实际上有一个非常简单的方法来做到这一点:

import subprocess
import sys


def execute(command):
subprocess.check_call(command, shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)

这里我们只是简单地将子进程指向我们自己的stdout,并使用现有的成功或异常api。

简单比复杂好。

os库有内置模块system。您应该执行代码并看到输出。

import os
os.system("python --version")
# Output
"""
Python 3.8.6
0
"""

在version之后,它也被打印返回值为0

import time
import sys
import subprocess
import threading
import queue


cmd='esptool.py --chip esp8266 write_flash -z 0x1000 /home/pi/zero2/fw/base/boot_40m.bin'
cmd2='esptool.py --chip esp32 -b 115200 write_flash -z 0x1000 /home/pi/zero2/fw/test.bin'
cmd3='esptool.py --chip esp32 -b 115200 erase_flash'


class ExecutorFlushSTDOUT(object):
def __init__(self,timeout=15):
self.process = None
self.process_output = queue.Queue(0)
self.capture_output = threading.Thread(target=self.output_reader)
self.timeout=timeout
self.result=False
self.validator=None
        

def output_reader(self):
start=time.time()
while self.process.poll() is None and (time.time() - start) < self.timeout:
try:
if not self.process_output.full():
line=self.process.stdout.readline()
if line:
line=line.decode().rstrip("\n")
start=time.time()
self.process_output.put(line)
if self.validator:
if self.validator in line: print("Valid");self.result=True


except:pass
self.process.kill()
return
            

def start_process(self,cmd_list,callback=None,validator=None,timeout=None):
if timeout: self.timeout=timeout
self.validator=validator
self.process = subprocess.Popen(cmd_list,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
self.capture_output.start()
line=None
self.result=False
while self.process.poll() is None:
try:
if not self.process_output.empty():
line = self.process_output.get()
if line:
if callback:callback(line)
#print(line)
line=None
except:pass
error = self.process.returncode
if error:
print("Error Found",str(error))
raise RuntimeError(error)
return self.result


execute = ExecutorFlushSTDOUT()


def liveOUTPUT(line):
print("liveOUTPUT",line)
try:
if "Writing" in line:
line=''.join([n for n in line.split(' ')[3] if n.isdigit()])
print("percent={}".format(line))
except Exception as e:
pass
    





result=execute.start_process(cmd2,callback=liveOUTPUT,validator="Hash of data verified.")


print("Finish",result)


在@jfs的优秀的答案的基础上,这里有一个完整的工作示例供你使用。要求Python 3.7或更新版本。

sub.py

import time


for i in range(10):
print(i, flush=True)
time.sleep(1)

main.py

from subprocess import PIPE, Popen
import sys


with Popen([sys.executable, 'sub.py'], bufsize=1, stdout=PIPE, text=True) as sub:
for line in sub.stdout:
print(line, end='')

注意子脚本中使用的flush参数。

如果你想在进程运行时从标准输出输出,使用-u Python选项和subprocess.Popen()。(shell=True是必要的,尽管有风险……)