Python 在进程之间共享锁

我尝试使用一个部分函数,以便 pool.map ()可以针对具有多个参数的函数(在本例中是 Lock ()对象)。

下面是示例代码(摘自我之前提出的一个问题的答案) :

from functools import partial


def target(lock, iterable_item):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()


def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
l = multiprocessing.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()

然而,当我运行这段代码时,我得到了这个错误:

Runtime Error: Lock objects should only be shared between processes through inheritance.

我在这里遗漏了什么? 我如何在子进程之间共享锁?

73681 次浏览

不能将普通的 multiprocessing.Lock对象传递给 Pool方法,因为它们不能被 pickle。有两种方法可以解决这个问题。一种是创建 Manager()并传递一个 Manager.Lock():

def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
m = multiprocessing.Manager()
l = m.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()

不过,这有点重量级; 使用 Manager需要产生另一个进程来承载 Manager服务器。所有对 acquire/release的调用都必须通过 IPC 发送到该服务器。

另一种选择是在 Pool 创建时使用 initializer kwarg 传递常规的 multiprocessing.Lock()。这将使您的锁实例在所有子工作器中都是全局的:

def target(iterable_item):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def init(l):
global lock
lock = l


def main():
iterable = [1, 2, 3, 4, 5]
l = multiprocessing.Lock()
pool = multiprocessing.Pool(initializer=init, initargs=(l,))
pool.map(target, iterable)
pool.close()
pool.join()

第二种解决方案的副作用是不再需要 partial

这里有一个版本(使用 Barrier而不是 Lock,但是你可以理解) ,它也可以在 Windows 上运行(在 Windows 上,缺少的 fork会造成额外的麻烦) :

import multiprocessing as mp


def procs(uid_barrier):
uid, barrier = uid_barrier
print(uid, 'waiting')
barrier.wait()
print(uid, 'past barrier')


def main():
N_PROCS = 10
with mp.Manager() as man:
barrier = man.Barrier(N_PROCS)
with mp.Pool(N_PROCS) as p:
p.map(procs, ((uid, barrier) for uid in range(N_PROCS)))


if __name__ == '__main__':
mp.freeze_support()
main()