使熊猫数据框架应用()使用所有的核心?

到2017年8月为止,很不幸,Pandas Apply ()仍然限于使用单核,这意味着当您运行 df.apply(myfunc, axis=1)时,多核计算机将浪费大部分计算时间。

如何使用所有内核并行运行在一个数据框架上?

129590 次浏览

最简单的方法是使用 Dask 的 map _ Partitions:

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

语法是

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)


def myfunc(x,y,z, ...): return <whatever>


res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)

(我认为如果有16个内核,30是一个合适的分区数)。为了完整起见,我在我的机器(16个核心)上计算了差异:

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)


ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)
def vectorized(): return myfunc(data['col1'], data['col2']  )


t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

给一个 加速因子10从熊猫应用到黑暗应用到分区。当然,如果你有一个可以向量化的函数,你应该这样做——在这种情况下,函数(y*(x**2+1))是微不足道的向量化,但是有很多东西是不可能向量化的。

你可使用 swifter软件包:

pip install swifter

(注意,您可能希望在 viralenv 中使用这个命令,以避免与已安装的依赖项发生版本冲突。)

Swifter 作为熊猫的一个插件,允许你重用 apply功能:

import swifter


def some_function(data):
return data * 10


data['out'] = data['in'].swifter.apply(some_function)

它会自动找到最有效的方法来并行化函数,不管它是否向量化(如上面的例子)。

更多的例子 性能比较可以在 GitHub 上找到。请注意,该包正在积极开发中,因此 API 可能会发生变化。

还要注意,这个 不会自动工作用于字符串列。当使用字符串时,Swifter 会回退到一个“简单”的熊猫 apply,它不会是并行的。在这种情况下,即使强制它使用 dask也不会创建性能改进,您最好只是手动分割数据集和 使用 multiprocessing并行化

您可以尝试使用 pandarallel: 一个简单而有效的工具,可以在所有 CPU 上并行处理您的“熊猫”操作(在 Linux & macOS 上)

  • 并行化是有代价的(实例化新进程、通过共享内存发送数据等等) ,所以只有当并行化的计算量足够大时,并行化才是有效的。对于非常少量的数据,使用并行化并不总是值得的。
  • 应用的函数不应该是 lambda 函数。
from pandarallel import pandarallel
from math import sin


pandarallel.initialize()


# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)


# ALLOWED
def func(x):
return sin(x**2)


df.parallel_apply(func, axis=1)

https://github.com/nalepae/pandarallel

这里是一个示例的 sklearn 基础变压器,其中熊猫应用是并行的

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator


class ParllelTransformer(BaseEstimator, TransformerMixin):
def __init__(self,
n_jobs=1):
"""
n_jobs - parallel jobs to run
"""
self.variety = variety
self.user_abbrevs = user_abbrevs
self.n_jobs = n_jobs
def fit(self, X, y=None):
return self
def transform(self, X, *_):
X_copy = X.copy()
cores = mp.cpu_count()
partitions = 1


if self.n_jobs <= -1:
partitions = cores
elif self.n_jobs <= 0:
partitions = 1
else:
partitions = min(self.n_jobs, cores)


if partitions == 1:
# transform sequentially
return X_copy.apply(self._transform_one)


# splitting data into batches
data_split = np.array_split(X_copy, partitions)


pool = mp.Pool(cores)


# Here reduce function - concationation of transformed batches
data = pd.concat(
pool.map(self._preprocess_part, data_split)
)


pool.close()
pool.join()
return data
def _transform_part(self, df_part):
return df_part.apply(self._transform_one)
def _transform_one(self, line):
# some kind of transformations here
return line

更多信息见 https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

如果你想继续使用本地的 python:

import multiprocessing as mp


with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(f, df['col'])

将函数 f以并行方式应用于数据帧 df的列 col

要使用所有(物理或逻辑)核,可以尝试使用 mapply作为 swifterpandarallel的替代品。

您可以在 init 上设置核心数量(以及分块行为) :

