我如何阅读一个大的csv文件与熊猫?

我试图读取一个大的csv文件(aprox。6 GB)在熊猫和我得到一个内存错误:

MemoryError                               Traceback (most recent call last)
<ipython-input-58-67a72687871b> in <module>()
----> 1 data=pd.read_csv('aphro.csv',sep=';')


...


MemoryError:

有什么帮助吗?

438457 次浏览

错误显示机器没有足够的内存来读取整个 CSV一次转换成一个数据帧。假设您不需要整个数据集 内存中,避免这个问题的一种方法是处理CSV在 chunks(通过指定chunksize参数):

. chunk
chunksize = 10 ** 6
for chunk in pd.read_csv(filename, chunksize=chunksize):
process(chunk)
chunksize参数指定每个块的行数。 (当然,最后一个块包含的行数可能少于chunksize)


熊猫= 1.2

read_csvchunksize返回一个上下文管理器,像这样使用:

chunksize = 10 ** 6
with pd.read_csv(filename, chunksize=chunksize) as reader:
for chunk in reader:
process(chunk)

看到GH38225

我是这样说的:

chunks=pd.read_table('aphro.csv',chunksize=1000000,sep=';',\
names=['lat','long','rf','date','slno'],index_col='slno',\
header=None,parse_dates=['date'])


df=pd.DataFrame()
%time df=pd.concat(chunk.groupby(['lat','long',chunk['date'].map(lambda x: x.year)])['rf'].agg(['sum']) for chunk in chunks)

你可以尝试sframe,它和pandas有相同的语法,但是允许你操作比你的RAM大的文件。

函数read_csv和read_table几乎是一样的。但在程序中使用read_table函数时,必须分配分隔符“,”。

def get_from_action_data(fname, chunk_size=100000):
reader = pd.read_csv(fname, header=0, iterator=True)
chunks = []
loop = True
while loop:
try:
chunk = reader.get_chunk(chunk_size)[["user_id", "type"]]
chunks.append(chunk)
except StopIteration:
loop = False
print("Iteration is stopped")


df_ac = pd.concat(chunks, ignore_index=True)

如果您使用pandas将大文件读入块,然后逐行yield,这是我所做的

import pandas as pd


def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
for chunk in pd.read_csv(filename,delimiter=',', iterator=True, chunksize=chunk_size, parse_dates=[1] ):
yield (chunk)


def _generator( filename, header=False,chunk_size = 10 ** 5):
chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
for row in chunk:
yield row


if __name__ == "__main__":
filename = r'file.csv'
generator = generator(filename=filename)
while True:
print(next(generator))

分块不应该总是解决这个问题的第一步。

  1. < p > 文件大是因为重复的非数字数据或不需要的列吗?

    如果是这样,你有时可以看到通过按列分类阅读和通过pd.read_csv usecols参数选择所需的列节省了大量内存。

  2. < p > 你的工作流程需要切片、操作、导出吗?

    如果是这样,你可以使用dask.dataframe来切片,执行你的计算和迭代导出。分块是由dask静默执行的,它也支持pandas API的一个子集。

  3. < p > 如果所有这些都失败了,就通过块逐行读取。

    通过熊猫或通过csv图书馆作为最后的手段。

对于大数据,我建议您使用图书馆的“dask"< br >例句:

# Dataframes implement the Pandas API
import dask.dataframe as dd
df = dd.read_csv('s3://.../2018-*-*.csv')

你可以从文档在这里中阅读更多。

另一个很好的选择是使用modin,因为所有的功能都与pandas相同,但它利用了分布式数据框架库,如dask。

从我的项目中,另一个高级库是datatable

# Datatable python library
import datatable as dt
df = dt.fread("s3://.../2018-*-*.csv")

除了上面的答案,对于那些想要处理CSV然后导出为CSV、parquet或SQL的人来说,d6tstack是另一个不错的选择。您可以加载多个文件,它处理数据模式更改(添加/删除列)。核心支持已经被剔除。

def apply(dfg):
# do stuff
return dfg


c = d6tstack.combine_csv.CombinerCSV([bigfile.csv], apply_after_read=apply, sep=',', chunksize=1e6)


# or
c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply, chunksize=1e6)


# output to various formats, automatically chunked to reduce memory consumption
c.to_csv_combine(filename='out.csv')
c.to_parquet_combine(filename='out.pq')
c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # fast for postgres
c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename') # fast for mysql
c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # slow but flexible

您可以将数据读入为块,并将每个块保存为pickle。

import pandas as pd
import pickle


in_path = "" #Path where the large file is
out_path = "" #Path to save the pickle files to
chunk_size = 400000 #size of chunks relies on your available memory
separator = "~"


reader = pd.read_csv(in_path,sep=separator,chunksize=chunk_size,
low_memory=False)




for i, chunk in enumerate(reader):
out_file = out_path + "/data_{}.pkl".format(i+1)
with open(out_file, "wb") as f:
pickle.dump(chunk,f,pickle.HIGHEST_PROTOCOL)

在下一步中,读入pickle并将每个pickle附加到所需的数据框架中。

import glob
pickle_path = "" #Same Path as out_path i.e. where the pickle files are


data_p_files=[]
for name in glob.glob(pickle_path + "/data_*.pkl"):
data_p_files.append(name)




