使用熊猫的“大数据”工作流程

几个月来,在学习熊猫的过程中,我一直在努力寻找这个问题的答案。我在日常工作中使用SAS,它对核心外支持很好。然而,由于许多其他原因,SAS作为一个软件很糟糕。

我希望有一天能用python和Pandas取代SAS,但我目前缺乏用于大型数据集的核心外工作流程。我说的不是需要分布式网络的“大数据”,而是太大而无法放入内存但又小到足以放入硬盘驱动器的文件。

我的第一个想法是使用HDFStore在磁盘上保存大型数据集,并仅将我需要的部分拉入数据帧进行分析。其他人提到MongoDB是一个更易于使用的替代方案。我的问题是:

实现以下目标的最佳实践工作流程是什么:

  1. 将平面文件加载到永久的磁盘数据库结构中
  2. 查询该数据库以检索数据以提供给熊猫数据结构
  3. 在操纵熊猫的碎片后更新数据库

真实世界的例子将不胜感激,特别是来自任何在“大数据”上使用熊猫的人。

编辑——一个我希望它如何工作的例子:

  1. 以迭代方式导入一个大型平面文件并将其存储在永久的磁盘数据库结构中。这些文件通常太大而无法放入内存。
  2. 为了使用Pandas,我想读取这些数据的子集(通常一次只有几列),这些数据可以放入内存。
  3. 我将通过对所选列执行各种操作来创建新列。
  4. 然后我必须将这些新列附加到数据库结构中。

我试图找到执行这些步骤的最佳实践方法。阅读有关熊猫和Pytable的链接似乎附加一个新列可能是一个问题。

编辑——具体回答Jeff的问题:

  1. 我正在构建消费者信用风险模型。数据类型包括电话、SSN和地址特征;财产价值;贬损信息,如犯罪记录、破产等……我每天使用的数据集平均有近1,000到2,000个混合数据类型的字段:数字和字符数据的连续、名义和序数变量。我很少追加行,但我确实执行许多创建新列的操作。
  2. 典型的操作涉及使用条件逻辑将几个列组合成一个新的复合列。例如,if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'。这些操作的结果是我的数据集中的每条记录都有一个新列。
  3. 最后,我想将这些新列附加到磁盘数据结构中。我会重复第2步,使用交叉表和描述性统计数据探索数据,试图找到有趣的、直观的关系来建模。
  4. 典型的项目文件通常约为1GB。文件被组织成这样一种方式,其中一行由一条消费者数据记录组成。每一行的每条记录都有相同数量的列数。情况总是如此。
  5. 在创建新列时,我很少按行进行子集。然而,在创建报告或生成描述性统计信息时,我很常见地按行进行子集。例如,我可能想为特定业务线创建一个简单的频率,比如零售信用卡。为此,除了我要报告的任何列之外,我只会选择业务线=零售的那些记录。然而,在创建新列时,我会提取所有数据行,只提取操作所需的列。
  6. 建模过程需要我分析每一列,寻找与一些结果变量的有趣关系,并创建描述这些关系的新复合列。我探索的列通常是以小集合完成的。例如,我会专注于一组大约20列的列,只是处理财产价值,并观察它们与违约贷款的关系。一旦探索了这些列并创建了新列,然后我就会转移到另一组列,比如大学教育,并重复这个过程。我正在做的是创建候选变量,解释我的数据和一些结果之间的关系。在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个方程。

我很少会向数据集添加行。我几乎总是会创建新列(统计/机器学习术语中的变量或特征)。

345533 次浏览

我经常以这种方式使用数十千兆字节的数据例如,我在磁盘上有表,我通过查询读取,创建数据并追加回来。

值得阅读的文档在这个线程的后期,了解如何存储数据的几个建议。

