使用模块'子进程'与超时

下面是运行任意命令返回stdout数据的Python代码,或在非零退出码上引发异常:

proc = subprocess.Popen(
cmd,
stderr=subprocess.STDOUT,  # Merge stdout and stderr
stdout=subprocess.PIPE,
shell=True)

communicate用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

subprocess模块不支持超时——杀死运行超过X秒的进程的能力——因此,communicate可能需要很长时间才能运行。

在Windows和Linux上运行的Python程序中实现超时的简单的方法是什么?

375488 次浏览

如果你用的是Unix,

import signal
...
class Alarm(Exception):
pass


def alarm_handler(signum, frame):
raise Alarm


signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
stdoutdata, stderrdata = proc.communicate()
signal.alarm(0)  # reset the alarm
except Alarm:
print "Oops, taking too long!"
# whatever else

我已经在Windows、Linux和Mac上成功地使用了killableprocess。如果你使用Cygwin Python,你将需要OSAF版本的killableprocess,因为否则本机Windows进程不会被杀死。

下面是Alex Martelli的解决方案,它是一个具有适当进程终止的模块。其他方法不起作用,因为它们没有使用proc. communication()。所以如果你有一个产生大量输出的进程,它会填满它的输出缓冲区,然后阻塞,直到你从中读取一些东西。

from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen


def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
'''
Run a command with a timeout after which it will be forcibly
killed.
'''
class Alarm(Exception):
pass
def alarm_handler(signum, frame):
raise Alarm
p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
if timeout != -1:
signal(SIGALRM, alarm_handler)
alarm(timeout)
try:
stdout, stderr = p.communicate()
if timeout != -1:
alarm(0)
except Alarm:
pids = [p.pid]
if kill_tree:
pids.extend(get_process_children(p.pid))
for pid in pids:
# process might have died before getting to this line
# so wrap to avoid OSError: no such process
try:
kill(pid, SIGKILL)
except OSError:
pass
return -9, '', ''
return p.returncode, stdout, stderr


def get_process_children(pid):
p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
stdout = PIPE, stderr = PIPE)
stdout, stderr = p.communicate()
return [int(p) for p in stdout.split()]


if __name__ == '__main__':
print run('find /', shell = True, timeout = 3)
print run('find', shell = True)

另一种选择是写入临时文件以防止标准输出阻塞,而不需要使用communication()轮询。在其他答案没有的地方,这个方法对我有用;比如在窗户上。

    outFile =  tempfile.SpooledTemporaryFile()
errFile =   tempfile.SpooledTemporaryFile()
proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
wait_remaining_sec = timeout


while proc.poll() is None and wait_remaining_sec > 0:
time.sleep(1)
wait_remaining_sec -= 1


if wait_remaining_sec <= 0:
killProc(proc.pid)
raise ProcessIncompleteError(proc, timeout)


# read temp streams from start
outFile.seek(0);
errFile.seek(0);
out = outFile.read()
err = errFile.read()
outFile.close()
errFile.close()
我对底层的细节了解不多;但是,考虑到这一点 python 2.6的API提供了等待线程和的能力 终止进程,那么在一个单独的进程中运行该进程呢 的线程吗?< / p >
import subprocess, threading


class Command(object):
def __init__(self, cmd):
self.cmd = cmd
self.process = None


def run(self, timeout):
def target():
print 'Thread started'
self.process = subprocess.Popen(self.cmd, shell=True)
self.process.communicate()
print 'Thread finished'


thread = threading.Thread(target=target)
thread.start()


thread.join(timeout)
if thread.is_alive():
print 'Terminating process'
self.process.terminate()
thread.join()
print self.process.returncode


command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

这段代码在我的机器中的输出是:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

,其中可以看到,在第一次执行时,进程 正确完成(返回代码0),而在第二个

.进程被终止(返回代码-15)

我没有在windows中测试;但是,除了更新示例之外 命令,我想它应该工作,因为我还没有找到 记录任何说明该线程的内容。Join或process.terminate

