多处理: 进程之间共享一个大的只读对象?

通过 多重处理产生的子进程是否共享程序早期创建的对象?

我有以下设置:

do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here


if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))

我将一些大对象加载到内存中,然后创建一个工作池,需要利用这个大对象。大对象是只读访问的,我不需要在进程之间传递对它的修改。

我的问题是: 大对象是加载到共享内存中(就像我在 unix/c 中生成一个进程那样) ,还是每个进程都加载自己的大对象副本?

更新: 进一步澄清—— big _ lookup _ object 是一个共享查找对象。我不需要把它们分开处理。我需要一份复印件。我需要分割它的工作是读取大量其他大型文件,并根据查找对象查找这些大型文件中的项目。

进一步更新: 数据库是一个很好的解决方案,memcached 可能是一个更好的解决方案,磁盘上的文件(搁置或数据库)可能更好。在这个问题中,我对内存解决方案特别感兴趣。对于最终的解决方案,我将使用 hadoop,但我想看看我是否可以有一个本地内存版本以及。

64259 次浏览

通过前面程序中创建的多处理共享对象生成的子进程吗?

Python < 3.8,Python ≥3.8是的不适用。

进程有独立的内存空间。

解决方案1

为了最好地利用一个有很多工人的大型建筑物,请这样做。

  1. 将每个 worker 写成一个“过滤器”——从 stdin读取中间结果,执行工作,在 stdout上写入中间结果。

  2. 将所有工人连接成一条管道:

    process1 <source | process2 | process3 | ... | processn >result
    

每个进程读取、工作和写入。

这非常有效,因为所有进程都是并发运行的。写入和读取直接通过进程之间的共享缓冲区。


解决方案2

在某些情况下,您有一个更复杂的结构-通常是 散开结构。在本例中,您有一个具有多个子级的父级。

  1. 父级打开源数据。父级分叉许多子级。

  2. 父级读取源代码,将源代码的一部分分发给每个并发运行的子级。

  3. 当父文件到达文件末尾时,关闭管道。子文件到达文件末尾并正常完成。

儿童部分是愉快的写作,因为每个孩子只是读 sys.stdin

父母有一点奇特的脚步,在产卵所有的孩子和保持适当的管道,但它不是太坏。

Fan-in 是相反的结构。许多独立运行的进程需要将它们的输入交织到一个公共进程中。收集器不容易编写,因为它必须从多个源读取。

通常使用 select模块读取许多命名管道,以查看哪些管道具有挂起的输入。


解决方案3

共享查找是数据库的定义。

解决方案3A-加载数据库。让工人处理数据库中的数据。

解决方案3B-使用 沃克泽格(或类似的)创建一个非常简单的服务器,以提供响应 HTTP GET 的 WSGI 应用程序,这样工作人员就可以查询服务器。


解决方案4

共享文件系统对象。UnixOS 提供共享内存对象。这些只是映射到内存的文件,以便交换 I/O 而不是更多的约定缓冲读取。

您可以通过几种方式从 Python 上下文中实现这一点

  1. 编写一个启动程序,(1)将原来的巨大对象分解成更小的对象,(2)启动工作器,每个工作器有一个更小的对象。可以对较小的对象 pickle Python 对象,以节省一点点文件读取时间。

  2. 编写一个启动程序,(1)读取原始的巨型对象,并使用 seek操作编写一个页面结构化、字节编码的文件,以确保用简单的寻找就可以轻松找到各个部分。这就是数据库引擎所做的——将数据分割成页面,通过 seek使每个页面易于定位。

产生能够访问这个大型页面结构文件的工作人员。每个工人都可以找到相关的部分,在那里做他们的工作。

不同的进程有不同的地址空间。就像运行不同的解释器实例一样。这就是 IPC (进程间通信)的作用。

可以为此使用队列或管道。如果希望以后通过网络分发进程,还可以使用 rpc over tcp。

Http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes

如果你在 Unix 下运行,由于 叉子是怎么用的的原因,它们可能共享同一个对象(也就是说,子进程有独立的内存,但它是写上拷贝的,所以只要没有人修改它,它就可能被共享)。我尝试了以下方法:

import multiprocessing


x = 23


def printx(y):
print x, id(x)
print y


if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(printx, (1,2,3,4))

得到如下输出:

$ ./mtest.py
23 22995656
1
23 22995656
2
23 22995656
3
23 22995656
4