df = pd.DataFrame([])
for i in range(len(data_p_files)):
df = df.append(pd.read_pickle(data_p_files[i]),ignore_index=True)

解决方案1:

使用pandas与大数据

解决方案2:

TextFileReader = pd.read_csv(path, chunksize=1000)  # the number of rows per chunk


dfList = []
for df in TextFileReader:
dfList.append(df)


df = pd.concat(dfList,sort=False)

如果有人还在寻找类似的东西,我发现这个名为modin的新库可以提供帮助。它使用分布式计算来帮助读取。这里有一个很好的文章比较它与pandas的功能。它本质上使用与熊猫相同的功能。

import modin.pandas as pd
pd.read_csv(CSV_FILE_NAME)

下面是一个例子:

chunkTemp = []
queryTemp = []
query = pd.DataFrame()


for chunk in pd.read_csv(file, header=0, chunksize=<your_chunksize>, iterator=True, low_memory=False):


#REPLACING BLANK SPACES AT COLUMNS' NAMES FOR SQL OPTIMIZATION
chunk = chunk.rename(columns = {c: c.replace(' ', '') for c in chunk.columns})


#YOU CAN EITHER:
#1)BUFFER THE CHUNKS IN ORDER TO LOAD YOUR WHOLE DATASET
chunkTemp.append(chunk)


#2)DO YOUR PROCESSING OVER A CHUNK AND STORE THE RESULT OF IT
query = chunk[chunk[<column_name>].str.startswith(<some_pattern>)]
#BUFFERING PROCESSED DATA
queryTemp.append(query)


#!  NEVER DO pd.concat OR pd.DataFrame() INSIDE A LOOP
print("Database: CONCATENATING CHUNKS INTO A SINGLE DATAFRAME")
chunk = pd.concat(chunkTemp)
print("Database: LOADED")


#CONCATENATING PROCESSED DATA
query = pd.concat(queryTemp)
print(query)

我想在已经提供的大多数潜在解决方案的基础上,给出一个更全面的答案。我还想指出另一个可能有助于阅读的方法。

选项1:dtypes

"dtypes"是一个非常强大的参数,你可以用它来减少read方法的内存压力。参见 answer。默认情况下,Pandas尝试推断数据的dtype。

根据数据结构,每存储一个数据,就进行一次内存分配。在基本层面上,请参考以下值(下表说明了C编程语言的值):

The maximum value of UNSIGNED CHAR = 255
The minimum value of SHORT INT = -32768
The maximum value of SHORT INT = 32767
The minimum value of INT = -2147483648
The maximum value of INT = 2147483647
The minimum value of CHAR = -128
The maximum value of CHAR = 127
The minimum value of LONG = -9223372036854775808
The maximum value of LONG = 9223372036854775807

参考页查看NumPy和C类型之间的匹配。

假设你有一个整数数组数字。你可以在理论上和实际中分配,比如说一个16位整数类型的数组,但是你会分配比实际需要更多的内存来存储这个数组。为了防止这种情况,你可以在read_csv上设置dtype选项。你不希望将数组项存储为长整数,实际上你可以用8位整数(np.int8np.uint8)来匹配它们。

观察下面的dtype映射。

 > .</a>
<em>来源:<a href=https://pbpython.com/pandas_dtypes.html < / em >

你可以在pandas方法上传递dtype参数作为参数,就像read上的dict一样,比如{column: type}。

import numpy as np
import pandas as pd


df_dtype = {
"column_1": int,
"column_2": str,
"column_3": np.int16,
"column_4": np.uint8,
...
"column_n": np.float32
}


df = pd.read_csv('path/to/file', dtype=df_dtype)

选项2:按块读取

以块形式读取数据允许您访问内存中的一部分数据,并且可以对数据应用预处理并保存处理过的数据而不是原始数据。如果你将这个选项与第一个选项< em > dtypes < / em >结合起来会更好。

我想指出熊猫烹饪书部分的过程,在那里你可以找到在这里。注意这两个部分;

选项3:Dask

Dask是一个框架,在Dask的网站中定义为:

Dask为分析提供了高级的并行性,为您喜爱的工具提供了大规模的性能

它的诞生是为了覆盖熊猫无法到达的必要部位。Dask是一个功能强大的框架,通过以分布式方式处理数据,允许您访问更多的数据。

你可以使用dask来预处理你的数据作为一个整体,dask负责分块部分,所以不像pandas,你可以只定义你的处理步骤,让dask来做工作。Dask在被compute和/或persist显式推入之前不应用计算(请参阅答案在这里了解差异)。

其他辅助(想法)

  • ETL流为数据设计。只保留原始数据中需要的内容。
    • 首先,使用Dask或PySpark等框架将ETL应用于整个数据,并导出处理后的数据。
    • 然后看看处理过的数据是否能整体地放入内存中。
    • 李< / ul > < / >
    • 考虑增加内存。
    • 考虑在云平台上使用这些数据。

在使用chunksize选项之前,如果你想确定你想要在@unutbu提到的分块for循环中写入的进程函数,你可以简单地使用nrows选项。

small_df = pd.read_csv(filename, nrows=100)

一旦确定流程块准备好了,就可以将其放入整个数据帧的分块for循环中。

如果你有一个包含millions数据条目的csv文件,并且你想要加载完整的数据集,你应该使用dask_cudf

import dask_cudf as dc


df = dc.read_csv("large_data.csv")