熊猫后并行应用

我使用 rosetta.parallel.pandas_easyapply并行化到 groupby之后,例如:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)

然而,有人知道如何并行处理一个返回 DataFrame 的函数吗?正如预期的那样,rosetta代码失败。

def tmpFunc(df):
df['c'] = df.a + df.b
return df


df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
61717 次浏览

我有一个黑客,我用熊猫得到并行化。我将数据框架分成几个块,将每个块放入一个列表的元素中,然后使用 ipython 的并行位对数据框架列表进行并行应用。然后使用熊猫 concat函数将列表重新组合在一起。

然而,这并不普遍适用。它对我很有用,因为我想要应用到数据框架的每个块的函数大约需要一分钟。把我的数据分离和整理也不需要那么长时间。所以这显然是个软件包。说到这里,举个例子。我使用的是 Ipython 笔记本,所以你会在我的代码中看到 %%time的魔力:

## make some example data
import pandas as pd


np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n),
'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')

对于这个示例,我将基于上面的 groupby 创建“块”,但是这并不一定是数据分块的方式。虽然这是很常见的模式。

dflist = []
for name, group in grouped:
dflist.append(group)

设置平行位

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True

写一个愚蠢的函数来应用于我们的数据

def myFunc(inDf):
inDf['newCol'] = inDf.data ** 10
return inDf

现在让我们运行串行代码,然后并行。 系列第一:

%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s

现在平行

%%time
parallel_list = lview.map(myFunc, dflist)


CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s

然后只需要几毫秒就可以将它们合并回一个数据帧

%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms

我在我的 MacBook 上运行6个 IPython 引擎,但是你可以看到它把执行时间从14秒降到了2秒。

对于真正长时间运行的随机模拟,我可以通过使用 星团启动集群来使用 AWS 后端。然而,大多数时候,我只是在 MBP 上的8个 CPU 之间进行并行处理。

这似乎是有效的,虽然它确实应该建立在熊猫

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing


def tmpFunc(df):
df['c'] = df.a + df.b
return df


def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)


if __name__ == '__main__':
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
print 'parallel version: '
print applyParallel(df.groupby(df.index), tmpFunc)


print 'regular version: '
print df.groupby(df.index).apply(tmpFunc)


print 'ideal version (does not work): '
print df.groupby(df.index).applyParallel(tmpFunc)

伊万的回答很棒,但看起来似乎可以稍微简化一下,同时也消除了对 joblib 的依赖:

from multiprocessing import Pool, cpu_count


def applyParallel(dfGrouped, func):
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for name, group in dfGrouped])
return pandas.concat(ret_list)

顺便说一下: 这不能代替 任何 groupby.application () ,但是它会覆盖典型的情况: 例如,它应该覆盖情况2和3 在文件中,而你应该通过给最终的 pandas.concat()调用提供参数 axis=1来获得情况1的行为。

编辑: 文档更改; 旧版本可以找到 给你,无论如何,我正在复制下面的三个例子。

case 1: group DataFrame apply aggregation function (f(chunk) -> Series) yield DataFrame, with group axis having group labels


case 2: group DataFrame apply transform function ((f(chunk) -> DataFrame with same indexes) yield DataFrame with resulting chunks glued together


case 3: group Series apply function with f(chunk) -> DataFrame yield DataFrame with result of chunks glued together

伴随 JD Long 的回答的一个简短评论。我发现,如果组的数量非常大(比如说数十万) ,而你的 application 函数正在做一些相当简单和快速的事情,那么将你的数据框拆分成块,并将每个块分配给一个工作者来执行一个 groupby-application (串行)可能比执行一个并行 groupby-application 并让工作者读取一个包含多个组的队列要快得多。例如:

import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})

我们的数据框架是这样的:

    a
0   3425
1   1016
2   8141
3   9263
4   8018

请注意,列“ a”有许多组(想想客户 ID) :

len(df.a.unique())
15000

对我们的团队进行操作的一个功能:

def f1(group):
time.sleep(0.0001)
return group

建立一个游泳池:

ppe = ProcessPoolExecutor(12)
futures = []
results = []

做一个并行的分组-应用:

%%time


for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)


for future in as_completed(futures):
r = future.result()
results.append(r)


df_output = pd.concat(results)
del ppe


CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s

现在我们添加一列,将 df 分成更少的组:

df['b'] = np.random.randint(0, 12, nrows)

现在只有12个团体,而不是1.5万个团体:

len(df.b.unique())
12

我们将对 df 进行分区,并对每个块执行 groupby-application 操作。

ppe = ProcessPoolExecutor(12)

包装乐趣:

def f2(df):
df.groupby('a').apply(f1)
return df

发送要串行操作的每个数据块:

%%time


for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)


for future in as_completed(futures):
r = future.result()
results.append(r)