import pandas as pd
import mapply


mapply.init(n_workers=-1)


...


df.mapply(myfunc, axis=1)

默认情况下(n_workers=-1) ,包使用系统上所有可用的物理 CPU。如果您的系统使用超线程(通常是物理 CPU 数量的两倍) ,mapply将产生一个额外的工作者,以优先于系统上的其他进程处理多处理池。

根据你对 all your cores的定义,你也可以使用所有的逻辑核来代替(注意,像这样的 CPU 绑定进程会争夺物理 CPU,这可能会降低你的操作速度) :

import multiprocessing
n_workers = multiprocessing.cpu_count()


# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)

因为问题是“ 如何使用所有内核并行运行在一个数据框架上?”,所以答案也可以是 modin。您可以并行运行所有内核,尽管实时性更差。

https://github.com/modin-project/modin。它运行在 daskray的顶部。他们说“ Modin 是一个 DataFrame,专为1MB 到1TB + 的数据集设计。”我试过: pip3 install "modin"[ray]"。Modin VS 熊猫是 -12秒,6个核心对6秒。

只是想给 戴斯克一个更新的答案

import dask.dataframe as dd


def your_func(row):
#do something
return row


ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

在我的10万张唱片里,没有戴斯克:

CPU 时间: 用户6分钟32秒,系统: 100毫秒,总计: 6分钟32秒 墙壁时间: 6分32秒

戴斯克:

CPU 时间: user 5.19 s,sys: 784 ms,total: 5.98 s 墙壁时间: 1分3秒

这里是另一个使用 Joblib 和 scikit-learn 中的一些 helper 代码的例子。轻量级(如果您已经有 scikit-learn) ,如果您更喜欢对它正在做什么进行更多的控制,那么这很好,因为 joblib 很容易被黑客攻击。

from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples




def parallel_apply(df, func, n_jobs= -1, **kwargs):
""" Pandas apply in parallel using joblib.
Uses sklearn.utils to partition input evenly.
    

Args:
df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
func: Callable to apply
n_jobs: Desired number of workers. Default value -1 means use all available cores.
**kwargs: Any additional parameters will be supplied to the apply function
        

Returns:
Same as for normal Pandas DataFrame.apply()
        

"""
    

if effective_n_jobs(n_jobs) == 1:
return df.apply(func, **kwargs)
else:
ret = Parallel(n_jobs=n_jobs)(
delayed(type(df).apply)(df[s], func, **kwargs)
for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
return pd.concat(ret)

用法: result = parallel_apply(my_dataframe, my_func)

而不是

df["new"] = df["old"].map(fun)

from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

对我来说,这是一个轻微的改善

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
df["new"] = pool.map(fun, df["old"])

当你得到一个进度指示和自动批处理,如果工作是非常小的。

本机 Python 解决方案(带有 numpy) ,可以根据原始问题的要求应用于整个 DataFrame (不仅仅在单个列上)

import numpy as np
import multiprocessing as mp


dfs = np.array_split(df, 8000) # divide the dataframe as desired


def f_app(df):
return df.apply(myfunc, axis=1)


with mp.Pool(mp.cpu_count()) as pool:
res = pd.concat(pool.map(f_app, dfs))

如果您需要根据函数中的列名做一些操作,请注意 .apply函数可能会给您带来一些麻烦。在我的例子中,我需要根据列名使用 astype()函数更改列类型。这可能不是最有效的方法,但足以达到目的,并保持列名作为原始名称。

import multiprocessing as mp


def f(df):
""" the function that you want to apply to each column """
column_name = df.columns[0] # this is the same as the original column name
# do something what you need to do to that column
return df


# Here I just make a list of all the columns. If you don't use .to_frame()
# it will pass series type instead of a dataframe


dfs = [df[column].to_frame() for column in df.columns]
with mp.Pool(mp.cpu_num) as pool:
processed_df = pd.concat(pool.map(f, dfs), axis=1)