Keyboard Interrupts with Python's multiprocessing Pool

How can I handle KeyboardInterrupt events with Python's multiprocessing Pools? Here is a simple example:

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    sleep(1)
    return i*i


def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results


if __name__ == "__main__":
    go()

When I run the code above, the KeyboardInterrupt is raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.

I want to be able to press ^C at any time and have all of the processes exit gracefully.


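Strangely enough, it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written... Try changing slowly_square to: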

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

That should work as you expected.

This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

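The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.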

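Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So the workaround is to specify a timeout. To do that, replace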

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

或类似的。
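A Python 3 sketch of the same workaround, assuming the caller already owns a pool. The helper name `map_interruptibly` is hypothetical. Instead of a magic number it passes `threading.TIMEOUT_MAX`, which the `threading` documentation describes as the largest timeout the blocking primitives accept (a larger value raises `OverflowError`).

```python
import threading


def map_interruptibly(pool, func, iterable):
    """Hypothetical helper: pool.map(func, iterable), but with a timed wait.

    Waiting with a timeout instead of an untimed get() is the workaround
    described above for the parent never seeing Ctrl-C.
    """
    async_result = pool.map_async(func, iterable)
    try:
        # TIMEOUT_MAX is the largest timeout the threading primitives accept.
        return async_result.get(threading.TIMEOUT_MAX)
    except KeyboardInterrupt:
        # Ctrl-C reached the parent: stop the workers, then wait for them to exit.
        pool.terminate()
        pool.join()
        raise
```

Call it where the question calls `pool.map`, e.g. `results = map_interruptibly(pool, slowly_square, range(40))`. With a process pool, `func` still has to be a module-level function so it can be pickled.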

For some reason, only exceptions that inherit from the base Exception class are handled normally. As a workaround, you can re-raise your KeyboardInterrupt as an Exception instance:

from multiprocessing import Pool
import time


class KeyboardInterruptError(Exception): pass


def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()


def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception as e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'


if __name__ == '__main__':
    main()

Normally you get the following output:

starting the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

If you press ^C instead, you get:

starting the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
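The same workaround can be packaged as a Python 3 decorator so each worker function doesn't need its own try/except. This is a sketch: `reraise_keyboard_interrupt` is a hypothetical name, and `KeyboardInterruptError` is redefined here only so the block stands alone. It relies on the answer's observation that the pool passes ordinary `Exception` subclasses back to the parent. Because `functools.wraps` keeps the original name and the decorator is applied at module level, the decorated function can still be pickled by name for a `Pool`.

```python
import functools
import time


class KeyboardInterruptError(Exception):
    """Ordinary Exception subclass, used to carry a Ctrl-C out of a worker."""


def reraise_keyboard_interrupt(func):
    """Hypothetical decorator: turn KeyboardInterrupt inside func into KeyboardInterruptError."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            raise KeyboardInterruptError()
    return wrapper


@reraise_keyboard_interrupt
def f(x):
    # Same worker as in the answer above, minus the hand-written try/except.
    time.sleep(x)
    return x
```

In the parent, the `except Exception` branch of `main()` then catches the converted interrupt and terminates the pool.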

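I found that, for the time being, the best solution is not to use the multiprocessing.pool feature at all, but to roll your own pool functionality. I provided an example that demonstrates the error with apply_async, as well as an example showing how to avoid using the pool functionality altogether.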

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

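Based on my recent findings, the best solution is to set up the worker processes to ignore SIGINT altogether and to confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and it requires no error-handling code in your child processes.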

import multiprocessing
import signal


...


def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)


...


def main():
    pool = multiprocessing.Pool(size, init_worker)

    try:
        ...
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

The explanation and the full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.
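A self-contained Python 3 version of that outline, as a sketch rather than the linked repository's code. `make_pool` and `map_or_cancel` are hypothetical names. The timed `get()` is borrowed from the `map_async(...).get(timeout)` workaround elsewhere in this thread so that the parent isn't stuck in an untimed wait.

```python
import multiprocessing
import signal
import threading


def init_worker():
    """Pool initializer: each worker ignores SIGINT, so only the parent reacts to Ctrl-C."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def make_pool(processes=None):
    # The initializer runs once in every worker process as it starts.
    return multiprocessing.Pool(processes, initializer=init_worker)


def map_or_cancel(pool, func, iterable):
    """Map func over iterable; on Ctrl-C, all cleanup happens in the parent."""
    try:
        # Timed wait, so the parent is not parked in an untimed Condition.wait().
        results = pool.map_async(func, iterable).get(threading.TIMEOUT_MAX)
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
        raise
    pool.close()
    pool.join()
    return results
```

Typical use is `results = map_or_cancel(make_pool(8), slowly_square, range(40))` inside an `if __name__ == "__main__":` block, with `slowly_square` defined at module level as in the question. Because the workers never see SIGINT, they need no exception handling of their own.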

Usually this simple structure works for Ctrl-C on a Pool:

import signal


def signal_handle(_signal, frame):
    print "Stopping the Jobs."


signal.signal(signal.SIGINT, signal_handle)

As stated in a few similar posts:

Capture keyboardinterrupt in Python without try-except
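A Python 3 sketch of how such a handler can be combined with a pool, assuming the parent waits with `map_async` or `apply_async` rather than a blocking `map()`. `request_stop` and `wait_or_stop` are hypothetical names. The handler only sets a `threading.Event`, and the parent polls its `AsyncResult` in short slices so the Python-level handler gets a chance to run between waits.

```python
import signal
import threading

# Hypothetical flag that the parent's waiting loop polls.
stop_requested = threading.Event()


def request_stop(_signal, frame):
    """SIGINT handler: record the request instead of raising KeyboardInterrupt."""
    print("Stopping the jobs.")
    stop_requested.set()


def wait_or_stop(pool, async_result, poll_interval=0.1):
    """Wait for async_result, but give up early once Ctrl-C has been seen.

    Returns the results, or None if the pool was terminated.
    """
    while not async_result.ready():
        if stop_requested.is_set():
            pool.terminate()
            pool.join()
            return None
        # Short timed waits let the signal handler run between polls.
        async_result.wait(poll_interval)
    return async_result.get()
```

In the parent, call `signal.signal(signal.SIGINT, request_stop)`, then `wait_or_stop(pool, pool.map_async(slowly_square, range(40)))`. With the fork start method, workers created after the handler is installed inherit it and also stop raising KeyboardInterrupt. Spawned workers start with Python's default handler and still need their own handling, for example the SIG_IGN initializer from the previous answer.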

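There seem to be two issues that make exceptions during multiprocessing annoying. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map to get an immediate response (i.e., to avoid finishing the processing of the entire list). The second (noted by Andrey) is that multiprocessing doesn't catch exceptions that don't inherit from Exception (e.g., SystemExit). So here is my solution, which deals with both of these: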

import sys
import functools
import traceback
import multiprocessing


def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise  # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))


def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)


def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

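I'm new to Python. I looked everywhere for an answer and stumbled upon this one, along with a few other blogs and YouTube videos. I copied and pasted the author's code above and reproduced it with Python 2.7.13 on Windows 7 64-bit. It's close to what I want to achieve.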

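I made my child processes ignore Ctrl-C and let the parent process terminate. It looks like bypassing the child processes does avoid the problem for me.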

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

The part starting at pool.terminate() never seems to execute.

The top-voted answer does not tackle the core issue, only a similar side effect.

Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in an old blog post.

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

# perform_download and downloads stand for your own task function and inputs.
try:
    pool.map(perform_download, downloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

You can try using the apply_async method of a Pool object, like this:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print len(results1)
    print "Multiprocessing run time: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print len(results2)
    print "Non-multiprocessing run time: {}".format(t3 - t2)

Output:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

One advantage of this method is that the results processed before the interruption are returned in the results dictionary:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
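The same pattern, ported to Python 3 as a sketch. `collect_until_interrupted` is a hypothetical name. It reports the interruption through a returned flag instead of printing, so callers can tell a partial dictionary from a complete one. Like the original, it uses the inputs as dictionary keys, so they must be hashable and distinct. As other answers in this thread point out, whether an untimed `job.get()` notices Ctrl-C promptly depends on the Python version and platform.

```python
def collect_until_interrupted(pool, func, inputs):
    """Hypothetical helper: run func on every input via apply_async.

    Returns (results, interrupted). results maps each finished input to its
    return value, or to the exception the task raised; on Ctrl-C the entries
    collected so far are kept and the pool is terminated.
    """
    results = {}
    interrupted = False
    try:
        jobs = {value: pool.apply_async(func, (value,)) for value in inputs}
        for value, job in jobs.items():
            try:
                results[value] = job.get()
            except Exception as exc:  # KeyboardInterrupt is not an Exception subclass
                results[value] = exc
    except KeyboardInterrupt:
        interrupted = True
        pool.terminate()
    else:
        pool.close()
    pool.join()
    return results, interrupted
```

With `multiprocessing.Pool(5)` and the `test_func` above, `collect_until_interrupted(pool, test_func, range(100))` returns the finished squares together with `True` if Ctrl-C arrived part-way through.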

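Many of these answers are old, and/or they don't seem to work with later versions of Python (I am running 3.8.5) on Windows if you are executing a method such as Pool.map, which blocks until all the submitted tasks have completed. Here is my solution.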

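  1. Call signal.signal(signal.SIGINT, signal.SIG_IGN) in the main process to ignore Ctrl-C altogether.
  2. The processing pool is initialized with a pool initializer that sets up each worker process as follows: the global variable ctrl_c_entered is set to False, and signal.signal(signal.SIGINT, pool_ctrl_c_handler) is called, so that a Ctrl-C arriving while the worker is idle only sets ctrl_c_entered to True. The return value of this call is saved in default_sigint_handler; this is the original, default handler, which, when re-established, allows KeyboardInterrupt exceptions to be raised and handled.
  3. The decorator handle_ctrl_c can be used to decorate multiprocessing functions and methods that should exit immediately once Ctrl-C has been entered. The decorator tests whether the global ctrl_c_entered flag is set; if it is, it doesn't even run the function/method and instead returns a KeyboardInterrupt exception instance. Otherwise, it re-establishes the default handler, sets up a try/except handler for KeyboardInterrupt and calls the decorated function/method. If Ctrl-C is entered, the global ctrl_c_entered is set to True and a KeyboardInterrupt exception instance is returned. Whenever it has run the function, the decorator re-establishes pool_ctrl_c_handler before returning.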

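In effect, all submitted tasks are allowed to start, but once Ctrl-C has been entered, they finish immediately with a KeyboardInterrupt exception instance as their return value. The main process can check the return values for such an instance to detect whether Ctrl-C was entered.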

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps


def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler)  # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper


@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i


def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True


def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)


def main():
    # Create the pool before ignoring Ctrl-C: with the "fork" start method the
    # workers inherit the parent's handler, and init_pool must save the default one.
    pool = Pool(initializer=init_pool)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    results = pool.map(slowly_square, range(10))
    if any(isinstance(x, KeyboardInterrupt) for x in results):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

Prints:

Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]