熊猫操作期间的进度指标

我经常在超过1500万行左右的数据帧上执行pandas操作,我希望能够访问特定操作的进度指示器。

是否存在基于文本的熊猫分裂-应用-组合操作进度指示器?

例如:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

其中feature_rollup是一个有点复杂的函数,它采用许多DF列,并通过各种方法创建新的用户列。对于大数据帧,这些操作可能需要一段时间,所以我想知道是否有可能在iPython笔记本中有基于文本的输出,以更新我的进度。

到目前为止,我已经尝试了Python的规范循环进度指示器,但它们没有以任何有意义的方式与pandas交互。

我希望在pandas库/文档中有一些我忽略了的东西,可以让人们了解分裂-应用-组合的进展。一个简单的实现可能会查看apply函数正在工作的数据帧子集的总数,并将进度报告为这些子集的完成分数。

这可能是需要添加到库中的东西吗?

176184 次浏览

你可以很容易地用装饰器做到这一点

from functools import wraps


def logging_decorator(func):


@wraps
def wrapper(*args, **kwargs):
wrapper.count += 1
print "The function I modify has been called {0} times(s).".format(
wrapper.count)
func(*args, **kwargs)
wrapper.count = 0
return wrapper


modified_function = logging_decorator(feature_rollup)

然后使用modified_function(当你想打印它时更改)

调整Jeff的答案(并将其作为一个可重用的函数)。

def logged_apply(g, func, *args, **kwargs):
step_percentage = 100. / len(g)
import sys
sys.stdout.write('apply progress:   0%')
sys.stdout.flush()


def logging_decorator(func):
def wrapper(*args, **kwargs):
progress = wrapper.count * step_percentage
sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
sys.stdout.flush()
wrapper.count += 1
return func(*args, **kwargs)
wrapper.count = 0
return wrapper


logged_func = logging_decorator(func)
res = g.apply(logged_func, *args, **kwargs)
sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
sys.stdout.flush()
return res

注意:应用进度百分比更新内联。如果你的函数stout,那么这将不起作用。

In [11]: g = df_users.groupby(['userID', 'requestDate'])


In [12]: f = feature_rollup


In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]:
...

像往常一样,你可以把它添加到你的groupby对象作为一个方法:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply


In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]:
...

正如评论中提到的,这不是core pandas感兴趣的功能。但是python允许你为许多pandas对象/方法创建这些(这样做会有相当多的工作…尽管你应该能够概括这种方法)。

由于流行的需求,我在tqdm (pip install "tqdm>=4.9.0")中添加了pandas支持。与其他答案不同,这个不会明显减慢熊猫的速度吗——这里有一个DataFrameGroupBy.progress_apply的例子:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks


# Create new `pandas` methods which use `tqdm` progress
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()


df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

如果你对它的工作原理(以及如何为你自己的回调修改它)感兴趣,请参阅GitHub上的例子关于PyPI的完整文档,或导入模块并运行help(tqdm)。其他支持的函数包括mapapplymapaggregatetransform

编辑


要直接回答原来的问题,请替换为:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)
< p > 注:tqdm <= v4.8: 对于低于4.8的tqdm版本,你必须执行:

而不是tqdm.pandas()
from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

我已经改变了杰夫的回答,以包括一个总数,这样你就可以跟踪进度和一个变量,只打印每X次迭代(这实际上提高了很多性能,如果“print_at”相当高)

def count_wrapper(func,total, print_at):


def wrapper(*args):
wrapper.count += 1
if wrapper.count % wrapper.print_at == 0:
clear_output()
sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
sys.stdout.flush()
return func(*args)
wrapper.count = 0
wrapper.total = total
wrapper.print_at = print_at


return wrapper

clear_output()函数来自

from IPython.core.display import clear_output

如果不是在IPython,安迪·海登的答案是没有它的

如果你像我一样需要在Jupyter/ipython笔记本中如何使用它的支持,这里是相关的文章的有用指南和源代码:

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

注意_tqdm_notebook的import语句中的下划线。正如参考文章所提到的,开发正处于后期测试阶段。

UPDATE as 11/12/2021

我现在正在使用pandas==1.3.4tqdm==4.62.3,我不确定tqdm作者的哪个版本实现了这一更改,但上面的import语句已弃用。而不是使用:

 from tqdm.notebook import tqdm_notebook

< em > 截至2022年1月2日更新 现在可以简化.py和.ipynb文件的导入语句:

from tqdm.auto import tqdm
tqdm.pandas()