我已经实现了我能从其中一些收集到的东西。这适用于Windows,因为这是一个社区维基,我想我也会分享我的代码:

class Command(threading.Thread):
def __init__(self, cmd, outFile, errFile, timeout):
threading.Thread.__init__(self)
self.cmd = cmd
self.process = None
self.outFile = outFile
self.errFile = errFile
self.timed_out = False
self.timeout = timeout


def run(self):
self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
stderr = self.errFile)


while (self.process.poll() is None and self.timeout > 0):
time.sleep(1)
self.timeout -= 1


if not self.timeout > 0:
self.process.terminate()
self.timed_out = True
else:
self.timed_out = False

然后从另一个类或文件:

        outFile =  tempfile.SpooledTemporaryFile()
errFile =   tempfile.SpooledTemporaryFile()


executor = command.Command(c, outFile, errFile, timeout)
executor.daemon = True
executor.start()


executor.join()
if executor.timed_out:
out = 'timed out'
else:
outFile.seek(0)
errFile.seek(0)
out = outFile.read()
err = errFile.read()


outFile.close()
errFile.close()

我添加了从jcollado线程到我的Python模块easyprocess的解决方案。

安装:

pip install easyprocess

例子:

from easyprocess import Proc


# shell is not supported!
stdout=Proc('ping localhost').call(timeout=1.5).stdout
print stdout

虽然我还没有广泛地看它,这个装饰我发现在ActiveState似乎是相当有用的这类事情。加上subprocess.Popen(..., close_fds=True),至少我已经准备好用Python编写shell脚本了。

我使用的解决方案是用期限作为shell命令的前缀。如果命令花费的时间太长,timelimit将停止它,Popen将有一个由timelimit设置的返回码。如果它是> 128,它意味着时间限制杀死进程。

另见python子进程,超时和大输出(>64K)

jcollado的答案可以使用线程。计时器类来简化:

import shlex
from subprocess import Popen, PIPE
from threading import Timer


def run(cmd, timeout_sec):
proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
timer = Timer(timeout_sec, proc.kill)
try:
timer.start()
stdout, stderr = proc.communicate()
finally:
timer.cancel()


# Examples: both take 1 second
run("sleep 1", 5)  # process ends normally at 1 second
run("sleep 5", 1)  # timeout happens at 1 second

一旦你理解了在*unix中运行机器的整个过程,你将很容易找到更简单的解决方案:

考虑这个简单的例子,如何使用select.select()(现在几乎在*nix上随处可见)创建可超时的communication()冰毒。这也可以用epoll/poll/kqueue来编写,但select.select()变体可能是一个很好的例子。select.select()的主要限制(速度和1024 max fds)不适用于您的任务。

它在*nix下工作,不创建线程,不使用信号,可以从任何线程启动(不仅仅是主线程),并且足够快,可以从我机器上的stdout读取250mb/s的数据(i5 2.3ghz)。

在通信结束时连接stdout/stderr时出现问题。如果你有大量的程序输出,这可能会导致大量的内存使用。但是您可以多次调用communication(),超时时间较小。

class Popen(subprocess.Popen):
def communicate(self, input=None, timeout=None):
if timeout is None:
return subprocess.Popen.communicate(self, input)


if self.stdin:
# Flush stdio buffer, this might block if user
# has been writing to .stdin in an uncontrolled
# fashion.
self.stdin.flush()
if not input:
self.stdin.close()


read_set, write_set = [], []
stdout = stderr = None


if self.stdin and input:
write_set.append(self.stdin)
if self.stdout:
read_set.append(self.stdout)
stdout = []
if self.stderr:
read_set.append(self.stderr)
stderr = []


input_offset = 0
deadline = time.time() + timeout


while read_set or write_set:
try:
rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
except select.error as ex:
if ex.args[0] == errno.EINTR:
continue
raise


