如何从一个 sql 查询创建一个大熊猫数据框而不用耗尽内存?

从 MS SQL Server 数据库中查询超过500万条记录的表存在问题。我想选择所有的记录,但我的代码似乎失败时,选择到内存中的数据太多。

这种方法是有效的:

import pandas.io.sql as psql
sql = "SELECT TOP 1000000 * FROM MyTable"
data = psql.read_frame(sql, cnxn)

但是这个不管用:

sql = "SELECT TOP 2000000 * FROM MyTable"
data = psql.read_frame(sql, cnxn)

它返回这个错误:

File "inference.pyx", line 931, in pandas.lib.to_object_array_tuples
(pandas\lib.c:42733) Memory Error

我读过 给你,在从 csv 文件创建 dataframe时也存在类似的问题,解决方法是使用‘ iterator’和‘ chunksize’参数,如下所示:

read_csv('exp4326.csv', iterator=True, chunksize=1000)

对于从 SQL 数据库进行查询,是否有类似的解决方案?如果没有,首选的解决办法是什么?我是否应该使用其他方法来读取成块的记录?我读过一些关于在熊猫中使用大型数据集的 给你讨论,但是执行 SELECT * 查询似乎需要做很多工作。肯定有更简单的方法。

112450 次浏览

更新: 请务必查看下面的答案,因为熊猫现在已经内置了对分块装载的支持。

您可以简单地尝试以块的方式读取输入表,然后根据各个部分组装完整的数据框,如下所示:

import pandas as pd
import pandas.io.sql as psql
chunk_size = 10000
offset = 0
dfs = []
while True:
sql = "SELECT * FROM MyTable limit %d offset %d order by ID" % (chunk_size,offset)
dfs.append(psql.read_frame(sql, cnxn))
offset += chunk_size
if len(dfs[-1]) < chunk_size:
break
full_df = pd.concat(dfs)

也有可能是整个数据框太大,无法放入内存中,在这种情况下,除了限制所选行或列的数量之外,您别无选择。

正如在评论中提到的,从熊猫0.15开始,您在 read_sql中有一个块大小选项,可以逐块读取和处理查询块:

sql = "SELECT * FROM My_Table"
for chunk in pd.read_sql_query(sql , engine, chunksize=5):
print(chunk)

参考资料: http://pandas.pydata.org/pandas-docs/version/0.15.2/io.html#querying

代码解决方案和备注。

# Create empty list
dfl = []


# Create empty dataframe
dfs = pd.DataFrame()


# Start Chunking
for chunk in pd.read_sql(query, con=conct, ,chunksize=10000000):


# Start Appending Data Chunks from SQL Result set into List
dfl.append(chunk)


# Start appending data from list to dataframe
dfs = pd.concat(dfl, ignore_index=True)

然而,我的内存分析告诉我,即使内存被释放后,每个块提取,列表越来越大,占用的内存导致净没有增益的空闲内存。

很想听听作者/其他人怎么说。

如果要限制输出中的行数,只需使用:

data = psql.read_frame(sql, cnxn,chunksize=1000000).__next__()

我发现的处理这个问题的最佳方法是利用 SQLAlchemy  汽 _ 結果連接選項

conn = engine.connect().execution_options(stream_results=True)

并将 con 对象传递给

pd.read_sql("SELECT *...", conn, chunksize=10000)

这将确保在服务器端而不是客户端处理游标

你可以使用 服务器端游标(又名流结果)

import pandas as pd
from sqlalchemy import create_engine


def process_sql_using_pandas():
engine = create_engine(
"postgresql://postgres:pass@localhost/example"
)
conn = engine.connect().execution_options(
stream_results=True)


for chunk_dataframe in pd.read_sql(
"SELECT * FROM users", conn, chunksize=1000):
print(f"Got dataframe w/{len(chunk_dataframe)} rows")
# ... do something with dataframe ...


if __name__ == '__main__':
process_sql_using_pandas()

正如其他人在评论中提到的,在 pd.read_sql("SELECT * FROM users", engine, chunksize=1000)中使用 chunksize参数并不能解决问题,因为它仍然加载内存中的整个数据,然后一块一块地给您。

更多解释 给你

Chunksize 仍然加载内存中的所有数据,stream _ results = True 是答案。它是服务器端游标,加载给定块中的行并节省内存。.在许多管道中有效地使用,在加载历史数据时也可能有所帮助

stream_conn = engine.connect().execution_options(stream_results=True)

使用 pd.read _ sql 和 chechunksize

pd.read_sql("SELECT * FROM SOURCE", stream_conn , chunksize=5000)

你可以更新版本气流。 例如,我在2.2.3版本中使用 docker-compose 时出现了这个错误。

  • 执行器

Mysq 6.7

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

返回文章页面

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "250M"

Airflow-网上服务器:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

气流调节器:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

气流工人:

#cpus: "0.5"
#mem_reservation: "10M"
#mem_limit: "750M"

Error: 返回代码 Negsignal.SIGKILL 退出任务

但更新版本 来自 apache/airflow: 2.3.4。

使用码头中配置的相同资源完成拉动,没有任何问题

在此输入图像描述

我的数据提取器:

功能

Def getDataForSchema (表、连接、 tmp _ path、 * * kwargs) :

conn=connect_sql_server(conecction)


query_count= f"select count(1) from {table['schema']}.{table['table_name']}"
logging.info(f"query: {query_count}")
real_count_rows = pd.read_sql_query(query_count, conn)


##sacar  esquema de la tabla
metadataquery=f"SELECT COLUMN_NAME ,DATA_TYPE  FROM information_schema.columns \
where table_name = '{table['table_name']}' and table_schema= '{table['schema']}'"
#logging.info(f"query metadata: {metadataquery}")
metadata = pd.read_sql_query(metadataquery, conn)
schema=generate_schema(metadata)


#logging.info(f"schema : {schema}")
#logging.info(f"schema: {schema}")


#consulta la tabla a extraer
query=f" SELECT  {table['custom_column_names']} FROM {table['schema']}.{table['table_name']} "
logging.info(f"quere data :{query}")
chunksize=table["partition_field"]
data = pd.read_sql_query(query, conn, chunksize=chunksize)


count_rows=0
pqwriter=None
iteraccion=0
for df_row in data:
print(f"bloque  {iteraccion} de  total {count_rows} de un total {real_count_rows.iat[0, 0]}")
#logging.info(df_row.to_markdown())
if iteraccion == 0:
parquetName=f"{tmp_path}/{table['table_name']}_{iteraccion}.parquet"
pqwriter = pq.ParquetWriter(parquetName,schema)
tableData = pa.Table.from_pandas(df_row, schema=schema,safe=False, preserve_index=True)
#logging.info(f" tabledata {tableData.column(17)}")
pqwriter.write_table(tableData)
#logging.info(f"parquet name:::{parquetName}")
##pasar a parquet df directo
#df_row.to_parquet(parquetName)
iteraccion=iteraccion+1
count_rows += len(df_row)
del df_row
del tableData
if pqwriter:
print("Cerrando archivo parquet")
pqwriter.close()
del data
del chunksize
del iteraccion