子进程上的非阻塞读取。Python中的PIPE

我使用子流程模块启动子进程并连接到它的输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或检查在我调用.readline之前是否在流上有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

以下是我现在怎么做的(如果没有数据可用,它会阻塞.readline):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
290683 次浏览

一种解决方案是让另一个进程执行对该进程的读取,或者创建一个带有超时的进程线程。

这是一个超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,你是否需要在stdout进入时读取它? 另一种解决方案可能是将输出转储到一个文件中,然后使用p.wait ().

等待进程完成
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()




str = open('myprogram_output.txt','r').read()

选择模块帮助您确定下一个有用的输入在哪里。

但是,您几乎总是更喜欢使用单独的线程。一个是阻塞读取stdin,另一个是阻塞你不想阻塞的地方。

尝试asyncproc模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")


while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out

该模块负责S.Lott建议的所有线程。

我也经常遇到类似的问题;我经常编写的Python程序需要能够执行一些主要功能,同时接受来自命令行(stdin)的用户输入。简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()阻塞并且没有超时。如果主要功能完成,不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能,因为readline()仍然阻塞在另一个线程中等待一行。我找到了一个解决这个问题的解决方案是使用fcntl模块使stdin成为一个非阻塞文件:

import fcntl
import os
import sys


# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)


# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)

在我看来,这比使用select或signal模块来解决这个问题要简洁一些,但它只适用于UNIX…

使用select &阅读(1)。

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))

readline()例如:

lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a

fcntlselectasyncproc在这种情况下没有帮助。

不管操作系统是什么,读取流而不阻塞的可靠方法是使用Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread


try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty  # python 2.x


ON_POSIX = 'posix' in sys.builtin_module_names


def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()


p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()


# ... do other things here


# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
我添加这个问题是为了读取一些子进程。Popen stdout。 下面是我的非阻塞读解决方案:

import fcntl


def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""


# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()


# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

你可以在扭曲的中轻松做到这一点。根据您现有的代码库,这可能不是那么容易使用,但如果您正在构建一个扭曲的应用程序,那么这样的事情几乎变得微不足道。您创建了一个ProcessProtocol类,并重写了outReceived()方法。Twisted(取决于所使用的反应器)通常只是一个大的select()循环,其中安装了回调来处理来自不同文件描述符(通常是网络套接字)的数据。因此,outReceived()方法只是安装一个回调来处理来自STDOUT的数据。演示这种行为的简单示例如下:

from twisted.internet import protocol, reactor


class MyProcessProtocol(protocol.ProcessProtocol):


def outReceived(self, data):
print data


proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

扭曲的文档在这方面有一些很好的信息。

如果您围绕Twisted构建整个应用程序,它可以与其他进程(本地或远程)进行异步通信,就像这样非常优雅。另一方面,如果您的程序不是构建在Twisted之上,那么这真的不会有多大帮助。希望这能对其他读者有所帮助,即使它不适用于您的特定应用程序。

免责声明:这只适用于龙卷风

您可以通过将fd设置为非阻塞,然后使用ioloop来注册回调来实现这一点。我把它打包在一个名为tornado_subprocess的鸡蛋中,你可以通过PyPI安装它:

easy_install tornado_subprocess

现在你可以这样做:

import tornado_subprocess
import tornado.ioloop


def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr


t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

你也可以将它与RequestHandler一起使用

class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()


@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()

现有的解决方案不适合我(详情见下文)。最后成功的是使用read(1)实现readline(基于这个答案)。后者不阻塞:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None


#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()


myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()


#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()

为什么现有的解决方案不起作用:

  1. 需要readline的解决方案(包括基于Queue的解决方案)总是阻塞的。很难(不可能?)杀死执行readline的线程。只有当创建它的进程结束时,它才会被终止,但产生输出的进程不会被终止。
  2. 混合低级的fcntl和高级的readline调用可能不能正常工作,anonnn已经指出。
  3. 使用select.poll()很简洁,但根据python文档,它在Windows上不起作用。
  4. 对于这项任务,使用第三方库似乎有些过度,并增加了额外的依赖关系。

我已经创建了一个基于j·f·塞巴斯蒂安的解决方案的库。你可以使用它。

https://github.com/cenkalti/what

编辑:这个实现仍然会阻塞。使用j.f.塞巴斯蒂安的回答代替。

我尝试了上面的回答,但是线程代码的额外风险和维护令人担忧。

通过查看输入输出模块(仅限于2.6),我找到了BufferedReader。这是我的无线程、无阻塞的解决方案。

import io
from subprocess import PIPE, Popen


p = Popen(['myprogram.exe'], stdout=PIPE)


SLEEP_DELAY = 0.001


# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line


# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line

下面是我的代码,用于捕获子进程ASAP的每个输出,包括部分行。它同时泵浦,stdout和stderr的顺序几乎是正确的。

在Python 2.7 linux &上测试并正确工作;窗户

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')


while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"


if __name__ == '__main__':
__main__()

