Python 中的多处理——在多个进程之间共享大型对象(例如熊猫数据框)

更准确地说,我使用的是 Python 多处理

from multiprocessing import Pool
p = Pool(15)


args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()

这种方法消耗了大量的内存; 几乎占用了我所有的 RAM (此时它会变得非常慢,因此使得多处理非常无用)。我假设问题在于 df是一个巨大的对象(一个大熊猫数据框架) ,它会被复制到每个进程中。我已经尝试使用 multiprocessing.Value共享数据帧而不进行复制

shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...]

(就像在 Python 多处理共享内存中建议的那样) ,但是这给了我 TypeError: this type has no size(和 在 Python 进程之间共享一个复杂的对象?一样,不幸的是我不能理解它的答案)。

我第一次使用多重处理,也许我的理解还不够好。在这种情况下使用 multiprocessing.Value真的是正确的吗?我已经看到了其他的建议(如排队) ,但现在有点困惑。共享内存有哪些选项,在这种情况下哪一个最好?

46883 次浏览

Value的第一个参数是 Typecode _ or _ type,它被定义为:

Typecode _ or _ type 确定返回对象的类型: < strong > it is 使用的 ctype 类型或一个字符类型代码 数组模块。 * args 被传递给类型的构造函数。

重点是我的。所以,你不能简单地把一个熊猫数据框架放在一个 Value,它必须是 一种 ctype 类型

相反,您可以使用 multiprocessing.Manager为所有进程提供单例数据框实例。有几种不同的方式来结束在同一个地方-可能最简单的是把你的数据框放入经理的 Namespace

from multiprocessing import Manager


mgr = Manager()
ns = mgr.Namespace()
ns.df = my_dataframe


# now just give your processes access to ns, i.e. most simply
# p = Process(target=worker, args=(ns, work_unit))

现在,任何传递给 Manager 引用的进程都可以访问您的数据框实例。或者只是传递一个对 Namespace的引用,它更干净。

有一件事我没有/将不会涉及到,那就是事件和信号——如果您的进程需要等待其他进程完成执行,那么您需要添加这一点。这里有一页与一些 Event的例子,也涵盖了一点更多的细节如何使用经理的 Namespace

(请注意,所有这些都没有涉及到 multiprocessing是否会带来实际的性能收益,这只是为您提供了探索这个问题的工具)

您可以通过创建 data _ handle 子进程在进程之间共享熊猫数据框架,而不需要任何内存开销。这个过程接收来自具有特定数据请求的其他子进程的调用(例如,一行、一个特定单元格、一个片等)从您的非常大的数据框架对象。只有 data _ handle 进程将数据框保存在内存中,而不像管理器(如 Namespace)那样将数据框复制到所有子进程。请参阅下面的工作示例。这可以转换为池。

需要一个进度条为此? 看到我的答案在这里: https://stackoverflow.com/a/55305714/11186769

import time
import Queue
import numpy as np
import pandas as pd
import multiprocessing
from random import randint


#==========================================================
# DATA HANDLER
#==========================================================


def data_handler( queue_c, queue_r, queue_d, n_processes ):


# Create a big dataframe
big_df = pd.DataFrame(np.random.randint(
0,100,size=(100, 4)), columns=list('ABCD'))


# Handle data requests
finished = 0
while finished < n_processes:


try:
# Get the index we sent in
idx = queue_c.get(False)


except Queue.Empty:
continue
else:
if idx == 'finished':
finished += 1
else:
try:
# Use the big_df here!
B_data = big_df.loc[ idx, 'B' ]


# Send back some data
queue_r.put(B_data)
except:
pass


# big_df may need to be deleted at the end.
#import gc; del big_df; gc.collect()


#==========================================================
# PROCESS DATA
#==========================================================


def process_data( queue_c, queue_r, queue_d):


data = []


# Save computer memory with a generator
generator = ( randint(0,x) for x in range(100) )


for g in generator:


"""
Lets make a request by sending
in the index of the data we want.
Keep in mind you may receive another
child processes return call, which is
fine if order isnt important.
"""


#print(g)


# Send an index value
queue_c.put(g)


# Handle the return call
while True:
try:
return_call = queue_r.get(False)
except Queue.Empty:
continue
else:
data.append(return_call)
break


queue_c.put('finished')
queue_d.put(data)


#==========================================================
# START MULTIPROCESSING
#==========================================================


def multiprocess( n_processes ):


combined  = []
processes = []


# Create queues
queue_data = multiprocessing.Queue()
queue_call = multiprocessing.Queue()
queue_receive = multiprocessing.Queue()


for process in range(n_processes):


if process == 0:


# Load your data_handler once here
p = multiprocessing.Process(target = data_handler,
args=(queue_call, queue_receive, queue_data, n_processes))
processes.append(p)
p.start()


p = multiprocessing.Process(target = process_data,
args=(queue_call, queue_receive, queue_data))
processes.append(p)
p.start()


for i in range(n_processes):
data_list = queue_data.get()
combined += data_list


for p in processes:
p.join()


# Your B values
print(combined)




if __name__ == "__main__":


multiprocess( n_processes = 4 )

可以使用 Array而不是 Value来存储数据帧。

下面的解决方案将 pandas数据帧转换为一个对象,该对象将其数据存储在共享内存中:

import numpy as np
import pandas as pd
import multiprocessing as mp
import ctypes


# the origingal dataframe is df, store the columns/dtypes pairs
df_dtypes_dict = dict(list(zip(df.columns, df.dtypes)))


# declare a shared Array with data from df
mparr = mp.Array(ctypes.c_double, df.values.reshape(-1))


# create a new df based on the shared array
df_shared = pd.DataFrame(np.frombuffer(mparr.get_obj()).reshape(df.shape),
columns=df.columns).astype(df_dtypes_dict)

如果现在您跨进程共享 df_shared,则不会制作额外的副本。对于您的情况:

pool = mp.Pool(15)


def fun(config):
# df_shared is global to the script
df_shared.apply(config)  # whatever compute you do with df/config


config_list = [config1, config2]
res = p.map_async(fun, config_list)
p.close()
p.join()

如果您使用 潘达拉尔,这也是特别有用的,例如:

# this will not explode in memory
from pandarallel import pandarallel
pandarallel.initialize()
df_shared.parallel_apply(your_fun, axis=1)

注意: 使用这个解决方案,您最终会得到两个数据框架(df 和 df _ share) ,它们消耗两倍的内存,并且需要很长时间才能初始化。可以直接读取共享内存中的数据。

至少 Python 3.6支持以多处理方式存储熊猫数据帧:

import ctypes
import pandas as pd
from multiprocessing import Value


df = pd.DataFrame({'a': range(0,9),
'b': range(10,19),
'c': range(100,109)})


k = Value(ctypes.py_object)
k.value = df


print(k.value)

我非常惊讶于 Rel = “ nofollow noReferrer”> joblib 的并行(至少从1.0.1开始)已经支持与多进程工作者共享熊猫数据框架。至少后端是这样的。 我通过实验得出的一个结论是: 传递给函数的参数不应该包含任何大的 dict。如果他们这样做,把字典变成一个系列或数据框架。 每个工作者肯定会使用一些额外的内存,但是比主进程中所谓的“大”数据框的大小要小得多。计算马上开始在所有的工人。否则,joblib 将启动所有请求的工作线程,但是它们处于空闲状态,而对象将按顺序复制到每个工作线程中,这将花费很长时间。如果有人需要,我可以提供一个代码示例。我已经测试了只在只读模式下处理数据帧。文档中没有提到这个功能,但是对熊猫很有用。