当然,这并不意味着没有拷贝 证明,但是在您的情况下,您应该能够通过查看 ps的输出来验证这一点,以查看每个子进程使用了多少实际内存。

S. Lott 是正确的,Python 的多处理快捷方式有效地为您提供了一个单独的、重复的内存块。

在大多数 * nix 系统上,使用对 os.fork()的低级调用实际上会提供写上复制内存,这可能正是您所想的。AFAIK,理论上,在最简单的程序中,你可以从数据中读取数据而不需要重复。

然而,在 Python 解释器中事情并不那么简单。对象数据和元数据存储在同一个内存段中,所以即使对象从未更改,对象的引用计数器之类的东西被增加也会导致内存写入,因此是一个副本。几乎任何比“ print‘ hello’”执行更多操作的 Python 程序都会导致引用计数增加,因此您可能永远不会意识到写时复制的好处。

即使有人成功地破解了 Python 中的共享内存解决方案,尝试跨进程协调垃圾收集也可能是相当痛苦的。

与多处理本身没有直接关系,但是从您的示例来看,似乎您可以只使用 搁置模块或类似的东西。“ big _ lookup _ object”真的必须完全在内存中吗?

通过多处理共享对象产生的子进程是否在程序中创建了 早些时候

看情况。对于全局只读变量,通常可以这样考虑(除了消耗的内存之外) ,否则不应该这样做。

多重处理的文档说:

Better to inherit than pickle/unpickle

中的许多类型 多重处理需要是可腌制的 以便子进程可以使用它们。 然而,一般应该避免 将共享对象发送给其他 使用管道或队列的进程。 相反,你应该安排这个项目 这样一个过程,需要访问 在别处创造的共享资源 可以从祖先那里继承 程序。

Explicitly pass resources to child processes

在 Unix 上,子进程可以使用 中创建的共享资源的 使用全局 资源。然而,最好是 将对象作为参数传递给 子进程的构造函数。

除了制作密码 (可能)与 Windows 兼容 这也确保了只要 子进程仍然是活的 对象将不会被垃圾收集 在父进程中。这可能是 如果释放了某些资源,则 当对象被垃圾收集时 在父进程中。

Global variables

请记住,如果代码在 子进程试图访问全局 变量,然后它看到的值(如果 (any)可能与值不相同 在父进程中 调用了 Process.start ()。

例子

Windows (单 CPU) :

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool


x = 23000 # replace `23` due to small integers share representation
z = []    # integers are immutable, let's try mutable object


def printx(y):
global x
if y == 3:
x = -x
z.append(y)
print os.getpid(), x, id(x), z, id(z)
print y
if len(sys.argv) == 2 and sys.argv[1] == "sleep":
time.sleep(.1) # should make more apparant the effect


if __name__ == '__main__':
pool = Pool(processes=4)
pool.map(printx, (1,2,3,4))

sleep:

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

没有 sleep:

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4

对于 Linux/Unix/MacOS 平台,forkmap 是一种快捷而简单的解决方案。

不,但是您可以将数据作为子进程加载,并允许它与其他子进程共享其数据。见下文。

import time
import multiprocessing


def load_data( queue_load, n_processes )


... load data here into some_variable


"""
Store multiple copies of the data into
the data queue. There needs to be enough
copies available for each process to access.
"""


for i in range(n_processes):
queue_load.put(some_variable)




def work_with_data( queue_data, queue_load ):


# Wait for load_data() to complete
while queue_load.empty():
time.sleep(1)


some_variable = queue_load.get()


"""
! Tuples can also be used here
if you have multiple data files
you wish to keep seperate.
a,b = queue_load.get()
"""


... do some stuff, resulting in new_data


# store it in the queue
queue_data.put(new_data)




def start_multiprocess():


n_processes = 5


processes = []
stored_data = []


# Create two Queues
queue_load = multiprocessing.Queue()
queue_data = multiprocessing.Queue()


for i in range(n_processes):


if i == 0:
# Your big data file will be loaded here...
p = multiprocessing.Process(target = load_data,
args=(queue_load, n_processes))


processes.append(p)
p.start()


# ... and then it will be used here with each process
p = multiprocessing.Process(target = work_with_data,
args=(queue_data, queue_load))


processes.append(p)
p.start()


for i in range(n_processes)
new_data = queue_data.get()
stored_data.append(new_data)


for p in processes:
p.join()
print(processes)