多处理中的共享内存对象

假设我有一个内存中的大数组,我有一个函数 func,它接受这个大数组作为输入(连同一些其他参数)。具有不同参数的 func可以并行运行。例如:

def func(arr, param):
# do stuff to arr, param


# build array arr


pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

如果我使用多处理库,那么这个巨大的数组将被多次复制到不同的进程中。

有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改。

更复杂的是,如果 arr 不是一个数组,而是一个任意的 python 对象,有没有共享它的方法?

[编辑]

我读了答案,但我还是有点困惑。由于 fork ()是在写时复制的,所以在 Python 多处理库中产生新进程时,不应该调用任何额外的成本。但是下面的代码表明存在巨大的开销:

from multiprocessing import Pool, Manager
import numpy as np;
import time


def f(arr):
return len(arr)


t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;




pool = Pool(processes = 6)


t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

输出(顺便说一句,成本随着数组大小的增加而增加,因此我怀疑仍然存在与内存复制相关的开销) :

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

如果我们不复制数组,为什么会有这么大的开销?共享记忆救了我什么?

160170 次浏览

如果您使用的操作系统使用了即写即拷的 fork()语义(就像任何常见的 unix 一样) ,那么只要您不改变您的数据结构,那么所有的子进程都可以使用它,而不会占用额外的内存。你不需要做任何特别的事情(除了确保你不会改变对象)。

对于您的问题,可以做的最有效的事情 就是将您的数组打包成一个有效的数组结构(使用 numpyarray) ,将其放入共享内存中,用 multiprocessing.Array包装它,然后将其传递给您的函数。这个答案展示了如何做到这一点.

如果您想要一个 可以写共享对象,那么您将需要使用某种同步或锁定来包装它。multiprocessing提供了 这样做的两种方法: 使用共享内存(适用于简单值、数组或 ctype)或 Manager代理,其中一个进程保存内存,而管理器通过其他进程(甚至通过网络)对其进行访问。

Manager方法可以用于任意 Python 对象,但是比使用共享内存的方法要慢,因为对象需要序列化/反序列化并在进程之间发送。

有一个 丰富的并行处理库和 Python 中可用的方法multiprocessing是一个优秀的、全面的库,但是如果您有特殊的需求,也许其他方法之一会更好。

我遇到了同样的问题,于是编写了一个小小的共享内存实用程序类来解决这个问题。

我使用的是 multiprocessing.RawArray(无锁) ,而且对阵列的访问根本没有同步(无锁) ,小心不要射到自己的脚。

有了这个解决方案,我在四核 i7上的速度提高了大约3倍。

密码是这样的: 请随意使用和改进它,并请报告任何错误。

'''
Created on 14.05.2013


@author: martin
'''


import multiprocessing
import ctypes
import numpy as np


class SharedNumpyMemManagerError(Exception):
pass


'''
Singleton Pattern
'''
class SharedNumpyMemManager:


_initSize = 1024


_instance = None


def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(SharedNumpyMemManager, cls).__new__(
cls, *args, **kwargs)
return cls._instance


def __init__(self):
self.lock = multiprocessing.Lock()
self.cur = 0
self.cnt = 0
self.shared_arrays = [None] * SharedNumpyMemManager._initSize


def __createArray(self, dimensions, ctype=ctypes.c_double):


self.lock.acquire()


# double size if necessary
if (self.cnt >= len(self.shared_arrays)):
self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)


# next handle
self.__getNextFreeHdl()


# create array in shared memory segment
shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))


# convert to numpy array vie ctypeslib
self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)


# do a reshape for correct dimensions
# Returns a masked array containing the same data, but with a new shape.
# The result is a view on the original array
self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)


# update cnt
self.cnt += 1


self.lock.release()


# return handle to the shared memory numpy array
return self.cur


def __getNextFreeHdl(self):
orgCur = self.cur
while self.shared_arrays[self.cur] is not None:
self.cur = (self.cur + 1) % len(self.shared_arrays)
if orgCur == self.cur:
raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')


def __freeArray(self, hdl):
self.lock.acquire()
# set reference to None
if self.shared_arrays[hdl] is not None: # consider multiple calls to free
self.shared_arrays[hdl] = None
self.cnt -= 1
self.lock.release()


def __getArray(self, i):
return self.shared_arrays[i]


@staticmethod
def getInstance():
if not SharedNumpyMemManager._instance:
SharedNumpyMemManager._instance = SharedNumpyMemManager()
return SharedNumpyMemManager._instance


@staticmethod
def createArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)


@staticmethod
def getArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)


@staticmethod
def freeArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)


# Init Singleton on module load
SharedNumpyMemManager.getInstance()


if __name__ == '__main__':


import timeit


N_PROC = 8
INNER_LOOP = 10000
N = 1000


def propagate(t):
i, shm_hdl, evidence = t
a = SharedNumpyMemManager.getArray(shm_hdl)
for j in range(INNER_LOOP):
a[i] = i


class Parallel_Dummy_PF:


def __init__(self, N):
self.N = N
self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
self.pool = multiprocessing.Pool(processes=N_PROC)


def update_par(self, evidence):
self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))


def update_seq(self, evidence):
for i in range(self.N):
propagate((i, self.arrayHdl, evidence))


def getArray(self):
return SharedNumpyMemManager.getArray(self.arrayHdl)


def parallelExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_par(5)
print(pf.getArray())


def sequentialExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_seq(5)
print(pf.getArray())


t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")


print("Sequential: ", t1.timeit(number=1))
print("Parallel: ", t2.timeit(number=1))

这是 的预期用例,是一个用于并行和分布式 Python 的库。在底层,它使用 阿帕奇之箭数据布局(零拷贝格式)序列化对象,并将它们存储在 共享内存对象存储器中,这样就可以由多个进程访问它们,而无需创建副本。

代码如下所示。

import numpy as np
import ray


ray.init()


@ray.remote
def func(array, param):
# Do stuff.
return 1


array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)


result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

如果不调用 ray.put,那么数组仍然会存储在共享内存中,但是每次调用 func都会这样做一次,这不是您想要的。

注意,这不仅适用于数组,也适用于 也适用于包含数组的对象,例如,字典将 int 映射到数组,如下所示。

通过在 IPython 中运行以下命令,可以比较 Ray 和 pickle 中的序列化性能。

import numpy as np
import pickle
import ray


ray.init()


x = {i: np.ones(10**7) for i in range(20)}


# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s


# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

使用 Ray 的序列化只比 pickle 稍微快一点,但是由于使用了共享内存,反序列化的速度要快1000倍(这个数字当然取决于对象)。

参见 Ray 的文件。你可以阅读更多关于 使用 Ray 和 Arrow 的快速序列化的内容。注意,我是 Ray 开发人员之一。

就像 Robert Nishihara 提到的那样,Apache Arrow 使这变得很容易,特别是使用了内存中的等离子体对象存储,这正是 Ray 构建的基础。

我制作 脑浆就是为了这个原因——在 Flask 应用程序中快速加载和重新加载大型对象。它是 Apache Arrow 可序列化对象的共享内存对象名称空间,包括由 pickle.dumps(...)生成的 pickle字节串。

ApacheRay 和等离子的关键区别在于,它为您保持对象 ID 的跟踪。本地运行的任何进程、线程或程序都可以通过从任何 Brain对象调用名称来共享变量的值。

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma


from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/')


brain['a'] = [1]*10000


brain['a']
# >>> [1,1,1,1,...]