函数调用超时

我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?

550169 次浏览

如果你在UNIX上运行,你可以使用信号包:

In [1]: import signal


# Register an handler for the timeout
In [2]: def handler(signum, frame):
...:     print("Forever is over!")
...:     raise Exception("end of time")
...:


# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...:     import time
...:     while 1:
...:         print("sec")
...:         time.sleep(1)
...:
...:


# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0


# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0


In [6]: try:
...:     loop_forever()
...: except Exception, exc:
...:     print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time


# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

调用signal.alarm(10)后10秒,调用处理程序。这会引发一个异常,您可以从常规Python代码中拦截该异常。

这个模块不能很好地使用线程(但是,谁能呢?)

请注意,因为我们在超时发生时引发异常,它最终可能会在函数内部被捕获并忽略,例如这样一个函数:

def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue

下面是对给定的基于线程的解决方案的轻微改进。

下面的代码支持异常:

def runFunctionCatchExceptions(func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception, message:
return ["exception", message]


return ["RESULT", result]




def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
import threading
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = runFunctionCatchExceptions(func, *args, **kwargs)
it = InterruptableThread()
it.start()
it.join(timeout_duration)
if it.isAlive():
return default


if it.result[0] == "exception":
raise it.result[1]


return it.result[1]

用5秒超时调用它:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal


class TimeoutError(Exception):
pass


def handler(signum, frame):
raise TimeoutError()


# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)


return result

你可以使用multiprocessing.Process来做到这一点。

代码

import multiprocessing
import time


# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)


if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()


# Wait for 10 seconds or until process finishes
p.join(10)


# If thread is still active
if p.is_alive():
print "running... let's kill it..."


# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()


p.join()

我们也可以用信号来表示。我认为下面的例子会对你有用。与线程相比,它非常简单。

import signal


def timeout(signum, frame):
raise myException


#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',


#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)


#change 5 to however many seconds you need
signal.alarm(5)


try:
main()
except myException:
print "whoops"

在pypi上找到的stopit包似乎可以很好地处理超时。

我喜欢@stopit.threading_timeoutable装饰器,它将一个timeout参数添加到被装饰的函数中,它完成了你所期望的,它停止了函数。

在pypi: https://pypi.python.org/pypi/stopit上查看它

我怎么调用函数或者我怎么包装它,如果它超过5秒脚本取消它?

我发布了一个要点,它通过一个装饰器和一个threading.Timer来解决这个问题。下面是它的分类。

导入和设置兼容性

它是用Python 2和3测试的。它也应该在Unix/Linux和Windows下工作。

首先是进口。这些尝试保持代码的一致性,而不管Python版本:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread

使用版本独立代码:

try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass

现在我们已经从标准库导入了我们的功能。

exit_after装饰

接下来,我们需要一个函数来终止子线程中的main():

def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt

这是decorator本身:

def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer

使用

下面这个用法直接回答了你关于5秒后退出的问题!:

@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')

演示:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt

第二个函数调用将不会结束,相反,进程应该退出并返回一个跟踪!

KeyboardInterrupt并不总是停止一个睡眠线程

注意,在Windows上的Python 2中,睡眠并不总是被键盘中断中断,例如:

@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')


>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt

它也不可能中断扩展中运行的代码,除非它显式检查PyErr_CheckSignals(),参见忽略Cython, Python和KeyboardInterrupt

在任何情况下,我都会避免让线程休眠超过一秒钟——这在处理器时间上是一eon。

我如何调用这个函数,或者我用什么来包装它,以便如果它花费超过5秒,脚本将取消它还有别的吗?

要捕获它并做其他事情,你可以捕获KeyboardInterrupt。

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

我需要可嵌套的定时中断(SIGALARM不能做),不会被时间阻塞。Sleep(基于线程的方法不能做到)。我最终复制并稍微修改了这里的代码:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python


# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/




"""alarm.py: Permits multiple SIGALRM events to be queued.


Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""


import heapq
import signal
from time import time


__version__ = '$Revision: 2539 $'.split()[1]


alarmlist = []


__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))




class TimeoutError(Exception):
def __init__(self, message, id_=None):
self.message = message
self.id_ = id_




class Timeout:
''' id_ allows for nested timeouts. '''
def __init__(self, id_=None, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
self.id_ = id_
def handle_timeout(self):
raise TimeoutError(self.error_message, self.id_)
def __enter__(self):
self.this_alarm = alarm(self.seconds, self.handle_timeout)
def __exit__(self, type, value, traceback):
try:
cancel(self.this_alarm)
except ValueError:
pass




def __clear_alarm():
"""Clear an existing alarm.


If the alarm signal was set to a callable other than our own, queue the
previous alarm settings.
"""
oldsec = signal.alarm(0)
oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
if oldsec > 0 and oldfunc != __alarm_handler:
heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))




def __alarm_handler(*zargs):
"""Handle an alarm by calling any due heap entries and resetting the alarm.


Note that multiple heap entries might get called, especially if calling an
entry takes a lot of time.
"""
try:
nextt = __next_alarm()
while nextt is not None and nextt <= 0:
(tm, func, args, keys) = heapq.heappop(alarmlist)
func(*args, **keys)
nextt = __next_alarm()
finally:
if alarmlist: __set_alarm()




def alarm(sec, func, *args, **keys):
"""Set an alarm.


When the alarm is raised in `sec` seconds, the handler will call `func`,
passing `args` and `keys`. Return the heap entry (which is just a big
tuple), so that it can be cancelled by calling `cancel()`.
"""
__clear_alarm()
try:
newalarm = __new_alarm(sec, func, args, keys)
heapq.heappush(alarmlist, newalarm)
return newalarm
finally:
__set_alarm()




def cancel(alarm):
"""Cancel an alarm by passing the heap entry returned by `alarm()`.


It is an error to try to cancel an alarm which has already occurred.
"""
__clear_alarm()
try:
alarmlist.remove(alarm)
heapq.heapify(alarmlist)
finally:
if alarmlist: __set_alarm()

还有一个用法示例:

import alarm
from time import sleep


try:
with alarm.Timeout(id_='a', seconds=5):
try:
with alarm.Timeout(id_='b', seconds=2):
sleep(3)
except alarm.TimeoutError as e:
print 'raised', e.id_
sleep(30)
except alarm.TimeoutError as e:
print 'raised', e.id_
else:
print 'nope.'

我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:

import multiprocessing.pool
import functools


def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator

然后就像这样简单地超时测试或任何你喜欢的函数:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

有很多建议,但没有一个是使用并发的。期货,我认为这是最清晰的处理方式。

from concurrent.futures import ProcessPoolExecutor


# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)

超级简单的阅读和维护。

我们创建一个池,提交一个进程,然后等待5秒,然后引发一个TimeoutError,你可以根据需要捕获和处理它。

本机为python 3.2+,并反向移植到2.7 (pip install futures)。

线程和进程之间的切换就像用ThreadPoolExecutor替换ProcessPoolExecutor一样简单。

如果你想在超时时终止进程,我建议查看卵石

伟大的,易于使用和可靠的PyPi项目timeout-decorator (https://pypi.org/project/timeout-decorator/)

安装:

pip install timeout-decorator

使用:

import time
import timeout_decorator


@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i


if __name__ == '__main__':
mytest()

timeout-decorator不能在windows系统上工作,因为windows不支持signal

如果你在windows系统中使用超时装饰器,你会得到以下结果

AttributeError: module 'signal' has no attribute 'SIGALRM'

有些人建议使用use_signals=False,但不适合我。

作者@bitranox创建了以下包:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

代码示例:

import time
from wrapt_timeout_decorator import *


@timeout(5)
def mytest(message):
print(message)
for i in range(1,10):
time.sleep(1)
print('{} seconds have passed'.format(i))


def main():
mytest('starting')




if __name__ == '__main__':
main()

给出以下例外:

TimeoutError: Function mytest timed out after 5 seconds

我是wrapt_timeout_decorator的作者

这里介绍的大多数解决方案乍一看在Linux下工作得很好——因为我们有fork()和signals()——但在windows上看起来有点不同。 当涉及到Linux上的子线程时,你不能再使用信号了

为了在Windows下生成一个进程,它需要是可pickle的——许多装饰函数或Class方法都不是。

所以你需要使用一个更好的pickler像dill和multiprocess(不是pickle和multiprocessing) -这就是为什么你不能使用ProcessPoolExecutor(或只有有限的功能)。

对于超时本身-您需要定义超时的含义-因为在Windows上它将花费相当多的(且无法确定的)时间来生成进程。这在短时间内会很棘手。让我们假设,刷出过程大约需要0.5秒(很容易!!)。如果你给一个0.2秒的超时会发生什么? 函数是否应该在0.5 + 0.2秒后超时(因此让方法运行0.2秒)? 或者被调用的进程应该在0.2秒后超时(在这种情况下,被修饰的函数将总是超时,因为在这段时间内它甚至没有被生成)

嵌套的装饰器也很讨厌,你不能在子线程中使用信号。如果你想要创建一个真正通用的、跨平台的装饰器,所有这些都需要考虑(并测试)。

其他问题是将异常传递回调用者,以及记录问题(如果在装饰函数中使用-不支持记录到另一个进程中的文件)

我试图涵盖所有的边缘情况,您可以查看包wrapt_timeout_decorator,或者至少测试您自己的解决方案,受到那里使用的单元测试的启发。

@Alexis Eggermont -不幸的是,我没有足够的分数来评论-也许其他人可以通知你-我认为我解决了你的进口问题。

下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:

  1. 子进程阻塞执行。
  2. timeout函数在类成员函数上的使用。
  3. 严格要求终止时间。

下面是代码和一些测试用例:

import threading
import signal
import os
import time


class TerminateExecution(Exception):
"""
Exception to indicate that execution has exceeded the preset running time.
"""




def quit_function(pid):
# Killing all subprocesses
os.setpgrp()
os.killpg(0, signal.SIGTERM)


# Killing the main thread
os.kill(pid, signal.SIGTERM)




def handle_term(signum, frame):
raise TerminateExecution()




def invoke_with_timeout(timeout, fn, *args, **kwargs):
# Setting a sigterm handler and initiating a timer
old_handler = signal.signal(signal.SIGTERM, handle_term)
timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
terminate = False


# Executing the function
timer.start()
try:
result = fn(*args, **kwargs)
except TerminateExecution:
terminate = True
finally:
# Restoring original handler and cancel timer
signal.signal(signal.SIGTERM, old_handler)
timer.cancel()


if terminate:
raise BaseException("xxx")


return result


### Test cases
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
time.sleep(1)
print('countdown finished')
return 1337




def really_long_function():
time.sleep(10)




def really_long_function2():
os.system("sleep 787")




# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337


# Testing various scenarios
t1 = time.time()
try:
print(invoke_with_timeout(1, countdown, 3))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)


t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function2))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)




t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)


# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)




class X:
def __init__(self):
self.value = 0


def set(self, v):
self.value = v




x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9

asyncio的另一个解决方案:

如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor




class SingletonTimeOut:
pool = None


@classmethod
def run(cls, to_run: functools.partial, timeout: float):
pool = cls.get_pool()
loop = cls.get_loop()
try:
task = loop.run_in_executor(pool, to_run)
return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
except asyncio.TimeoutError as e:
error_type = type(e).__name__ #TODO
raise e


@classmethod
def get_pool(cls):
if cls.pool is None:
cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
return cls.pool


@classmethod
def get_loop(cls):
try:
return asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
# print("NEW LOOP" + str(threading.current_thread().ident))
return asyncio.get_event_loop()


# ---------------


TIME_OUT = float('0.2')  # seconds


def toto(input_items,nb_predictions):
return 1


to_run = functools.partial(toto,
input_items=1,
nb_predictions="a")


results = SingletonTimeOut.run(to_run, TIME_OUT)


突出了

  • 提出了TimeoutError使用异常来警告超时-可以很容易地修改
  • 跨平台的: Windows &Mac OS X
  • 兼容性: Python 3.6+(我也在Python 2.7上进行了测试,它可以进行小的语法调整)

有关并行映射的完整解释和扩展,请参见这里https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

最小的例子

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
...
__main__.TimeoutError: function 'bar' timed out after 4s

正如预期的那样

>>> bar(2)
2

完整代码

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill


from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any


class TimeoutError(Exception):


def __init__(self, func: Callable, timeout: int):
self.t = timeout
self.fname = func.__name__


def __str__(self):
return f"function '{self.fname}' timed out after {self.t}s"




def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
"""lemmiwinks crawls into the unknown"""
q.put(dill.loads(func)(*args, **kwargs))




def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
"""
Single function call with a timeout


Args:
func: the function
timeout: The timeout in seconds
"""


if not isinstance(timeout, int):
raise ValueError(f'timeout needs to be an int. Got: {timeout}')


if func is None:
return functools.partial(killer_call, timeout=timeout)


@functools.wraps(killer_call)
def _inners(*args, **kwargs) -> Any:
q_worker = mp.Queue()
proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
proc.start()
try:
return q_worker.get(timeout=timeout)
except mpq.Empty:
raise TimeoutError(func, timeout)
finally:
try:
proc.terminate()
except:
pass
return _inners


if __name__ == '__main__':
@killer_call(timeout=4)
def bar(x):
import time
time.sleep(x)
return x


print(bar(2))
bar(10)

笔记

由于dill的工作方式,您将需要在函数内部导入。

这也意味着如果目标函数中有导入,这些函数可能不会与doctest不兼容。你会得到一个问题,__import__没有找到。

在@piro答案的基础上,您可以构建一个contextmanager。这允许非常易读的代码,将在成功运行后禁用警报信号(sets signal.alarm(0))

from contextlib import contextmanager
import signal
import time


@contextmanager
def timeout(duration):
def timeout_handler(signum, frame):
raise TimeoutError(f'block timedout after {duration} seconds')
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(duration)
try:
yield
finally:
signal.alarm(0)


def sleeper(duration):
time.sleep(duration)
print('finished')

使用示例:

In [19]: with timeout(2):
...:     sleeper(1)
...:
finished


In [20]: with timeout(2):
...:     sleeper(3)
...:
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
1 with timeout(2):
----> 2     sleeper(3)
3


<ipython-input-7-a75b966bf7ac> in sleeper(t)
1 def sleeper(t):
----> 2     time.sleep(t)
3     print('finished')
4


<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
2 def timeout(duration):
3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
5     signal.signal(signal.SIGALRM, timeout_handler)
6     signal.alarm(duration)


Exception: block timedout after 2 seconds

以防对任何人都有帮助,在@piro的回答的基础上,我做了一个函数装饰器:

import time
import signal
from functools import wraps




def timeout(timeout_secs: int):
def wrapper(func):
@wraps(func)
def time_limited(*args, **kwargs):
# Register an handler for the timeout
def handler(signum, frame):
raise Exception(f"Timeout for function '{func.__name__}'")


# Register the signal function handler
signal.signal(signal.SIGALRM, handler)


# Define a timeout for your function
signal.alarm(timeout_secs)


result = None
try:
result = func(*args, **kwargs)
except Exception as exc:
raise exc
finally:
# disable the signal alarm
signal.alarm(0)


return result


return time_limited


return wrapper

在具有20 seconds超时的函数上使用包装器看起来像这样:

    @timeout(20)
def my_slow_or_never_ending_function(name):
while True:
time.sleep(1)
print(f"Yet another second passed {name}...")


try:
results = my_slow_or_never_ending_function("Yooo!")
except Exception as e:
print(f"ERROR: {e}")

我也遇到过同样的问题,但我的情况是需要在子线程上工作,信号不适合我,所以我写了一个python包:timeout-timer来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块来触发超时中断:

from timeout_timer import timeout, TimeoutInterrupt


class TimeoutInterruptNested(TimeoutInterrupt):
pass


def test_timeout_nested_loop_both_timeout(timer="thread"):
cnt = 0
try:
with timeout(5, timer=timer):
try:
with timeout(2, timer=timer, exception=TimeoutInterruptNested):
sleep(2)
except TimeoutInterruptNested:
cnt += 1
time.sleep(10)
except TimeoutInterrupt:
cnt += 1
assert cnt == 2

参见https://github.com/dozysun/timeout-timer

下面是一个简单的例子,运行一个带有timeout的方法,并在成功时检索它的值。

import multiprocessing
import time


ret = {"foo": False}




def worker(queue):
"""worker function"""


ret = queue.get()


time.sleep(1)


ret["foo"] = True
queue.put(ret)




if __name__ == "__main__":
queue = multiprocessing.Queue()
queue.put(ret)


p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join(timeout=10)


if p.exitcode is None:
print("The worker timed out.")
else:
print(f"The worker completed and returned: {queue.get()}")

蒂姆·萨凡纳的func_timeout包对我来说很管用。

安装:

pip install func_timeout

用法:

import time
from func_timeout import func_timeout, FunctionTimedOut


def my_func(n):
time.sleep(n)


time_to_sleep = 10


# time out after 2 seconds using kwargs
func_timeout(2, my_func, kwargs={'n' : time_to_sleep})


# time out after 2 seconds using args
func_timeout(2, my_func, args=(time_to_sleep,))

如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。

from concurrent.futures import ThreadPoolExecutor


from time import sleep
import multiprocessing




# test case 1
def worker_1(a,b,c):
for _ in range(2):
print('very time consuming sleep')
sleep(1)


return a+b+c


# test case 2
def worker_2(in_name):
for _ in range(10):
print('very time consuming sleep')
sleep(1)


return 'hello '+in_name

作为上下文管理器的实际类

class FuncTimer():
def __init__(self,fn,args,runtime):
self.fn = fn
self.args = args
self.queue = multiprocessing.Queue()
self.runtime = runtime
self.process = multiprocessing.Process(target=self.thread_caller)


def thread_caller(self):
with ThreadPoolExecutor() as executor:
future = executor.submit(self.fn, *self.args)
self.queue.put(future.result())


def  __enter__(self):
return self


def start_run(self):
self.process.start()
self.process.join(timeout=self.runtime)
if self.process.exitcode is None:
self.process.kill()
if self.process.exitcode is None:
out_res = None
print('killed premature')
else:
out_res = self.queue.get()
return out_res




def __exit__(self, exc_type, exc_value, exc_traceback):
self.process.kill()

如何使用

print('testing case 1')
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp:
res = fp.start_run()
print(res)


print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp:
res = fp.start_run()
print(res)