影响数据存储方式的详细信息,例如:
尽可能多的细节,你可以;我可以帮助你开发一个结构。

  1. 数据大小,行,列的#,列的类型;您是否附加行,还是只是列?
  2. 典型的操作是什么样子。例如。对列进行查询以选择一堆行和特定列,然后执行操作(在内存中),创建新列,保存这些。
    (给出一个玩具的例子可以让我们提供更具体的建议。
  3. 处理完之后,你会怎么做?第2步是临时的,还是可重复的?
  4. 输入平面文件:有多少,粗略的总大小,以GB为单位。这些是如何组织的,例如通过记录?每个文件是否包含不同的字段,或者每个文件中是否有一些记录以及每个文件中的所有字段?
  5. 您是否曾经根据条件选择行(记录)的子集(例如选择字段A>5的行)?然后做一些事情,或者您只是选择字段A,B,C和所有记录(然后做一些事情)?
  6. 您是否“处理”所有列(分组),或者是否有一个很好的比例仅用于报告(例如,您希望保留数据,但在最终结果时间之前不需要拉入该列的显式性)?

解决方案

确保你有熊猫至少0.10.1安装。

逐块迭代文件多表查询

由于pytable被优化为按行操作(这就是你所查询的),我们将为每组字段创建一个表。这样可以很容易地选择一小群字段(这将与一个大表一起工作,但这样做更有效…我想我将来可能能够修复这个限制…无论如何,这更直观):
(以下是伪代码。)

import numpy as npimport pandas as pd
# create a storestore = pd.HDFStore('mystore.h5')
# this is the key to your storage:#    this maps your fields to a specific group, and defines#    what you want to have as data_columns.#    you might want to create a nice class wrapping this#    (as you will want to have this map and its inversion)group_map = dict(A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),B = dict(fields = ['field_10',......        ], dc = ['field_10']),.....REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),
)
group_map_inverted = dict()for g, v in group_map.items():group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读取文件并创建存储(本质上是做append_to_multiple所做的事情):

for f in files:# read in the file, additional options may be necessary here# the chunksize is not strictly necessary, you may be able to slurp each# file into memory in which case just eliminate this part of the loop# (you can also change chunksize if necessary)for chunk in pd.read_table(f, chunksize=50000):# we are going to append to each table by group# we are not going to create indexes at this time# but we *ARE* going to create (some) data_columns
# figure out the field groupingsfor g, v in group_map.items():# create the frame for this groupframe = chunk.reindex(columns = v['fields'], copy = False)
# append itstore.append(g, frame, index=False, data_columns = v['dc'])

现在您已经在文件中拥有了所有的表(实际上,如果您愿意,您可以将它们存储在单独的文件中,您可能必须将文件名添加到group_map,但这可能不是必要的)。

这是您获取列并创建新列的方式:

frame = store.select(group_that_I_want)# you can optionally specify:# columns = a list of the columns IN THAT GROUP (if you wanted to#     select only say 3 out of the 20 columns in this sub-table)# and a where clause if you want a subset of the rows
# do calculations on this framenew_frame = cool_function_on_frame(frame)
# to 'add columns', create a new group (you probably want to# limit the columns in this new_group to be only NEW ones# (e.g. so you don't overlap from the other tables)# add this info to the group_mapstore.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当你准备好post_processing:

# This may be a bit tricky; and depends what you are actually doing.# I may need to modify this function to be a bit more general:report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于data_columns,您实际上不需要定义任何data_columns;它们允许您根据列子选择行。例如。

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最终的报表生成阶段,它们可能对您最感兴趣(本质上,一个数据列与其他列是隔离的,如果您定义了很多,这可能会在一定程度上影响效率)。

您可能还想:

  • 创建一个函数,它接受字段列表,在groups_map中查找组,然后选择这些组并连接结果,以便获得结果框架(这基本上是select_as_multiple所做的)。
  • 某些数据列上的索引(使行子集更快)。
  • 启用压缩。

让我知道当你有问题!

pymongo就是这种情况。我还在python中使用sql server、sqlite、HDF、ORM(SQLAlchemy)进行了原型设计。首先,pymongo是一个基于文档的数据库,所以每个人都是一个文档(属性dict)。许多人形成一个集合,你可以有许多集合(人、股票市场、收入)。

