Efficiently applying a function to a grouped pandas DataFrame in parallel

I often need to apply a function to the groups of a very large DataFrame (with mixed data types) and would like to take advantage of multiple cores.

I can create an iterator from the groups and use the multiprocessing module, but that is not efficient, because every group and every result of the function must be pickled for messaging between processes.
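
For illustration, the kind of approach I have in mind looks roughly like this (a sketch; func and the DataFrame here are stand-ins for my real, much larger ones):

import multiprocessing as mp
import pandas as pd

def func(group):
    # stand-in for the real, expensive per-group computation
    return group['val'].sum()

df = pd.DataFrame({'key': [0, 1, 0, 1, 2], 'val': range(5)})

pool = mp.Pool(4)
# every group (and every result) is pickled to cross the process boundary
results = pool.map(func, [group for _, group in df.groupby('key')])
pool.close()
pool.join()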

Is there any way to avoid the pickling, or even to avoid copying the DataFrame entirely? It looks like the shared-memory functions of the multiprocessing module are limited to numpy arrays. Are there any other options?


From the comments above, it seems that this is planned for pandas at some point (there's also an interesting-looking rosetta project which I just noticed).

However, until all this parallel functionality is incorporated into pandas, I noticed that it's very easy to write efficient, non-memory-copying parallel augmentations to pandas directly using Cython + OpenMP and C++.

Here's a short example of writing a parallel groupby-sum, whose use is something like this:

import pandas as pd
import para_group_demo


df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print(para_group_demo.sum(df.a, df.b))

and output is:

     sum
key
0      6
1      11
2      4
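
For reference, the output can be checked against plain single-threaded pandas, which should give the same numbers:

print(df.groupby('a')['b'].sum())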

Note: doubtless, this simple example's functionality will eventually be part of pandas. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this with pandas.


To do this, I wrote a simple single-source-file extension whose code follows.

It starts with some imports and type definitions

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map


cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange


import pandas as pd


ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

The C++ unordered_map type is for summing by a single thread, and the vector is for summing by all threads.

Now to the function sum. It starts off with typed memory views for fast access:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

The function continues by dividing the rows semi-equally among the threads (here hardcoded to 4), and having each thread sum the entries in its range:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True):
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)

All that's left is to create a DataFrame and return the results:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
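
A note on building: combining Cython with OpenMP requires passing the appropriate compiler and linker flags. A minimal setup.py along these lines should work (the file name para_group_demo.pyx and the gcc-style flags are assumptions; adjust for your compiler and platform):

from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize

ext = Extension(
    'para_group_demo',
    sources=['para_group_demo.pyx'],
    language='c++',                                 # needed for vector / unordered_map
    extra_compile_args=['-fopenmp', '-std=c++11'],  # enable OpenMP for prange
    extra_link_args=['-fopenmp'],
)

setup(ext_modules=cythonize(ext))

The extension can then be built in place with python setup.py build_ext --inplace and imported as shown at the top of the answer.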