if not (rlist or wlist):
# Just break if timeout
# Since we do not close stdout/stderr/stdin, we can call
# communicate() several times reading data by smaller pieces.
break


if self.stdin in wlist:
chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
try:
bytes_written = os.write(self.stdin.fileno(), chunk)
except OSError as ex:
if ex.errno == errno.EPIPE:
self.stdin.close()
write_set.remove(self.stdin)
else:
raise
else:
input_offset += bytes_written
if input_offset >= len(input):
self.stdin.close()
write_set.remove(self.stdin)


# Read stdout / stderr by 1024 bytes
for fn, tgt in (
(self.stdout, stdout),
(self.stderr, stderr),
):
if fn in rlist:
data = os.read(fn.fileno(), 1024)
if data == '':
fn.close()
read_set.remove(fn)
tgt.append(data)


if stdout is not None:
stdout = ''.join(stdout)
if stderr is not None:
stderr = ''.join(stderr)


return (stdout, stderr)

我修改了sussudio答案。现在函数返回:(returncodestdoutstderrtimeout) - stdoutstderr被解码为utf-8字符串

def kill_proc(proc, timeout):
timeout["value"] = True
proc.kill()


def run(cmd, timeout_sec):
proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
timeout = {"value": False}
timer = Timer(timeout_sec, kill_proc, [proc, timeout])
timer.start()
stdout, stderr = proc.communicate()
timer.cancel()
return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]

不幸的是,我的雇主在公开源代码方面有非常严格的规定,所以我不能提供实际的代码。但对我来说,最好的解决方案是创建一个子类重写Popen.wait()来轮询而不是无限期地等待,并Popen.__init__接受超时参数。一旦你这样做了,所有其他Popen方法(调用wait)将按预期工作,包括communicate

在Python 3.3+中:

from subprocess import STDOUT, check_output


output = check_output(cmd, stderr=STDOUT, timeout=seconds)

output是一个字节字符串,包含命令的合并标准输出,标准错误数据。

proc.communicate()方法不同,check_output在问题文本中指定的非零退出状态上引发CalledProcessError

我删除了shell=True,因为它经常被不必要地使用。如果cmd确实需要它,你总是可以把它加回来。如果你添加shell=True,也就是说,如果子进程产生了它自己的后代;check_output()可以比timeout指示的时间晚很多返回,参见子进程超时失败

超时特性在Python 2中可用。x通过3.2+子进程模块的subprocess32后端端口。

只是想写些简单点的东西。

#!/usr/bin/python


from subprocess import Popen, PIPE
import datetime
import time


popen = Popen(["/bin/sleep", "10"]);
pid = popen.pid
sttime = time.time();
waittime =  3


print "Start time %s"%(sttime)


while True:
popen.poll();
time.sleep(1)
rcode = popen.returncode
now = time.time();
if [ rcode is None ]  and  [ now > (sttime + waittime) ] :
print "Killing it now"
popen.kill()

这是我的解决方案,我使用线程和事件:

import subprocess
from threading import Thread, Event


def kill_on_timeout(done, timeout, proc):
if not done.wait(timeout):
proc.kill()


def exec_command(command, timeout):


done = Event()
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)


watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
watcher.daemon = True
watcher.start()


data, stderr = proc.communicate()
done.set()


return data, stderr, proc.returncode

在行动:

In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)


In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)

在子进程模块中通过call()communicate() 现在支持timeout(从Python3.3开始):

import subprocess


subprocess.call("command", timeout=20, shell=True)

这将调用该命令并引发异常

subprocess.TimeoutExpired

如果命令在20秒后还没有完成。

然后你可以处理异常来继续你的代码,就像这样:

try:
subprocess.call("command", timeout=20, shell=True)
except subprocess.TimeoutExpired:
# insert code here

希望这能有所帮助。

令人惊讶的是没有人提到使用timeout

timeout 5 ping -c 3 somehost

显然,这并不适用于每个用例,但如果您处理的是一个简单的脚本,那么这是很难克服的。