df_output = pd.concat(results)


CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s

请注意,每个组花费的时间并没有改变。更确切地说,改变的是工作人员从中读取的队列的长度。我怀疑正在发生的事情是,工作者不能同时访问共享内存,并且不断返回来读取队列,因此彼此踩到了对方的脚趾。对于需要操作的较大块,工作者返回的频率较低,因此这个问题得到了改善,整体执行速度也更快。

就个人而言,我建议使用黑暗,每 这根线

正如@chrisb 指出的,在 python 中使用熊猫进行多处理可能会产生不必要的开销。它也可以执行 没有,以及多线程,甚至作为一个单一的线程。

Dask 是专门为多处理创建的。

编辑: 为了在熊猫 groupby上获得更好的计算性能,您可以使用 笨蛋在运行时将代码编译成 C 代码并以 C 速运行。如果在 groupby之后应用的函数是纯 numpy计算,那么它将非常快(比这种并行化要快得多)。

可以使用 multiprocessingjoblib来实现并行化。但是,如果是 组的数量很大,每个组 DataFrame 都很大,那么运行时间可能会更糟,因为需要多次将这些组转移到 CPU 中。为了减少开销,我们可以首先将数据划分为大块,然后将计算并行化在这些块上。

例如,假设您正在处理股票数据,您需要按照股票的代码对股票进行分组,然后计算一些统计数据。您可以首先根据代码的第一个字符(大块)进行分组,然后在这个虚拟组中执行以下操作:

import pandas as pd
from joblib import Parallel, delayed


def group_func(dummy_group):
# Do something to the group just like doing to the original dataframe.
#     Example: calculate daily return.
res = []
for _, g in dummy_group.groupby('code'):
g['daily_return']  = g.close / g.close.shift(1)
res.append(g)
return pd.concat(res)


stock_data = stock_data.assign(dummy=stock_data['code'].str[0])


Parallel(n_jobs=-1)(delayed(group_func)(group) for _, group in stock_data.groupby('dummy'))

人们开始使用 bodo 来实现并行。它是可用于并行化 python 的最快引擎,因为它使用 MPI 编译代码。它的新编译器使它比 Dask、 Ray、多处理器、 Pandarel 等更快。在这篇博客文章中阅读 bodo vs Dask,看看特拉维斯在他的 LinkedIn 上对 bodo 有什么看法!他是 Anaconda 的创始人: 引用“ Bodo 是真正的交易”

Https://bodo.ai/blog/performance-and-cost-of-bodo-vs-spark-dask-ray

Https://www.linkedin.com/posts/teoliphant_performance-and-cost-evaluation-of-bodo-vs-activity-6873290539773632512-y5iz/

至于如何在 bodo 中使用 groupby,这里我编写了一个示例代码:

#install bodo through your terminal


conda create -n Bodo python=3.9 -c conda-forge
conda activate Bodo
conda install bodo -c bodo.ai -c conda-forge

下面是 groupby 的代码示例:

import time
import pandas as pd
import bodo




@bodo.jit
def read_data():
""" a dataframe with 2 columns, headers: 'A', 'B'
or you can just create a data frame instead of reading it from flat file
"""
return pd.read_parquet("your_input_data.pq")




@bodo.jit
def data_groupby(input_df):
t_1 = time.time()
df2 = input_df.groupby("A", as_index=False).sum()
t_2 = time.time()
print("Compute time: {:.2f}".format(t_2-t_1))
return df2, t_2-t_1




if __name__ == "__main__":
df = read_data()
t0 = time.time()
output, compute_time = data_groupby(df)
t2 = time.time()
total_time = t2 - t0
if bodo.get_rank() == 0:
print("Compilation time: {:.2f}".format(total_time - compute_time))
print("Total time second call: {:.2f}".format(total_time))

最后通过终端运行 mpiexec。- n 确定要运行它的核心(CPU)数量。

mpiexec -n 4 python filename.py

免责声明: 我是 swifter的所有者和主要贡献者/维护者

Swift 是我4年前创建的一个 python 包,作为一个包,它以最快的可用方式有效地将任何功能应用于熊猫数据框架或系列。到目前为止,swifter拥有超过2000颗 GitHub 星,每月250000次下载,95% 的代码覆盖率。

在1.3.2版本中,swifter提供了一个简单的界面,可以通过 application 实现性能并行化分组:

df.swifter.groupby(df.index).apply(tmpFunc)

我还创建了 业绩基准,展示了 Swift 的性能改进,这里复制了一个关键的视觉效果: 应用性能基准

你可以通过 pip 轻松安装更快的软件(具有 groupby 应用功能) :

pip install swifter[groupby]>=1.3.2

或者通过 conda:

conda install -c conda-forge swifter>=1.3.2 ray>=1.0.0

有关详情,请参阅 自述文件