pd.dateframe->pymongo注意:我使用read_csv中的chunksize将其保持为5以10k记录(pymongo如果较大,则丢弃套接字)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

查询:gt=大于…

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find()返回一个迭代器,所以我通常使用ichunked来切入更小的迭代器。

连接怎么样,因为我通常会将10个数据源粘贴在一起:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

然后(在我的情况下,有时我必须在“可合并”之前先aggaJoinDF

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

然后,您可以通过下面的更新方法将新信息写入主集合。(逻辑集合与物理数据源)。

collection.update({primarykey:foo},{key:change})

在较小的查找中,只需非规范化。例如,您在文档中有代码,您只需添加字段代码文本并在创建文档时进行dict查找。

现在你有了一个基于一个人的很好的数据集,你可以在每个案例上释放你的逻辑,并制作更多属性。最后,你可以读入熊猫你的3到内存最大关键指标,并进行枢轴/agg/数据探索。这对我来说适用于数字/大文本/类别/代码/浮点数/…

你也可以使用MongoDB内置的两种方法(MapReduce和聚合框架)。有关聚合框架的更多信息,请参阅此处,因为它似乎比MapReduce更容易,并且看起来很方便用于快速聚合工作。请注意,我不需要定义我的字段或关系,我可以将项目添加到文档中。在快速变化的numpy、熊猫、python工具集的当前状态下,MongoDB帮助我开始工作:)

我发现这个有点晚,但我处理类似的问题(抵押贷款预付模型)。我的解决方案是跳过熊猫HDFStore层并使用直接pytable。我将每列保存为最终文件中的单个HDF5数组。

我的基本工作流程是首先从数据库中获取一个CSV文件。我将其gzip,所以它不是那么大。然后我将其转换为面向行的HDF5文件,方法是在python中迭代它,将每行转换为真实数据类型,然后将其写入HDF5文件。这需要几十分钟,但它不使用任何内存,因为它只是逐行操作。然后我将面向行的HDF5文件“转置”为面向列的HDF5文件。

表转置看起来像:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):# Get a reference to the input data.tb = h_in.getNode(table_path)# Create the output group to hold the columns.grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))for col_name in tb.colnames:logger.debug("Processing %s", col_name)# Get the data.col_data = tb.col(col_name)# Create the output array.arr = h_out.createCArray(grp,col_name,tables.Atom.from_dtype(col_data.dtype),col_data.shape)# Store the data.arr[:] = col_datah_out.flush()

重新阅读它看起来像:

def read_hdf5(hdf5_path, group_path="/data", columns=None):"""Read a transposed data set from a HDF5 file."""if isinstance(hdf5_path, tables.file.File):hf = hdf5_pathelse:hf = tables.openFile(hdf5_path)
grp = hf.getNode(group_path)if columns is None:data = [(child.name, child[:]) for child in grp]else:data = [(child.name, child[:]) for child in grp if child.name in columns]
# Convert any float32 columns to float64 for processing.for i in range(len(data)):name, vec = data[i]if vec.dtype == np.float32:data[i] = (name, vec.astype(np.float64))
if not isinstance(hdf5_path, tables.file.File):hf.close()return pd.DataFrame.from_items(data)

现在,我通常在具有大量内存的机器上运行它,因此我可能对内存使用不够小心。例如,默认情况下,加载操作读取整个数据集。

这通常对我有用,但它有点笨重,我不能使用花哨的pytable魔法。

编辑:与默认的记录数组pytable相比,这种方法的真正优势在于,我可以使用h5r将数据加载到R中,而h5r无法处理表。或者,至少,我无法让它加载异构表。

如果您的数据集在1到20GB之间,您应该获得一个具有48GB RAM的工作站。然后Pandas可以将整个数据集保存在RAM中。我知道这不是您在这里寻找的答案,但是在具有4GB RAM的笔记本电脑上进行科学计算是不合理的。

我认为上面的答案缺少一个我发现非常有用的简单方法。

当我有一个太大而无法加载到内存中的文件时,我将文件分成多个较小的文件(按行或按列)