也可以在coreutils中通过homebrew为mac用户提供gtimeout。

https://pypi.python.org/pypi/python-subprocess2提供了子进程模块的扩展,允许你等待一段时间,否则终止。

因此,等待最多10秒的进程终止,否则kill:

pipe  = subprocess.Popen('...')


timeout =  10


results = pipe.waitOrTerminate(timeout)

这与windows和unix兼容。"results"是一个字典,它包含"returnCode",这是应用程序的返回(或None,如果它必须被杀死),以及"actionTaken"。如果流程正常完成,则为“SUBPROCESS2_PROCESS_COMPLETED”,或者根据所采取的操作,为“SUBPROCESS2_PROCESS_TERMINATED”和SUBPROCESS2_PROCESS_KILLED(详细信息请参阅文档)

如果你正在使用python2,请尝试一下

import subprocess32


try:
output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
print e

这个解决方案在shell=True的情况下杀死进程树,将参数传递给进程(或不传递),有一个超时,并获得回调的stdout, stderr和进程输出(它使用psutil作为kill_proc_tree)。这是基于在SO中发布的几个解决方案,包括jcollado的。回复Anson和jradice在jcollado的回复。在Windows Srvr 2012和Ubuntu 14.04中测试。请注意,对于Ubuntu,你需要将parent.children(…)调用改为parent.get_children(…)。

def kill_proc_tree(pid, including_parent=True):
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children:
child.kill()
psutil.wait_procs(children, timeout=5)
if including_parent:
parent.kill()
parent.wait(5)


def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
def target():
process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)


# wait for the process to terminate
if (cmd_parms == ""):
out, err = process.communicate()
else:
out, err = process.communicate(cmd_parms)
errcode = process.returncode


thread = Thread(target=target)
thread.start()


thread.join(timeout)
if thread.is_alive():
me = os.getpid()
kill_proc_tree(me, including_parent=False)
thread.join()

有一个想法是子类化Popen类并用一些简单的方法装饰器来扩展它。我们叫它ExpirablePopen吧。

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread




class ExpirablePopen(Popen):


def __init__(self, *args, **kwargs):
self.timeout = kwargs.pop('timeout', 0)
self.timer = None
self.done = Event()


Popen.__init__(self, *args, **kwargs)


def __tkill(self):
timeout = self.timeout
if not self.done.wait(timeout):
error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
self.kill()


def expirable(func):
def wrapper(self, *args, **kwargs):
# zero timeout means call of parent method
if self.timeout == 0:
return func(self, *args, **kwargs)


# if timer is None, need to start it
if self.timer is None:
self.timer = thr = Thread(target=self.__tkill)
thr.daemon = True
thr.start()


result = func(self, *args, **kwargs)
self.done.set()


return result
return wrapper


wait = expirable(Popen.wait)
communicate = expirable(Popen.communicate)




if __name__ == '__main__':
from subprocess import PIPE


print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()

你可以使用select来做到这一点

import subprocess
from datetime import datetime
from select import select


def call_with_timeout(cmd, timeout):
started = datetime.now()
sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
while True:
p = select([sp.stdout], [], [], timeout)
if p[0]:
p[0][0].read()
ret = sp.poll()
if ret is not None:
return ret
if (datetime.now()-started).total_seconds() > timeout:
sp.kill()
return None

我有一个问题,我想终止一个多线程子进程,如果它花费的时间超过给定的超时长度。我想在Popen()中设置一个超时,但它不起作用。然后,我意识到Popen().wait()等于call(),所以我有了在.wait(timeout=xxx)方法中设置超时的想法,最终成功了。因此,我是这样解决的:

import os
import sys
import signal
import subprocess
from multiprocessing import Pool


cores_for_parallelization = 4
timeout_time = 15  # seconds


def main():
jobs = [...YOUR_JOB_LIST...]
with Pool(cores_for_parallelization) as p:
p.map(run_parallel_jobs, jobs)