根据J.F. Sebastian的答案和其他几个来源,我组合了一个简单的子流程管理器。它提供了请求的非阻塞读取,以及并行运行几个进程。它不使用任何特定于操作系统的调用(我知道),因此应该在任何地方工作。

它可以从pypi获得,所以只需pip install shelljob。参考项目页面获得示例和完整的文档。

Python 3.4为异步IO引入了新的临时API——# EYZ0模块

方法类似于@Bryan Ward基于# eyz0的回答——定义一个协议,一旦数据准备好就调用它的方法:

#!/usr/bin/env python3
import asyncio
import os


class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)


def connection_lost(self, exc):
loop.stop() # end loop.run_forever()


if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()

看到# EYZ0。

有一个高级接口asyncio.create_subprocess_exec()返回# EYZ1对象,允许使用StreamReader.readline()协程异步读取一行 (# EYZ2): < / p >

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing


async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)


# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit




if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()


with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))

readline_and_kill()执行以下任务:

  • 启动子进程,将其标准输出重定向到管道
  • 异步从子进程的stdout中读取一行
  • 杀子流程
  • 等待它退出

如果需要,每个步骤都可以被超时秒限制。

为什么要打扰线程队列? 与readline()不同,BufferedReader.read1()不会阻塞等待\r\n,如果有任何输出进入,它会尽快返回

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io


def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,


if __name__ == '__main__':
__main__()

在我的例子中,我需要一个日志模块来捕获后台应用程序的输出并对其进行扩充(添加时间戳、颜色等)。

我最终使用了一个后台线程来执行实际的I/O操作。以下代码仅适用于POSIX平台。我去掉了不重要的部分。

如果有人打算长期使用这个野兽,可以考虑管理开放描述符。对我来说,这不是什么大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess


class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()


def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e


def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)


class log:
logger = Logger()


def __init__(self, name):
self.__name = name
self.__piped = False


def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write


def __run(self, line):
self.chat(line, nl=False)


def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)


def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))


def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()


ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)


date = log('date')
date.chat('go')
system("date", output=date)

下面是一个在python中支持非阻塞读和后台写的模块:

https://pypi.python.org/pypi/python-nonblock

提供一个函数,

nonblock_read将从流中读取数据,如果可用,否则返回一个空字符串(或None,如果流在另一端关闭,并且所有可能的数据都已读取)

你也可以考虑python-subprocess2模块,

https://pypi.python.org/pypi/python-subprocess2

这将添加到子流程模块。所以在subprocess返回的对象上。Popen”被添加了一个额外的方法,runInBackground。这将启动一个线程并返回一个对象,该对象将在写入stdout/stderr时自动填充,而不会阻塞主线程。

享受吧!

在这里添加这个答案,因为它提供了在Windows和Unix上设置非阻塞管道的能力。

所有的ctypes细节都要感谢@techtonik的回答

有一个稍微修改过的版本,可以在Unix和Windows系统上使用。

  • Python3兼容(只需要轻微更改)
  • 包括posix版本,并定义了用于这两个版本的异常。

这样你就可以在Unix和Windows代码中使用相同的函数和异常。

# pipe_non_blocking.py (module)
"""
Example use:


p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)


pipe_non_blocking_set(p.stdout.fileno())


try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""




__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)


import os




if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt


from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL


LPDWORD = POINTER(DWORD)


PIPE_NOWAIT = wintypes.DWORD(0x00000001)


def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL


h = msvcrt.get_osfhandle(pipefd)


res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True


return pipe_no_wait(fd)


def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232


return (GetLastError() == ERROR_NO_DATA)


PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True


def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True


PortableBlockingIOError = BlockingIOError

为了避免读取不完整的数据,我最终编写了自己的readline生成器(它为每一行返回字节字符串)。

它是一个发电机,所以你可以例如…

def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.


stdout_iter = iter(non_blocking_readlines(process.stdout))


line = next(stdout_iter)  # will be a line or b''.
"""
import os


from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)


fd = f.fileno()
pipe_non_blocking_set(fd)


blocks = []


while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex


yield b''
continue


while True:
n = data.find(b'\n')
if n == -1:
break


yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)

这个版本的非阻塞读取需要特殊的模块,并且可以在大多数Linux发行版上开箱即用。

import os
import sys
import time
import fcntl
import subprocess


def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass


def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)


if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())

这是一个在子进程中执行交互命令的例子,通过伪终端实现了stdout的交互。您可以参考:https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen


command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()


# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())


# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()


# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)


while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)


# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

我的问题有点不同,因为我想从正在运行的进程中收集stdout和stderr,但最终是一样的,因为我想在小部件生成时在小部件中呈现输出。

我不希望使用队列或额外的线程来解决许多建议的解决方案,因为执行运行另一个脚本并收集其输出这样的常见任务应该不需要它们。

在阅读了建议的解决方案和python文档后,我解决了以下实现的问题。是的,它只适用于POSIX,因为我正在使用select函数调用。

