熊猫多处理应用

我试图用熊猫数据帧多处理,这是分裂为8个部分的数据帧。应用一些功能到每个部分使用应用(每个部分处理在不同的过程)。

编辑: 这是我最终找到的解决办法:

import multiprocessing as mp
import pandas.util.testing as pdt


def process_apply(x):
# do some stuff to data here


def process(df):
res = df.apply(process_apply, axis=1)
return res


if __name__ == '__main__':
p = mp.Pool(processes=8)
split_dfs = np.array_split(big_df,8)
pool_results = p.map(aoi_proc, split_dfs)
p.close()
p.join()


# merging parts processed by different processes
parts = pd.concat(pool_results, axis=0)


# merging newly calculated parts to big_df
big_df = pd.concat([big_df, parts], axis=1)


# checking if the dfs were merged correctly
pdt.assert_series_equal(parts['id'], big_df['id'])
63485 次浏览

由于我没有很多你的数据脚本,这是一个猜测,但我建议使用 p.map而不是 apply_async与回调。

p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df,8))
p.close()
p.join()
results = []
for result in pool_results:
results.extend(result)

当我使用 multiprocessing.map()将函数应用到大型数据框架的不同块时,也遇到了同样的问题。

我只是想补充几点,以防其他人遇到和我一样的问题。

  1. 记得加 if __name__ == '__main__':
  2. .py文件中执行该文件,如果使用 ipython/jupyter notebook,则无法运行 multiprocessing(对于我的情况是这样的,尽管我没有线索)

一个基于作者解决方案的更通用的版本,允许在每个函数和数据框架上运行它:

from multiprocessing import  Pool
from functools import partial
import numpy as np


def parallelize(data, func, num_of_processes=8):
data_split = np.array_split(data, num_of_processes)
pool = Pool(num_of_processes)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data


def run_on_subset(func, data_subset):
return data_subset.apply(func, axis=1)


def parallelize_on_rows(data, func, num_of_processes=8):
return parallelize(data, partial(run_on_subset, func), num_of_processes)

下面这句话:

df.apply(some_func, axis=1)

将成为:

parallelize_on_rows(df, some_func)

您可以使用 https://github.com/nalepae/pandarallel,如下例所示:

from pandarallel import pandarallel
from math import sin


pandarallel.initialize()


def func(x):
return sin(x**2)


df.parallel_apply(func, axis=1)


这对我很有效:

rows_iter = (row for _, row in df.iterrows())


with multiprocessing.Pool() as pool:
df['new_column'] = pool.map(process_apply, rows_iter)

安装简化使用并行映射的 皮克斯张力,并像下面这样使用:

from pyxtension.streams import stream


big_df = pd.concat(stream(np.array_split(df, multiprocessing.cpu_count())).mpmap(process))

我最终使用 concurrent.futures.ProcessPoolExecutor.map代替了 multiprocessing.Pool.map,后者花费了316微秒处理一些序列长度为12秒的代码。

要使用所有(物理或逻辑)核,可以尝试使用 mapply作为 swifterpandarallel的替代品。

您可以在 init 上设置核心数量(以及分块行为) :

import pandas as pd
import mapply


mapply.init(n_workers=-1)


def process_apply(x):
# do some stuff to data here


def process(df):
# spawns a pathos.multiprocessing.ProcessPool if sensible
res = df.mapply(process_apply, axis=1)
return res

默认情况下(n_workers=-1) ,包使用系统上可用的所有物理 CPU。如果您的系统使用超线程(通常是显示的物理 CPU 数量的两倍) ,mapply将产生一个额外的工作者,以优先于系统上的其他进程处理多处理池。

您也可以使用所有的逻辑核(注意,这样的 CPU 绑定进程将争夺物理 CPU,这可能会降低您的操作速度) :

import multiprocessing
n_workers = multiprocessing.cpu_count()


# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)

这是我发现的一些有用的代码。自动地将数据帧分割成你所有的 CPU 核心。

import pandas as pd
import numpy as np
import multiprocessing as mp


def parallelize_dataframe(df, func):
num_processes = mp.cpu_count()
df_split = np.array_split(df, num_processes)
with mp.Pool(num_processes) as p:
df = pd.concat(p.map(func, df_split))
return df


def parallelize_function(df):
df[column_output] = df[column_input].apply(example_function)
return df


def example_function(x):
x = x*2
return x

跑步:

df_output = parallelize_dataframe(df, parallelize_function)

Python 的 Starmap ()方法也可以简洁地将并行性引入到 apply用例中,在这些用例中,列值作为参数传递,例如:

df.apply(lambda row: my_func(row["col_1"], row["col_2"], ...), axis=1)

完整的例子和基准:

import time
from multiprocessing import Pool


import numpy as np
import pandas as pd




def mul(a, b, c):
# For illustration, could obviously be vectorized
return a * b * c


df = pd.DataFrame(np.random.randint(0, 100, size=(10_000_000, 3)), columns=list('ABC'))


# Standard apply
start = time.time()
df["mul"] = df.apply(lambda row: mul(row["A"], row["B"], row["C"]), axis=1)
print(f"Standard apply took {time.time() - start:.0f} seconds.")


# Starmap apply
start = time.time()
with Pool(10) as pool:
df["mul_pool"] = pool.starmap(mul, zip(df["A"], df["B"], df["C"]))
print(f"Starmap apply took {time.time() - start:.0f} seconds.")


pd.testing.assert_series_equal(df["mul"], df["mul_pool"], check_names=False)




>>> Standard apply took 72 seconds.
>>> Starmap apply took 5 seconds.

这样做的好处是不依赖于外部库,而且非常易读。

Tom Raz 的答案 https://stackoverflow.com/a/53135031/11847090忽略了一个边缘情况,即数据框架中的行数少于进程数

使用这种并行化方法代替

def parallelize(data, func, num_of_processes=8):
# check if the number of rows is less than the number of processes
# to avoid the following error
# ValueError: Expected a 1D array, got an array with shape
num_rows = len(data)
if num_rows == 0:
return None
elif num_rows < num_of_processes:
num_of_processes = num_rows
data_split = np.array_split(data, num_of_processes)
pool = Pool(num_of_processes)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data

而且我用暗袋多线程这个而不是这个自定义代码