How to iterate over consecutive chunks of a Pandas DataFrame efficiently

I have a large DataFrame (several million rows).

I want to be able to perform a groupby operation on it, but grouping by arbitrary consecutive (preferably equal-sized) subsets of rows, rather than by any particular property of the individual rows to decide which group they belong to.

The use case: I want to apply a function to each row via a parallel map in IPython. It doesn't matter which rows go to which back-end engine, since the function computes a result based on one row at a time. (At least conceptually; in reality it's vectorized.)

I've come up with something like this:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)


# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]


# Process chunks in parallel
results = dview.map_sync(my_function, groups)

But this seems very long-winded, and doesn't guarantee equal-sized chunks, especially if the index is sparse or non-integer or whatever.

Any better suggestions?

Thanks!


I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for feeding a multiprocessing pool.

Here's a short example from that thread, which might do something like what you want:

import numpy as np
import pandas as pds


df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])


def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


for i in chunker(df, 5):
    print(i)

Which gives you something like this:

          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967

I hope that helps.

EDIT

In this case, I used this function with a multiprocessing pool in (approximately) this manner:

from multiprocessing import Pool


nprocs = 4


pool = Pool(nprocs)


for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()

I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.
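For reference, a minimal sketch of the same idea with IPython's parallel machinery, assuming the ipyparallel package, a cluster that is already running (e.g. started with ipcluster start), and that my_function accepts a whole DataFrame chunk:

import ipyparallel as ipp

# Assumes an ipyparallel cluster is up; Client() connects to it
client = ipp.Client()
dview = client[:]  # DirectView over all engines

# Materialize the chunks and map them across the engines synchronously
chunks = list(chunker(df, 5))
results = dview.map_sync(my_function, chunks)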

In practice, you can't guarantee equal-sized chunks. The number of rows (N) might be prime, in which case the only equal-sized options are chunks of 1 row or one chunk of N rows. Because of this, real-world chunking typically uses a fixed size and allows for a smaller chunk at the end. I tend to pass an array to groupby. Starting from:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850


[15 rows x 5 columns]

where I've deliberately made the index uninformative by setting it to 0, we simply decide on our chunk size (here 10) and integer-divide an array of row positions by it:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257


[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850


[5 rows x 5 columns]

Methods based on slicing the DataFrame can fail when the index isn't compatible with that, although you can always use .iloc[a:b] to ignore the index values and access data by position.
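To illustrate (a small sketch, reusing a 15-row frame with the same all-zero index as above): label-based slicing can't express "the first 5 rows" here, while positional slicing can.

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(15, 5), index=[0] * 15)

# Every label is 0, so a label-based slice up to 4 returns all 15 rows
print(len(df.loc[0:4]))   # 15

# Positional slicing ignores the index labels entirely
print(len(df.iloc[0:5]))  # 5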

A sign of a good ecosystem is having many choices, so I'll add this option from Anaconda's Blaze, which really uses Odo under the hood:

import blaze as bz
import pandas as pd


df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})


for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with each chunked DataFrame
    print(chunk)

Use numpy's array_split():

import numpy as np
import pandas as pd


data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
    assert len(chunk) == len(data) / 5, "This assert may fail if the data length isn't divisible by 5"
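Note that np.array_split splits into a given number of chunks (as equal in size as possible) rather than chunks of a given size; when the length isn't evenly divisible, the extra rows go to the earlier chunks. A small sketch of this behaviour, plus a ceiling-division trick (my addition, not part of the answer above) to cap chunks at roughly a chosen size:

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(11, 3))

# 11 rows into 3 chunks: the extra rows land in the earlier chunks
print([len(c) for c in np.array_split(data, 3)])  # [4, 4, 3]

# To cap chunks at roughly chunk_size rows, derive the number of splits with a
# ceiling division; array_split then balances the sizes as evenly as possible,
# so some chunks may come out slightly smaller than chunk_size.
chunk_size = 4
n_chunks = -(-len(data) // chunk_size)  # == ceil(len(data) / chunk_size)
print([len(c) for c in np.array_split(data, n_chunks)])  # [4, 4, 3]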

A chunk generator function for iterating over pandas DataFrames and Series

A generator version of the chunk function is presented below. Moreover, this version works with a custom index on the pd.DataFrame or pd.Series (e.g. a float-type index).

import numpy as np
import pandas as pd


df_sz = 14

df = pd.DataFrame(np.random.rand(df_sz, 4),
                  index=np.linspace(0., 10., num=df_sz),
                  columns=['a', 'b', 'c', 'd'])


def chunker(seq, size):
    for pos in range(0, len(seq), size):
        yield seq.iloc[pos:pos + size]


chunk_size = 6
for i in chunker(df, chunk_size):
    print(i)


chnk = chunker(df, chunk_size)
print('\n', chnk)
print(next(chnk))
print(next(chnk))
print(next(chnk))

The output is

a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476
a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596
a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409


<generator object chunker at 0x7f503c9d0ba0>


First "next()":
a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476


Second "next()":
a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596


Third "next()":
a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409
import pandas as pd


def batch(iterable, batch_number=10):
    """
    Split an iterable into mini-batches of length batch_number.
    Supports batching a pandas DataFrame.
    usage:
        for i in batch([1, 2, 3, 4, 5], batch_number=2):
            print(i)

        for idx, mini_data in enumerate(batch(df, batch_number=10)):
            print(idx)
            print(mini_data)
    """
    l = len(iterable)

    for idx in range(0, l, batch_number):
        if isinstance(iterable, pd.DataFrame):
            # a DataFrame can't be sliced reliably by index label here, so slice by position
            yield iterable.iloc[idx:min(idx + batch_number, l)]
        else:
            yield iterable[idx:min(idx + batch_number, l)]

Your suggestion to use groupby is quite good, but you should rather use np.arange(len(dataframe)) // batch_size than dataframe.index, since the index may be non-integer and non-consecutive.
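A minimal sketch of that, assuming dataframe and batch_size are already defined and process() stands in for whatever you do with each chunk:

import numpy as np

# Group rows by their position, not by their index labels
for _, chunk in dataframe.groupby(np.arange(len(dataframe)) // batch_size):
    process(chunk)  # placeholder for your per-chunk work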

I've run some benchmarks on the answers given. The top-voted one is horribly slow. Please consider using the accepted solution:

data.groupby(np.arange(len(dataframe)) // batch_size)

[benchmark results plot]

Benchmark code:

import numpy as np
import pandas as pd
import time
from tqdm.auto import tqdm


# Create a properly funky pd.DataFrame
data = pd.DataFrame([
    {
        'x': np.random.randint(23515243),
        'y': 364274*np.random.rand()-134562,
        'z': ''.join(np.random.choice(list('`1234567890-=qwertyuiop[]\\asdfghjkl;\'zxcvbnm,./~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:"ZXCVBNM<>?'), np.random.randint(54,89), replace=True)),
    }
    for _ in tqdm(range(22378))
])
data.index = ['a'] * len(data)

data = pd.concat([data] * 100)

batch_size = 64

times = []

t0 = time.time()
for chunk in np.array_split(data, (len(data) + batch_size - 1) // batch_size):
    pass
times.append({'method': 'np.array_split', 'time': -t0 + time.time()})

t0 = time.time()
for _, chunk in data.groupby(np.arange(len(data)) // batch_size):
    pass
times.append({'method': 'groupby', 'time': -t0 + time.time()})

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

t0 = time.time()
for chunk in chunker(data, batch_size):
    pass
times.append({'method': '[]-syntax', 'time': -t0 + time.time()})

# t0 = time.time()
# for chunk in bz.odo(data, target=bz.chunks(pd.DataFrame), chunksize=batch_size):
#     pass
# times.append({'method': 'bz.odo', 'time': -t0 + time.time()})

def chunker(seq, size):
    for pos in range(0, len(seq), size):
        yield seq.iloc[pos:pos + size]

t0 = time.time()
for i in chunker(data, batch_size):
    pass
times.append({'method': '.iloc[]-syntax', 'time': -t0 + time.time()})

pd.DataFrame(times)

Another approach:

# .. load df ..

CHUNK_SIZE = 100000

for chunk_num in range(len(df) // CHUNK_SIZE + 1):
    start_index = chunk_num * CHUNK_SIZE
    end_index = min(start_index + CHUNK_SIZE, len(df))
    chunk = df[start_index:end_index]

    # .. do calculation on chunk here ..
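If you prefer to be explicit about slicing by position (see the .iloc note in an earlier answer), the same loop can be written with .iloc; this variant (my adaptation, not part of the answer above) also avoids an empty final chunk when len(df) is an exact multiple of CHUNK_SIZE:

CHUNK_SIZE = 100000

n_chunks = (len(df) + CHUNK_SIZE - 1) // CHUNK_SIZE  # ceiling division
for chunk_num in range(n_chunks):
    start_index = chunk_num * CHUNK_SIZE
    end_index = min(start_index + CHUNK_SIZE, len(df))
    chunk = df.iloc[start_index:end_index]  # purely positional, ignores index labels

    # .. do calculation on chunk here ..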