示例:如果30天价值约30GB大小的交易数据,我将其分解为每天约1GB大小的文件。随后我分别处理每个文件并在最后汇总结果

最大的优点之一是它允许文件的并行处理(多个线程或进程)

另一个优点是文件操作(例如在示例中添加/删除日期)可以通过常规shell命令完成,这在更高级/复杂的文件格式中是不可能的

这种方法并不涵盖所有场景,但在很多场景中非常有用

如果您使用创建数据管道的简单路径,将其分解为多个较小的文件,请考虑Ruffus

我知道这是一个古老的线程,但我认为Blaze库值得一试。它是为这些类型的情况而构建的。

从文档:

Blaze将NumPy和Pandas的可用性扩展到分布式和核心外计算。Blaze提供了一个类似于NumPy ND-Array或Pandas DataFrame的接口,但将这些熟悉的接口映射到各种其他计算引擎,如Postgres或Spark。

编辑:顺便说一句,它由ContinuumIO和NumPy的作者Travis Oliphant支持。

还有一个变体

在Pandas中完成的许多操作也可以作为数据库查询(sql,mongo)完成

使用RDBMS或mongoDB允许您在DB查询中执行一些聚合(针对大数据进行了优化,并有效地使用缓存和索引)

稍后,您可以使用熊猫执行后处理。

这种方法的优点是,您可以获得处理大数据的数据库优化,同时仍然以高级声明性语法定义逻辑-并且不必处理决定在内存中做什么以及在核心之外做什么的细节。

尽管查询语言和Pandas不同,但将部分逻辑从一个转换到另一个通常并不复杂。

现在,在问题提出两年后,一个“核心外”熊猫等效:dask。它非常出色!虽然它不支持熊猫的所有功能,但你可以用它走得很远。更新:在过去的两年里,它一直在维护,并且有大量用户社区与Dask合作。

现在,在这个问题提出四年之后,在Vaex中又出现了另一个高性能的“核心外”熊猫。它“使用内存映射、零内存复制策略和懒惰计算来获得最佳性能(不浪费内存)。”它可以处理数十亿行的数据集,而不会将它们存储到内存中(甚至可以在次优硬件上进行分析)。

我最近遇到了一个类似的问题。我发现简单地读取数据块,并在将其写入相同的csv块时将其附加到相同的csv中效果很好。我的问题是根据另一个表中的信息添加日期列,使用某些列的值如下。这可能会帮助那些对dask和hdf5感到困惑但更熟悉像我这样的熊猫的人。

def addDateColumn():"""Adds time to the daily rainfall data. Reads the csv as chunks of 100krows at a time and outputs them, appending as needed, to a single csv.Uses the column of the raster names to get the date."""df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True,chunksize=100000) #read csv file as 100k chunks
'''Do some stuff'''
count = 1 #for indexing item in time listfor chunk in df: #for each 100k rowsnewtime = [] #empty list to append repeating times for different rowstoiterate = chunk[chunk.columns[2]] #ID of raster nums to base timewhile count <= toiterate.max():for i in toiterate:if i ==count:newtime.append(newyears[count])count+=1print "Finished", str(chunknum), "chunks"chunk["time"] = newtime #create new column in dataframe based on timeoutname = "CHIRPS_tanz_time2.csv"#append each output to same csv, using no headerchunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)

我发现对大数据用例有帮助的一个技巧是通过将浮点精度降低到32位来减少数据量。这并不适用于所有情况,但在许多应用程序中,64位精度是多余的,节省2倍内存是值得的。更明显的一点是:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))>>> df.info()<class 'pandas.core.frame.DataFrame'>RangeIndex: 100000000 entries, 0 to 99999999Data columns (total 5 columns):...dtypes: float64(5)memory usage: 3.7 GB
>>> df.astype(np.float32).info()<class 'pandas.core.frame.DataFrame'>RangeIndex: 100000000 entries, 0 to 99999999Data columns (total 5 columns):...dtypes: float32(5)memory usage: 1.9 GB

