Python 进程池非守护进程? ?

是否有可能创建一个非守护进程的 Python Pool?我希望一个池能够调用一个函数,其中包含另一个池。

我希望这样做是因为守护进程不能创建进程。具体来说,它将导致错误:

AssertionError: daemonic processes are not allowed to have children

例如,考虑这样一个场景: function_a有一个运行 function_b的池,而 function_b有一个运行 function_c的池。这个函数链将失败,因为 function_b正在一个守护进程中运行,而守护进程不能创建进程。

71492 次浏览

multiprocessing.pool.Pool类在它的 __init__方法中创建辅助进程,使它们成为守护进程并启动它们,在它们启动之前不可能将它们的 daemon属性重新设置为 False(之后就不再允许了)。但是你可以创建你自己的子类 multiprocesing.pool.Pool(multiprocessing.Pool只是一个包装函式)并替换你自己的子类 multiprocessing.Process,它总是非守护进程,用于辅助进程。

这里有一个完整的例子来说明如何做到这一点。重要的部分是顶部的两个类 NoDaemonProcessMyPool,并在最后调用 MyPool实例上的 pool.close()pool.join()

#!/usr/bin/env python
# -*- coding: UTF-8 -*-


import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time


from random import randint




class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)


# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess


def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t


def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)


result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])


# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result


def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)


result = pool.map(work, [randint(1, 5) for x in range(5)])


pool.close()
pool.join()
print(result)


if __name__ == '__main__':
test()

多重处理模块有一个很好的接口,可以使用具有进程 或者线程的池。根据您当前的用例,您可以考虑对外部 Pool 使用 multiprocessing.pool.ThreadPool,这将导致线程 (允许从内部产生进程),而不是进程。

它可能受到 GIL 的限制,但在我的特殊情况下,创建 给你时从外部 Pool启动进程的时间远远超过使用 ThreadPool的解决方案。


Processes换成 Threads真的很容易。阅读更多关于如何使用 ThreadPool解决方案 给你给你的内容。

我遇到的问题是试图在模块之间导入全局变量,导致 ProcessPool ()行被多次计算。

Globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool


class SingletonMeta(type):
def __new__(cls, name, bases, dict):
dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
return super(SingletonMeta, cls).__new__(cls, name, bases, dict)


def __init__(cls, name, bases, dict):
super(SingletonMeta, cls).__init__(name, bases, dict)
cls.instance = None


def __call__(cls,*args,**kw):
if cls.instance is None:
cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
return cls.instance


def __deepcopy__(self, item):
return item.__class__.instance


class Globals(object):
__metaclass__ = SingletonMeta
"""
This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     

The root cause is that importing this file from different modules causes this file to be reevalutated each time,
thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug
"""
def __init__(self):
print "%s::__init__()" % (self.__class__.__name__)
self.shared_manager      = Manager()
self.shared_process_pool = ProcessPool()
self.shared_thread_pool  = ThreadPool()
self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

然后从代码的其他地方安全地导入

from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock

我在这里围绕 pathos.multiprocessing编写了一个更加扩展的包装类:

顺便说一句,如果您的用例仅需要异步多进程映射作为性能优化,那么 joblib 将在后台管理所有的进程池,并允许这种非常简单的语法:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )

我有必要在 Python 3.7中使用一个非守护进程池,并最终修改了公认答案中发布的代码。下面是创建非守护进程池的代码片段:

import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False


@daemon.setter
def daemon(self, value):
pass




class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess


# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(NestablePool, self).__init__(*args, **kwargs)

由于 multiprocessing的当前实现已经被广泛地重构为基于上下文的实现,因此我们需要提供一个 NoDaemonContext类,它将 NoDaemonProcess作为属性。然后,NestablePool将使用该上下文,而不是默认上下文。

尽管如此,我还是应该警告,这种做法至少有两点需要注意:

  1. 它仍然取决于 multiprocessing包的实现细节,因此可能随时中断。
  2. multiprocessing使得使用非守护进程变得如此困难是有充分理由的,其中许多进程已经被解释为 给你。我认为最有说服力的是:

至于允许子线程派生自己使用的子线程 子进程冒着创建一支僵尸小军队的风险 如果父线程或子线程在 子进程完成并返回。

在 Python 3.8中,强 > concurrent.futures.ProcessPoolExecutor没有这个限制。它可以拥有一个完全没有问题的嵌套进程池:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time


def pid():
return current_process().pid


def _square(i):  # Runs in inner_pool
square = i ** 2
time.sleep(i / 10)
print(f'{pid()=} {i=} {square=}')
return square


def _sum_squares(i, j):  # Runs in outer_pool
with Pool(max_workers=2) as inner_pool:
squares = inner_pool.map(_square, (i, j))
sum_squares = sum(squares)
time.sleep(sum_squares ** .5)
print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
return sum_squares


def main():
with Pool(max_workers=3) as outer_pool:
for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
print(f'{pid()=} {sum_squares=}')


if __name__ == "__main__":
main()

上面的演示代码是用 Python 3.8测试的。

但是,ProcessPoolExecutor的一个限制是它没有 maxtasksperchild。如果你需要这个,考虑使用 马西米利亚诺的回答

提供者: 由 jfs 回答

我见过有人使用 celerymultiprocessing分支 台球(多处理池扩展)来处理这个问题,它允许守护进程产生子进程。本演练只是简单地将 multiprocessing模块替换为:

import billiard as multiprocessing

当错误看起来是假阳性时,这提供了一个变通方法。如同 詹姆斯注意到一样,这种情况也可能发生在来自守护进程的无意的 进口上。

例如,如果有以下简单代码,则可能无意中从 worker 导入 WORKER_POOL,从而导致错误。

import multiprocessing


WORKER_POOL = multiprocessing.Pool()

一个简单但可靠的变通方法是:

import multiprocessing
import multiprocessing.pool




class MyClass:


@property
def worker_pool(self) -> multiprocessing.pool.Pool:
# Ref: https://stackoverflow.com/a/63984747/
try:
return self._worker_pool  # type: ignore
except AttributeError:
# pylint: disable=protected-access
self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
return self.__class__._worker_pool  # type: ignore
# pylint: enable=protected-access

在上面的解决方案中,可以使用 MyClass.worker_pool而不出现错误。如果您认为这种方法可以改进,请告诉我。

下面介绍如何启动池,即使您已经处于守护进程中。这是在 python3.8.5中测试的

首先,定义 Undaemonize上下文管理器,它临时删除当前进程的守护进程状态。

class Undaemonize(object):
'''Context Manager to resolve AssertionError: daemonic processes are not allowed to have children
    

Tested in python 3.8.5'''
def __init__(self):
self.p = multiprocessing.process.current_process()
if 'daemon' in self.p._config:
self.daemon_status_set = True
else:
self.daemon_status_set = False
self.daemon_status_value = self.p._config.get('daemon')
def __enter__(self):
if self.daemon_status_set:
del self.p._config['daemon']
def __exit__(self, type, value, traceback):
if self.daemon_status_set:
self.p._config['daemon'] = self.daemon_status_value

现在您可以像下面这样启动一个池,甚至可以从守护进程中启动:

with Undaemonize():
pool = multiprocessing.Pool(1)
pool.map(... # you can do something with the pool outside of the context manager

虽然这里的其他方法旨在创建一开始就不是守护进程的池,但是这种方法允许您启动一个池,即使您已经处于守护进程中。

自 Python 3.7版以来,我们可以创建非守护进程 ProcessPoolExecator

使用多处理时必须使用 if __name__ == "__main__":

from concurrent.futures import ProcessPoolExecutor as Pool


num_pool = 10
    

def main_pool(num):
print(num)
strings_write = (f'{num}-{i}' for i in range(num))
with Pool(num) as subp:
subp.map(sub_pool,strings_write)
return None




def sub_pool(x):
print(f'{x}')
return None




if __name__ == "__main__":
with Pool(num_pool) as p:
p.map(main_pool,list(range(1,num_pool+1)))