这应该可以在两种开发环境中正常工作,并且应该在pandas数据框架或其他值得tqdm支持的可迭代对象上工作。

< em > 截至2022年5月27日更新 如果你在SageMaker上使用jupyter笔记本,这个组合是有效的:

from tqdm import tqdm
from tqdm.gui import tqdm as tqdm_gui
tqdm.pandas(ncols=50)

对于希望在自定义并行熊猫应用代码上应用tqdm的任何人。

(多年来,我尝试了一些并行化的库,但我从未找到100%的并行化解决方案,主要是针对apply函数,而且我总是不得不回来查看我的“手动”代码。)

df_multi_core -这是你调用的。它接受:

  1. df对象
  2. 要调用的函数名
  3. 函数可以执行的列的子集(有助于减少时间/内存)
  4. 并行运行的作业数量(-1或忽略所有核心)
  5. df函数接受的任何其他kwargs(如“axis”)

_df_split -这是一个内部帮助函数,必须全局定位到正在运行的模块(Pool. properties)。地图是“位置依赖”),否则我会在内部定位它..

下面是我的要点的代码(我将在那里添加更多的熊猫函数测试):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial


def _df_split(tup_arg, **kwargs):
split_ind, df_split, df_f_name = tup_arg
return (split_ind, getattr(df_split, df_f_name)(**kwargs))


def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
if njobs == -1:
njobs = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=njobs)


try:
splits = np.array_split(df[subset], njobs)
except ValueError:
splits = np.array_split(df, njobs)


pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
results = pool.map(partial(_df_split, **kwargs), pool_data)
pool.close()
pool.join()
results = sorted(results, key=lambda x:x[0])
results = pd.concat([split[1] for split in results])
return results

Bellow是一个并行化apply的测试代码,使用tqdm "progress_apply"。

from time import time
from tqdm import tqdm
tqdm.pandas()


if __name__ == '__main__':
sep = '-' * 50


# tqdm progress_apply test
def apply_f(row):
return row['c1'] + 0.1
N = 1000000
np.random.seed(0)
df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})


print('testing pandas apply on {}\n{}'.format(df.shape, sep))
t1 = time()
res = df.progress_apply(apply_f, axis=1)
t2 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))


t3 = time()
# res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
t4 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
在输出中,你可以看到一个没有并行化运行的进度条,以及在并行化运行时每个核心的进度条。 有一个轻微的hickup,有时其余的核心一次出现,但即使这样,我认为它是有用的,因为你得到每个核心的进度统计(它/秒和总记录,为ex)

enter image description here

感谢@abcdaa提供这么棒的图书馆!

对于像mergeconcatjoin这样的操作,可以使用Dask显示进度条。

您可以将Pandas数据框架转换为Dask数据框架。然后你可以显示Dask进度条。

下面的代码显示了一个简单的例子:

创建和转换Pandas数据框架

import pandas as pd
import numpy as np
from tqdm import tqdm
import dask.dataframe as dd


n = 450000
maxa = 700


df1 = pd.DataFrame({'lkey': np.random.randint(0, maxa, n),'lvalue': np.random.randint(0,int(1e8),n)})
df2 = pd.DataFrame({'rkey': np.random.randint(0, maxa, n),'rvalue': np.random.randint(0, int(1e8),n)})


sd1 = dd.from_pandas(df1, npartitions=3)
sd2 = dd.from_pandas(df2, npartitions=3)

使用进度条合并

from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar
ProgressBar().register()


with TqdmCallback(desc="compute"):
sd1.merge(sd2, left_on='lkey', right_on='rkey').compute()

同样的操作,Dask比Pandas更快,需要的资源更少:

  • 熊猫74.7 ms
  • Dask 20.2 ms

详情如下:

注1:我已经测试了这个解决方案:https://stackoverflow.com/a/56257514/3921758,但它不适合我。不度量合并操作。

注2:我已经检查了“公开请求”;for tqdm for Pandas:

这里的每个答案都使用了pandas.DataFrame.groupby。如果你想在pandas.Series.apply上创建一个没有groupby的进度条,你可以在jupyter-notebook中这样做:

from tqdm.notebook import tqdm
tqdm.pandas()




df['<applied-col-name>'] = df['<col-name>'].progress_apply(<your-manipulation-function>)

concat操作:

df = pd.concat(
[
get_data(f)
for f in tqdm(files, total=len(files))
]
)

TQDM只返回一个可迭代对象。