正如其他人所指出的,几年后出现了一个“核心之外”的熊猫等价物:dask。虽然dask不是熊猫及其所有功能的直接替代品,但它因其以下几个原因而脱颖而出:

Dask是一个灵活的分析计算并行计算库,针对交互式计算工作负载的动态任务调度进行了优化“大数据”集合,如并行数组、数据帧和列表,将NumPy、Pandas或Python迭代器等常见接口扩展到大于内存或分布式环境,并从笔记本电脑扩展到集群。

Dask强调以下美德:

  • 熟悉:提供并行的NumPy数组和Pandas DataFrame对象
  • 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度接口。
  • 本机:在Pure Python中启用分布式计算,并访问PyData堆栈。
  • 快速:以低开销、低延迟和快速数值算法所需的最小序列化运行
  • 向上扩展:在具有1000个内核的集群上弹性运行向下扩展:在单个进程中在笔记本电脑上设置和运行的微不足道
  • 响应式:采用交互式计算设计,提供快速反馈和诊断以帮助人类

并添加一个简单的代码示例:

import dask.dataframe as dddf = dd.read_csv('2015-*-*.csv')df.groupby(df.user_id).value.mean().compute()

替换了一些熊猫代码,如下所示:

import pandas as pddf = pd.read_csv('2015-01-01.csv')df.groupby(df.user_id).value.mean()

并且,特别值得注意的是,通过concurrent.futures接口为提交自定义任务提供了通用基础设施:

from dask.distributed import Clientclient = Client('scheduler:port')
futures = []for fn in filenames:future = client.submit(load, fn)futures.append(future)
summary = client.submit(summarize, futures)summary.result()

这里值得一提的是
它是一个分布式计算框架,以分布式方式为熊猫提供了自己的实现。

只需替换熊猫导入,代码应该可以正常工作:

# import pandas as pdimport ray.dataframe as pd
# use pd as usual

可以在这里阅读更多细节:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/


更新:处理熊猫分发的部分,已被提取到莫丁项目中。

现在使用它的正确方法是:

# import pandas as pdimport modin.pandas as pd

我想指出Vaex包。

Vaex是一个用于懒惰核心外数据帧(类似于Pandas)的python库,用于可视化和探索大表数据集。它可以在每秒高达十亿(109)个对象/行的N维网格上计算均值、总和、计数、均方差等统计数据。可视化使用直方图、密度图和3d体积渲染完成,允许交互式探索大数据。Vaex使用内存映射、零内存复制策略和懒惰计算以获得最佳性能(不浪费内存)。

看一下留档:https://vaex.readthedocs.io/en/latest/API非常接近熊猫的API。

目前我正在“像”你一样工作,只是规模较小,这就是为什么我没有PoC来提出我的建议。

然而,我似乎发现成功使用泡菜作为缓存系统和外包执行各种功能到文件-从我的突击队/主文件执行这些文件;例如,我使用prepare_use.py转换对象类型,拆分数据集到测试,验证和预测数据集。

你的泡菜缓存是如何工作的?我使用字符串来访问动态创建的泡菜文件,这取决于传递了哪些参数和数据集(我尝试捕获并确定程序是否已经运行,使用数据集的形状,传递参数的字典)。尊重这些措施,我得到一个字符串,试图找到并读取一个泡菜文件,可以,如果找到,跳过流转时长,以跳转到执行我现在正在工作。

使用数据库我遇到了类似的问题,这就是为什么我发现使用这个解决方案的乐趣,但是-肯定有很多限制-例如由于冗余而存储巨大的泡菜集。将表从转换之前更新到转换之后可以通过适当的索引来完成-验证信息会打开另一本书(我尝试合并抓取的租金数据,基本上在2小时后停止使用数据库-因为我想在每次转换过程后跳回来)

我希望我的两分钱能以某种方式帮助你。

你好。

拼花文件格式非常适合您描述的用例。您可以使用pd.read_parquet(path_to_file, columns=["foo", "bar"])有效地读取特定的列子集

https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html