def run_parallel_jobs(args):
# Define the arguments including the paths
initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
final_list = [initial_terminal_command, function_to_start]
final_list.extend(args)


# Start the subprocess and determine the process PID
subp = subprocess.Popen(final_list)  # starts the process
pid = subp.pid


# Wait until the return code returns from the function by considering the timeout.
# If not, terminate the process.
try:
returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
except subprocess.TimeoutExpired:
# Distinguish between Linux and Windows and terminate the process if
# the timeout has been expired
if sys.platform == 'linux2':
os.kill(pid, signal.SIGTERM)
elif sys.platform == 'win32':
subp.terminate()


if __name__ == '__main__':
main()

预先使用Linux命令timeout并不是一个糟糕的解决方法,它对我来说是有效的。

cmd = "timeout 20 "+ cmd
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()

Python 3.5开始,有了一个新的subprocess.run通用命令(它意味着取代check_callcheck_output…),并且它也有timeout=参数。

子流程。运行(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, cwd=None, timeout =没有, check=False, encoding=None, errors=None)

运行arg游戏所描述的命令。等待命令完成,然后返回一个CompletedProcess实例。

当超时过期时,它会引发subprocess.TimeoutExpired异常。

对于python 2.6+,使用gevent

 from gevent.subprocess import Popen, PIPE, STDOUT


def call_sys(cmd, timeout):
p= Popen(cmd, shell=True, stdout=PIPE)
output, _ = p.communicate(timeout=timeout)
assert p.returncode == 0, p. returncode
return output


call_sys('./t.sh', 2)


# t.sh example
sleep 5
echo done
exit 1

python 2.7

import time
import subprocess


def run_command(cmd, timeout=0):
start_time = time.time()
df = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while timeout and df.poll() == None:
if time.time()-start_time >= timeout:
df.kill()
return -1, ""
output = '\n'.join(df.communicate()).strip()
return df.returncode, output

在Python 3.7.8中超时测试后捕获的输出示例:

try:
return subprocess.run(command, shell=True, capture_output=True, timeout=20, cwd=cwd, universal_newlines=True)
except subprocess.TimeoutExpired as e:
print(e.output.decode(encoding="utf-8", errors="ignore"))
assert False;

异常子进程。TimeoutExpired有输出和其他成员:

cmd -用于生成子进程的命令。

timeout -超时时间,单位为秒。

output -子进程的输出,如果它被run()或捕获 check_output()。否则,没有。< / p >

stdout -输出的别名,用于与stderr对称。

stderr -子进程的stderr输出,如果它被捕获 run()。否则,没有。< / p >

更多信息:https://docs.python.org/3/library/subprocess.html#subprocess.TimeoutExpired

仅对Linux进行延迟回答,但如果有人想使用subprocess.getstatusoutput(),其中timeout参数不可用,则可以在命令的开头使用内置Linux超时,即:

import subprocess


timeout = 25 # seconds
cmd = f"timeout --preserve-status --foreground {timeout} ping duckgo.com"
exit_c, out = subprocess.getstatusoutput(cmd)


if (exit_c == 0):
print("success")
else:
print("Error: ", out)

timeout参数:

  • __abc0: __abc1
  • __abc0: __abc1
  • 25:超时值,以秒为单位

有时你需要在不使用communicate()的情况下处理(ffmpeg),在这种情况下你需要异步超时,这是使用ttldict的一种实用方法

PIP安装ttldict

from ttldict import  TTLOrderedDict
sp_timeout = TTLOrderedDict(default_ttl=10)


def kill_on_timeout(done, proc):
while True:
now = time.time()
if sp_timeout.get('exp_time') == None:
proc.kill()
break
    

process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, stderr=subprocess.STDOUT)
            

sp_timeout['exp_time'] = time.time()
            

done = Event()
watcher = Thread(target=kill_on_timeout, args=(done, process))
watcher.daemon = True
watcher.start()
done.set()


for line in process.stdout:
.......