如何从 Python 异步运行外部命令?

我需要从 Python 脚本异步运行 shell 命令。我的意思是,我希望 Python 脚本在外部命令停止运行并执行任何需要执行的操作时继续运行。

我读了这篇文章:

在 Python 中调用外部命令

然后我离开并做了一些测试,看起来如果我在命令的末尾使用 &,那么 os.system()将完成这项工作,这样我就不必等待它返回。我想知道的是,这是否是实现这一目标的正确方式?我试过 commands.call(),但是它对我不起作用,因为它阻塞了外部命令。

请让我知道如果使用 os.system()为这是明智的,或者如果我应该尝试其他路线。

199070 次浏览

subprocess.Popen 正是你想要的。

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(根据评论编辑以完成答案)

Popen 实例可以执行各种其他操作,比如可以 poll()它来查看它是否仍在运行,可以 communicate()用它在 stdin 上发送数据,并等待它终止。

如果希望并行运行多个进程,并在它们生成结果时处理它们,可以使用轮询,如下所示:

from subprocess import Popen, PIPE
import time


running_procs = [
Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]


while running_procs:
for proc in running_procs:
retcode = proc.poll()
if retcode is not None: # Process finished.
running_procs.remove(proc)
break
else: # No process is done, wait a bit and check again.
time.sleep(.1)
continue


# Here, `proc` has finished with return code `retcode`
if retcode != 0:
"""Error handling."""
handle_results(proc.stdout)

这里的控制流有点复杂,因为我试图让它变小——您可以根据自己的喜好进行重构。:-)

这样做的好处是首先满足提前完成的请求。如果您在第一个正在运行的进程上调用 communicate,结果发现运行时间最长,那么其他正在运行的进程将一直处于空闲状态,而您本可以处理它们的结果。

我想知道的是,这个[ os.system ()]是否是实现这一目标的正确方法?

不。 os.system()不是正确的方法。这就是为什么大家都说要使用 subprocess

有关更多信息,请阅读 http://docs.python.org/library/os.html#os.system

子流程模块提供了更多信息 产卵的强大设施 处理和检索它们的 结果; 使用该模块是 优于使用此函数。请使用 子进程模块。检查 特别是替换老人 具有子流程模块的函数 部分。

我在 异步模块上取得了很好的成功,它很好地处理了进程的输出。例如:

import os
from asynproc import Process
myProc = Process("myprogram.app")


while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll is not None:
break
# print any new output
out = myProc.read()
if out != "":
print out

我在尝试使用 Python 中的 s3270脚本软件连接到3270终端时遇到了同样的问题。现在我用我在这里找到的 Process 的一个子类来解决这个问题:

Http://code.activestate.com/recipes/440554/

下面是从文件中提取的样本:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
if tr < 1:
tr = 1
x = time.time()+t
y = []
r = ''
pr = p.recv
if stderr:
pr = p.recv_err
while time.time() < x or r:
r = pr()
if r is None:
if e:
raise Exception(message)
else:
break
elif r:
y.append(r)
else:
time.sleep(max((x-time.time())/tr, 0))
return ''.join(y)


def send_all(p, data):
while len(data):
sent = p.send(data)
if sent is None:
raise Exception(message)
data = buffer(data, sent)


if __name__ == '__main__':
if sys.platform == 'win32':
shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
else:
shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')


a = Popen(shell, stdin=PIPE, stdout=PIPE)
print recv_some(a),
for cmd in commands:
send_all(a, cmd + tail)
print recv_some(a),
send_all(a, 'exit' + tail)
print recv_some(a, e=0)
a.wait()

使用带有非阻塞读取行的 预计是另一种方法。PExpt 解决了死锁问题,允许您在后台轻松地运行进程,并且当您的进程产生预定义的字符串时,提供了简单的回调方法,并且通常使得与进程的交互更加容易。

考虑到“我不必等着它回来”,最简单的解决方案之一将是:

subprocess.Popen( \
[path_to_executable, arg1, arg2, ... argN],
creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

但是... 从我读到的这是不是“正确的方式来完成这样的事情”,因为安全风险由 subprocess.CREATE_NEW_CONSOLE标志。

这里发生的关键事情是使用 subprocess.CREATE_NEW_CONSOLE创建新的控制台和 .pid(返回进程 ID,以便您可以稍后检查程序,如果您想) ,以便不等待程序完成其工作。

公认的答案是 非常旧。

我在这里找到了一个更现代的答案:

Https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

并做了一些改变:

  1. 让它在窗户上工作
  2. 使它可以使用多个命令
import sys
import asyncio


if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())




async def _read_stream(stream, cb):
while True:
line = await stream.readline()
if line:
cb(line)
else:
break




async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
try:
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)


await asyncio.wait(
[
_read_stream(process.stdout, stdout_cb),
_read_stream(process.stderr, stderr_cb),
]
)
rc = await process.wait()
return process.pid, rc
except OSError as e:
# the program will hang if we let any exception propagate
return e




def execute(*aws):
""" run the given coroutines in an asyncio loop
returns a list containing the values returned from each coroutine.
"""
loop = asyncio.get_event_loop()
rc = loop.run_until_complete(asyncio.gather(*aws))
loop.close()
return rc




def printer(label):
def pr(*args, **kw):
print(label, *args, **kw)


return pr




def name_it(start=0, template="s{}"):
"""a simple generator for task names
"""
while True:
yield template.format(start)
start += 1




def runners(cmds):
"""
cmds is a list of commands to excecute as subprocesses
each item is a list appropriate for use by subprocess.call
"""
next_name = name_it().__next__
for cmd in cmds:
name = next_name()
out = printer(f"{name}.stdout")
err = printer(f"{name}.stderr")
yield _stream_subprocess(cmd, out, err)




if __name__ == "__main__":
cmds = (
[
"sh",
"-c",
"""echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
],
[
"bash",
"-c",
"echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
],
[sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
)


print(execute(*runners(cmds)))

示例命令不太可能在您的系统上完美地工作,而且它不能处理奇怪的错误,但是这段代码确实演示了一种使用异步运行多个子进程并对输出进行流处理的方法。

这里有几个答案,但没有一个能满足我以下的要求:

  1. 我不想等待命令完成或者用子进程输出污染我的终端。

  2. 我想用重定向运行 bash 脚本。

  3. 我希望在 bash 脚本中支持管道(例如 find ... | tar ...)。

满足上述要求的唯一组合是:

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)

Python 3子流程示例在“等待命令异步终止”下涵盖了这一点。使用 IPythonpython -m asyncio运行以下代码:

import asyncio


proc = await asyncio.create_subprocess_exec(
'ls','-lha',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)


# do something else while ls is working


# if proc takes very long to complete, the CPUs are free to use cycles for
# other processes
stdout, stderr = await proc.communicate()

一旦 await asyncio.create_subprocess_exec(...)完成,该进程将立即开始运行。如果在您调用 await proc.communicate()时它还没有完成,它将在那里等待,以便给出您的输出状态。如果它已经完成,proc.communicate()将立即返回。

这里的要点类似于 回答是雄鹰,但我认为 Terrel 的答案似乎过于复杂。

有关更多信息,请参见 asyncio.create_subprocess_exec