我同意,对于这样一个常见的脚本任务,文档是令人困惑的,实现是尴尬的。我相信旧版本的python对Popen有不同的默认值和不同的解释,所以造成了很多困惑。这似乎对Python 2.7.12和3.5.2都很有效。

关键是将bufsize=1设置为行缓冲,然后将universal_newlines=True设置为文本文件而不是二进制文件,这似乎成为设置bufsize=1时的默认值。

class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None           ## return code
self.error = None            ## flag indicates an error
self.errorstr = ""           ## info message about the error


def __del__(self):
self.wait()
DEBUG("Thread removed")


def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return

ERROR, DEBUG和VERBOSE仅仅是将输出打印到终端的宏。

这个解决方案在我看来是99.99%有效的,因为它仍然使用阻塞readline函数,所以我们假设子进程很好,输出完整的行。

我欢迎反馈,以改进解决方案,因为我仍然是Python新手。

这个解决方案使用select模块从IO流中“读取任何可用数据”。这个函数一开始会阻塞,直到数据可用,但随后只读取可用的数据,不再进一步阻塞。

鉴于它使用了select模块,这只适用于Unix。

该代码完全符合pep8。

import select




def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.


Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.


Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""


# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:


# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0


# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func


while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break


# Return read buffer
return read_buffer

我也遇到了杰西所描述的问题,并像布拉德利安迪和其他人一样使用“select”来解决它,但以阻塞模式来避免繁忙循环。它使用一个假Pipe作为假stdin。选择块并等待stdin或管道就绪。当按下一个键时,stdin解除阻塞,键值可以用read(1)检索。当不同的线程写入管道时,管道将解除阻塞,这可以被视为对stdin的需求已经结束。下面是一些参考代码:

import sys
import os
from select import select


# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")


# -------------------------------------------------------------------------
def getKey():


# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])


# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt


# Must finish
else:
return None


# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()


# -------------------------------------------------------------------------
# MAIN CODE


# Get key stroke
key = getKey()


# Keyboard input
if key:
# ... do your stuff with the key value


# Faked keystroke
else:
# ... use of stdin finished


# -------------------------------------------------------------------------
# OTHER THREAD CODE


breakStdinRead()

我有最初提问者的问题,但不希望调用线程。我将Jesse的解决方案与管道中的直接read()和我自己的行读取缓冲处理程序混合在一起(然而,我的子进程- ping -总是写完整的行<系统页面大小)。我通过读入一个gobject-registered io watch来避免忙碌的等待。这些天我通常在gobject MainLoop中运行代码以避免线程。

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

观察者是

def watch(f, *other):
print 'reading',f.read()
return True

主程序建立一个ping,然后调用gobject邮件循环。

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

任何其他工作都附加到gobject中的回调。

在现代Python中,情况要好得多。

下面是一个简单的子程序"hello.py":

#!/usr/bin/env python3


while True:
i = input()
if i == "quit":
break
print(f"hello {i}")

以及一个与之交互的程序:

import asyncio




async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()




asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,实际的模式(几乎所有前面的回答,包括这里和相关的问题)是将子进程的stdout文件描述符设置为非阻塞,然后在某种选择循环中轮询它。当然,现在这个循环是由asyncio提供的。

尝试wexpect,这是pexpect的windows替代选项。

import wexpect


p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()

在类unix系统和Python 3.5+上,有os.set_blocking,它完全按照它说的做。

import os
import time
import subprocess


cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()

这个输出:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

os.set_blocking注释它是:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

下面是一个基于线程的简单解决方案:

  • 适用于Linux和Windows(不依赖于select)。
  • 异步读取stdoutstderr
  • 不依赖于具有任意等待时间的活动轮询(CPU友好)。
  • 不使用asyncio(这可能与其他库冲突)。
  • 一直运行到子进程终止为止。

printer.py

import time
import sys


sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading




def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()




def enqueue_process(process, queue):
process.wait()
queue.put('x')




p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()


while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2':  # stderr
sys.stdout.write("\033[0;31m")  # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m")  # reset ANSI code
sys.stdout.flush()


tp.join()
to.join()
te.join()

不是第一个,也可能不是最后一个,我已经构建了一个包,它使用两种不同的方法执行非阻塞标准输出PIPE读取,一种是基于J.F. Sebastian (@jfs)的答案,另一种是一个简单的communication()循环,使用线程检查超时。

两种标准输出捕获方法都在Linux和Windows下进行了测试,截至撰写本文时,Python版本从2.7到3.9

由于它是非阻塞的,它保证了超时强制,即使有多个子进程和孙子进程,甚至在Python 2.7下也是如此。

该包还处理字节和文本标准输出编码,当试图捕获EOF时,这是一个噩梦。

您将在https://github.com/netinvent/command_runner找到包

如果你需要一些经过良好测试的非阻塞读取实现,可以尝试一下(或修改代码):

pip install command_runner


from command_runner import command_runner


exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')
你可以在_poll_process()_monitor_process()中找到核心非阻塞读取代码,这取决于所采用的捕获方法。 从那里,你可以破解你想要的,或者简单地使用整个包来执行你的命令作为子进程的替换