Sharing a large, read-only NumPy array between multiprocessing processes

I have a 60GB SciPy array (matrix) that I must share between 5+ multiprocessing Process objects. I've seen numpy-sharedmem and read this discussion on the SciPy list. There seem to be two approaches: numpy-sharedmem, or using multiprocessing.RawArray() and mapping NumPy dtypes to ctypes. Right now numpy-sharedmem seems to be the way to go, but I've yet to see a good reference example. I don't need any kind of locks, since the array (actually a matrix) will be read-only. Because of its size, I'd like to avoid making a copy. Is the right approach to create the only copy of the array as a sharedmem array and then pass it to the Process objects? A few specific questions:

  1. What's the best way to actually pass the sharedmem handles to the sub-Process()es? Do I need a queue just to pass one array around? Would a pipe be better? Can I just pass it as an argument to the Process() subclass's init (where I'm assuming it gets pickled)?

  2. In the discussion I linked above, there's mention of numpy-sharedmem not being 64-bit safe? I'm definitely using structures that aren't 32-bit addressable.

  3. Are there tradeoffs to the RawArray() approach? Slower, buggier?

  4. Does the numpy-sharedmem method require any ctype-to-dtype mapping?

  5. Does anyone have an example of open-source code doing this? I'm very much a hands-on learner, and it's hard to get this working without a good example to look at.

If there's any additional information I can provide to help clarify this for others, please comment and I'll add it. Thanks!

This needs to run on Ubuntu Linux and maybe Mac OS, but portability isn't a huge concern.

If you are on Linux (or any POSIX-compliant system), you can define this array as a global variable. multiprocessing uses fork() on Linux when it starts a new child process. A newly forked child process automatically shares the memory with its parent as long as it does not change it (the copy-on-write mechanism).

Since you are saying "I don't need any kind of locks, since the array (actually a matrix) will be read-only", taking advantage of this behavior would be a very simple and yet extremely efficient approach: all child processes will access the same data in physical memory when reading this large NumPy array.

Don't hand your array to the Process() constructor; this will instruct multiprocessing to pickle the data to the child, which would be extremely inefficient or impossible in your case. On Linux, right after fork() the child is an exact copy of the parent using the same physical memory, so all you need to do is make sure that the Python variable 'containing' the matrix is accessible from within the target function that you hand over to Process(). You can typically achieve this with a 'global' variable.

Example code:

from multiprocessing import Process
from numpy import random


global_array = random.random(10**4)


def child():
    print sum(global_array)


def main():
    processes = [Process(target=child) for _ in xrange(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

On Windows -- which does not support fork() -- multiprocessing uses the win32 API call CreateProcess. It creates an entirely new process from any given executable. That's why on Windows you are required to pickle data to the child if you need data that was created during the runtime of the parent.
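
For illustration (this is only a sketch, not part of the original answer, and it assumes Python 3, where the spawn start method that Windows always uses can be selected explicitly on any platform), anything handed to Process() via args gets pickled and copied into the child:

# Sketch: explicit data passing under the spawn start method (what Windows
# always uses). The array is pickled, so the child gets its own copy --
# exactly the per-process copy you want to avoid for a 60GB matrix.
import multiprocessing as mp
import numpy as np


def child(arr):
    # arr is a deserialized copy living in the child's own address space
    print(arr.sum())


if __name__ == "__main__":
    mp.set_start_method("spawn")                 # Python 3.4+; the Windows default
    data = np.random.random(10**4)
    p = mp.Process(target=child, args=(data,))   # data is pickled here
    p.start()
    p.join()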

If your array is that big you can use numpy.memmap. For example, if you have an array stored on disk, say 'test.array', you can use simultaneous processes to access the data in it even in "writing" mode, but your case is simpler since you only need "reading" mode.

Creating the array:

import numpy as np

a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))

You can then fill this array in the same way you do with an ordinary array. For example:

a[:10,:100]=1.
a[10:,100:]=2.

The data is written to disk when you flush the memmap (a.flush()) or delete the variable a.

Later on you can use multiple processes that will access the data in test.array:

# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))


# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))
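
As a minimal sketch of that last step (not from the original answer; the worker function and the row chunking are invented for illustration), each process can open the file read-only and work on its own slice:

# Each worker re-opens the same file as a read-only memmap and sums its own
# block of rows; nothing is pickled between processes, and the OS page cache
# backs all mappings with the same physical pages.
from multiprocessing import Process
import numpy as np


def sum_rows(start, stop):
    b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000, 1000))
    print(b[start:stop].sum())


if __name__ == '__main__':
    step = 25000
    workers = [Process(target=sum_rows, args=(i, i + step))
               for i in range(0, 100000, step)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()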

Related answers:

You may be interested in a tiny piece of code I wrote: github.com/vmlaker/benchmark-sharedmem

The only file of interest is main.py. It's a benchmark of numpy-sharedmem -- the code simply passes arrays (either numpy or sharedmem) to spawned processes, via Pipe. The workers just call sum() on the data. I was only interested in comparing the data communication times between the two implementations.
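
(The snippet below is not the actual main.py, just a rough sketch of the pattern being benchmarked, with invented names: the parent pushes an array through one end of a Pipe and the worker calls sum() on whatever arrives.)

# Rough sketch of the benchmarked pattern: send an array through a Pipe to a
# worker that just sums it. A plain numpy array is pickled in full; a
# sharedmem array pickles as a reference to the shared block.
from multiprocessing import Process, Pipe
import numpy as np


def worker(conn):
    data = conn.recv()                  # whatever the parent sent
    print(data.sum())
    conn.close()


if __name__ == '__main__':
    parent_end, child_end = Pipe()
    p = Process(target=worker, args=(child_end,))
    p.start()
    parent_end.send(np.random.random(10**6))
    parent_end.close()
    p.join()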

I also wrote another, more complex code: github.com/vmlaker/sherlock.

Here I use the numpy-sharedmem module for real-time image processing with OpenCV -- the images are NumPy arrays, as per OpenCV's newer cv2 API. The images, actually references thereof, are shared between processes via the dictionary object created from multiprocessing.Manager (as opposed to using Queue or Pipe.) I'm getting great performance improvements when compared with using plain NumPy arrays.
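
(Again, only a rough sketch of that arrangement rather than code from sherlock, with invented keys and shapes; it relies on sharedmem arrays pickling as references, so only a handle to the shared block travels through the Manager dict.)

# Sketch: publish a sharedmem array in a Manager dict so children can look it
# up by key; the array's shared block is attached to, not copied.
from multiprocessing import Process, Manager
import sharedmem
import numpy


def consumer(d, key):
    frame = d[key]                       # reference to the same shared block
    print(frame.sum())


if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    frame = sharedmem.empty((480, 640))  # e.g. one grayscale image
    frame[:] = numpy.random.rand(480, 640)
    d['frame0'] = frame
    p = Process(target=consumer, args=(d, 'frame0'))
    p.start()
    p.join()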

Pipe vs. Queue:

In my experience, IPC with Pipe is faster than Queue. And that makes sense, since Queue adds locking to make it safe for multiple producers/consumers. Pipe doesn't. But if you only have two processes talking back-and-forth, it's safe to use Pipe, or, as the docs read:

... there is no risk of corruption from processes using different ends of the pipe at the same time.

sharedmem safety:

The main issue with the sharedmem module is the possibility of a memory leak upon ungraceful program exit. This is described in a lengthy discussion here. Although on Apr 10, 2011 Sturla mentioned a fix for the memory leak, I have still experienced leaks since then, using both repos: Sturla Molden's own on GitHub (github.com/sturlamolden/sharedmem-numpy) and Chris Lee-Messer's on Bitbucket (bitbucket.org/cleemesser/numpy-sharedmem).

@Velimir Mlaker gave a great answer. I thought I could add a few comments and a tiny example.

(I couldn't find much documentation on sharedmem - these are the results of my own experiments.)

  1. Do you need to pass the handles when the subprocess is starting, or after it has started? If it's just the former, you can just use the target and args arguments for Process. This is potentially better than using a global variable.
  2. From the discussion page you linked, it appears that support for 64-bit Linux was added to sharedmem a while back, so it could be a non-issue.
  3. I don't know about this one.
  4. No. Refer to the example below.

Example

#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy


def do_work(data, start):
    data[start] = 0


def split_work(num):
    n = 20
    width = n / num
    shared = sharedmem.empty(n)
    shared[:] = numpy.random.rand(1, n)[0]
    print "values are %s" % shared

    processes = [Process(target=do_work, args=(shared, i * width)) for i in xrange(num)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print "values are %s" % shared
    print "type is %s" % type(shared[0])


if __name__ == '__main__':
    split_work(4)

Output

values are [ 0.81397784  0.59667692  0.10761908  0.6736734   0.46349645  0.98340718
0.44056863  0.10701816  0.67167752  0.29158274  0.22242552  0.14273156
0.34912309  0.43812636  0.58484507  0.81697513  0.57758441  0.4284959
0.7292129   0.06063283]
values are [ 0.          0.59667692  0.10761908  0.6736734   0.46349645  0.
0.44056863  0.10701816  0.67167752  0.29158274  0.          0.14273156
0.34912309  0.43812636  0.58484507  0.          0.57758441  0.4284959
0.7292129   0.06063283]
type is <type 'numpy.float64'>

This related question might be useful.

You may also find it useful to take a look at the documentation for Pyro: if you can partition your task appropriately, you could use it to execute different sections on different machines as well as on different cores in the same machine.
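
(A minimal sketch of that idea, assuming the Pyro4 package; the class and method names are invented. A worker object is exposed on one machine and remote callers invoke it through a proxy.)

# Sketch: expose a worker object with Pyro4 so other processes or machines
# can call into it; a client would connect with Pyro4.Proxy(uri).
import Pyro4


@Pyro4.expose
class MatrixWorker(object):
    def sum_block(self, start, stop):
        # A real worker would hold (a view of) the matrix and reduce a slice.
        return float(sum(range(start, stop)))    # placeholder computation


if __name__ == '__main__':
    daemon = Pyro4.Daemon()                      # listens on localhost by default
    uri = daemon.register(MatrixWorker())
    print(uri)                                   # hand this URI to the clients
    daemon.requestLoop()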

Why not use multithreading? Resources of the main process can be shared by its threads natively, so multithreading is obviously a better way to share objects owned by the main process.

If you're worried about Python's GIL, you can resort to numba's nogil option.
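
(A minimal sketch of that combination, not from the original answer; it assumes the numba package and invents the worker function. With nogil=True the compiled function releases the GIL, so the threads can crunch different slices of the shared array in parallel.)

# Threads share the parent's array directly; a numba-compiled function with
# nogil=True releases the GIL so the threads actually run in parallel.
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from numba import njit


@njit(nogil=True)
def block_sum(a, start, stop):
    total = 0.0
    for i in range(start, stop):
        total += a[i]
    return total


if __name__ == '__main__':
    data = np.random.random(10**7)               # shared by all threads, no copies
    step = len(data) // 4
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(block_sum, data, i, i + step)
                   for i in range(0, len(data), step)]
        print(sum(f.result() for f in futures))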