如何使用多处理器?

我对 python 中的 multiprocessing.Manager()有一点担心,下面是一个例子:

import multiprocessing


def f(ns):
ns.x *=10
ns.y *= 10


if __name__ == '__main__':
manager = multiprocessing.Manager()
ns = manager.Namespace()
ns.x = 1
ns.y = 2


print 'before', ns
p = multiprocessing.Process(target=f, args=(ns,))
p.start()
p.join()
print 'after', ns

输出是:

before Namespace(x=1, y=2)
after Namespace(x=10, y=20)

到目前为止,它像我预期的那样工作,然后我像这样修改了代码:

import multiprocessing


def f(ns):
ns.x.append(10)
ns.y.append(10)


if __name__ == '__main__':
manager = multiprocessing.Manager()
ns = manager.Namespace()
ns.x = []
ns.y = []


print 'before', ns
p = multiprocessing.Process(target=f, args=(ns,))
p.start()
p.join()
print 'after', ns

现在输出是:

before Namespace(x=[], y=[])
after Namespace(x=[], y=[])

这让我很困惑,为什么这些列表没有像我预期的那样改变。有人能帮我弄清楚发生了什么吗?

91144 次浏览

管理器代理对象无法传播对容器内的(非托管的)可变对象所做的更改。换句话说,如果您有一个 manager.list()对象,那么对托管列表本身的任何更改都将传播到所有其他进程。但是如果您有一个普通的 Python 列表 在里面该列表,任何对内部列表的更改都不会被传播,因为管理器没有办法检测到更改。

为了传播更改,您还必须对嵌套列表使用 manager.list()对象(需要 Python 3.6或更新版本) ,或者您需要直接修改 manager.list()对象(参见注意事项 对于 Python 3.5或更高版本的 manager.list)。

例如,考虑下面的代码及其输出:

import multiprocessing
import time


def f(ns, ls, di):
ns.x += 1
ns.y[0] += 1
ns_z = ns.z
ns_z[0] += 1
ns.z = ns_z


ls[0] += 1
ls[1][0] += 1 # unmanaged, not assigned back
ls_2 = ls[2]  # unmanaged...
ls_2[0] += 1
ls[2] = ls_2  # ... but assigned back
ls[3][0] += 1 # managed, direct manipulation


di[0] += 1
di[1][0] += 1 # unmanaged, not assigned back
di_2 = di[2]  # unmanaged...
di_2[0] += 1
di[2] = di_2  # ... but assigned back
di[3][0] += 1 # managed, direct manipulation


if __name__ == '__main__':
manager = multiprocessing.Manager()
ns = manager.Namespace()
ns.x = 1
ns.y = [1]
ns.z = [1]
ls = manager.list([1, [1], [1], manager.list([1])])
di = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})


print('before', ns, ls, ls[2], di, di[2], sep='\n')
p = multiprocessing.Process(target=f, args=(ns, ls, di))
p.start()
p.join()
print('after', ns, ls, ls[2], di, di[2], sep='\n')

产出:

before
Namespace(x=1, y=[1], z=[1])
[1, [1], [1], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[1]
{0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[1]
after
Namespace(x=2, y=[1], z=[2])
[2, [1], [2], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[2]
{0: 2, 1: [1], 2: [2], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[2]

正如您所看到的,当一个新值被直接分配给托管容器时,它会发生变化; 当它被分配给托管容器中的一个可变容器时,它不会发生变化; 但是如果可变容器被 重新分配分配给托管容器,它会再次发生变化。使用嵌套的托管容器也可以工作,可以直接检测更改,而不必将更改分配回父容器。

ns是 NamespaceProxy 实例。这些对象具有特殊的 __getattr____setattr____delattr__方法,这些方法允许跨进程共享值。 为了在更改值时利用这种机制,必须触发 __setattr__

ns.x.append(10)

导致调用 ns.__getattr__以检索 ns.x,但是不导致调用 ns.__setattr__

要解决这个问题,必须使用 ns.x = ...

def f(ns):
tmp = ns.x     # retrieve the shared value
tmp.append(10)
ns.x = tmp     # set the shared value