使用 pyODBC 的 fast_Executemany 加速 Pandas.DataFrame.to_sql

我想发送一个大的 pandas.DataFrame到一个远程服务器运行的 MS SQL。现在的方法是将一个 data_frame对象转换为一个元组列表,然后使用 pyODBC 的 executemany()函数将其发送出去。大概是这样的:

 import pyodbc as pdb


list_of_tuples = convert_df(data_frame)


connection = pdb.connect(cnxn_str)


cursor = connection.cursor()
cursor.fast_executemany = True
cursor.executemany(sql_statement, list_of_tuples)
connection.commit()


cursor.close()
connection.close()

然后我开始怀疑是否可以通过使用 data_frame.to_sql()方法来加速(或者至少更具可读性)。我想出了以下的解决办法:

 import sqlalchemy as sa


engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
data_frame.to_sql(table_name, engine, index=False)

现在代码更易读,但上传是 至少慢150倍..。

在使用 SQLAlchemy 时有没有翻转 fast_executemany的方法?

我正在使用熊猫 -0.20.3、 pyODBC-4.0.21和 sql 炼金术 -1.1.13。

105393 次浏览

在联系了 SQLAlchemy 的开发人员之后,出现了一种解决这个问题的方法。非常感谢他们的伟大工作!

必须使用游标执行事件并检查是否引发了 executemany标志。如果确实如此,打开 fast_executemany选项。例如:

from sqlalchemy import event


@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True

可以找到关于执行事件的更多信息 给你


更新: SQLAlchemy 1.3.0中增加了对 pyodbcfast_executemany的支持,所以这种黑客攻击不再需要了。

编辑(2019-03-08) : 戈德 · 汤普森(Gord Thompson)在下面的评论中带来了来自 sql 炼金术更新日志的好消息: 自从2019-03-04发布 SQLAlchemy 1.3.0以来,sqlchemy 现在支持 ABC0用于 ABC1方言。也就是说,不再需要定义函数并使用 @event.listens_for(engine, 'before_cursor_execute')意味着下面的函数可以删除,只有标志需要在 create _ engine 语句中设置——并且仍然保持加速。

原文:

刚开了个账户发布这个。我想在上面的帖子下面评论一下,因为这是对已经提供的答案的跟进。上面的解决方案对我来说是可行的,它是基于基于 Ubuntu 安装的 Microsoft SQL 存储器上的17版 SQL 驱动程序。

下面是我用来显著提高速度的完整代码(大于100倍的提速)。这是一个交钥匙代码片段,你可以用你的相关细节改变连接字符串。对于上面的海报,非常感谢你的解决方案,因为我已经找了相当长的时间这一点。

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus




conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)




@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
print("FUNC call")
if executemany:
cursor.fast_executemany = True




table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))




s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)

基于下面的评论,我想花一些时间来解释一些关于熊猫 to_sql实现和查询处理方式的限制。有两个因素可能导致 MemoryError发生异常:

1)假设您正在写入远程 SQL 存储。当您尝试用 to_sql方法编写一个大熊猫数据框架时,它会将整个数据框架转换为一个值列表。这种转换比原始 DataFrame 占用更多的 RAM (除此之外,旧的 DataFrame 仍然存在于 RAM 中)。此列表提供给 ODBC 连接器的最终 executemany调用。我认为 ODBC 连接器在处理如此大的查询时存在一些问题。解决这个问题的一种方法是为 to_sql方法提供一个块大小的参数(10 * * 5似乎是最优的,大约为600 mbit/s (!)2 CPU 7 GB 内存的 MSSQL 存储应用程序的写速度-不推荐 Azure)。因此,第一个限制,即查询大小,可以通过提供 chunksize参数来规避。但是,这不允许您编写大小为10 * * 7或更大的数据框架(至少不能在我使用的有大约55GB RAM 的虚拟机上编写) ,因为它的问题是 nr 2。

这可以通过使用 np.split(大小为10 * * 6的 DataFrame 块)分解 DataFrame 来避免。我会尝试做一个拉请求,当我有一个解决方案准备的 to_sql方法在熊猫本身的核心,所以你不必这样做,预分手每次。不管怎样,我最终编写了一个类似于(不是交钥匙)下面的函数:

import pandas as pd
import numpy as np


def write_df_to_sql(df, **kwargs):
chunks = np.split(df, df.shape()[0] / 10**6)
for chunk in chunks:
chunk.to_sql(**kwargs)
return True

可以在这里查看上述代码片段的更完整的示例: https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

这是我编写的一个类,它合并了补丁程序,减少了设置 SQL 连接所带来的一些必要开销。还要写一些文档。此外,我还计划把这个补丁贡献给熊猫本身,但还没有找到一个很好的方法来做到这一点。

希望这个能帮上忙。

我只是想把这个完整的示例作为一个附加的高性能选项发布给那些可以使用新的 turbodbc 库的人: http://turbodbc.readthedocs.io/en/latest/

很明显,大熊猫之间有很多不同的选择。To _ sql () ,通过 sql 炼金术触发 fast _ Executemany,直接使用 pyodbc 和 tuple/list/等,甚至使用平面文件尝试 BULK UPLOAD。

希望下面的内容能够使您的生活更加愉快,因为目前的熊猫项目中的功能正在发展,或者将来会包含一些诸如 turbodbc 集成之类的内容。

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO


test_data = '''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8'''


df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])


options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)


test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]


CREATE TABLE [db_name].[schema].[test]
(
id int NULL,
transaction_dt datetime NULL,
units int NULL,
measures float NULL
)


INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
VALUES (?,?,?,?) '''


cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]

Turbodbc 在许多用例中应该非常快(特别是对于 numpy 数组)。请注意,将底层 numpy 数组作为参数从数据框列直接传递给查询是多么简单。我还相信这有助于防止创建过度刺激内存消耗的中间对象。希望这对你有帮助!

看来熊猫0.23.0和0.24.0 使用多值插入与 PyODBC,这阻止快速执行帮助-一个单一的 INSERT ... VALUES ...语句是每块发出。多值插入块比旧的缓慢执行默认方法有所改进,但至少在简单测试中,快速执行默认方法仍然占上风,更不用说不需要手动计算 chunksize,因为多值插入需要这样做。如果将来没有提供配置选项,可以通过 monkeypatching 来强制执行旧的行为:

import pandas.io.sql


def insert_statement(self, data, conn):
return self.table.insert(), data


pandas.io.sql.SQLTable.insert_statement = insert_statement

未来就在这里,至少在 master分支中,可以使用 to_sql()的关键字参数 method=来控制 insert 方法。它默认为 None,它强制使用 Executemany 方法。传递 method='multi'将导致使用多值插入。它甚至可以用于实现特定于 DBMS 的方法,例如 PostgreqlCOPY

正如@Pylander 所指出的

到目前为止,Turbodbc 是数据摄取的最佳选择!

我太兴奋了,在我的 github 和 media 上写了一篇博客: 请检查“ href = “ https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e”rel = “ norefrer”> https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

获取一个工作示例,并与 Pandas.to _ sql 进行比较

长话短说,

Turbodbc 我在3秒内写了10000行(77列)

使用 Pandas.to _ sql 我用198秒写了同样的10000行(77列) ..。

这就是我所做的全部细节

进口:

import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time

加载并处理一些数据-用 sample.pkl 代替您的:

df = pd.read_pickle('sample.pkl')


df.columns = df.columns.str.strip()  # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan)  # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0)  # remove rows containing only NAs
df = df.dropna(how='all', axis=1)  # remove columns containing only NAs
df = df.replace(np.nan, 'NA')  # turbodbc hates null values...

使用 sqlAlchemy 创建表

不幸的是,turbodbc 需要大量的开销和大量的 sql 人工操作,以创建表并在其上插入数据。

幸运的是,Python 是纯粹的快乐,我们可以自动化编写 sql 代码的过程。

第一步是创建接收数据的表。但是,如果表中有多个列,则手动创建表并编写 sql 代码可能会出现问题。在我的例子中,表通常有240列!

这就是 sqlAlchemy 和 Pandas 仍然可以帮助我们的地方: Pandas 不适合编写大量行(本例中为10000行) ,但是如果只编写表头的6行呢?这样,我们就可以自动化创建表的过程。

创建 sqlAlchemy 连接:

mydb = 'someDB'


def make_con(db):
"""Connect to a specified db."""
database_connection = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
myuser, mypassword,
myhost, db
)
)
return database_connection


pd_connection = make_con(mydb)

在 SQLServer 上创建表

使用熊猫 + sqlAlchemy,但只是为了为之前提到的 turbodbc 准备空间。请注意这里的 df.head () : 我们使用 Pandas + sqlAlchemy 仅插入6行数据。这将运行得非常快,并且正在自动创建表。

table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)

既然桌子已经摆好了,我们就认真点吧。

Turbodbc 连接:

def turbo_conn(mydb):
"""Connect to a specified db - turbo."""
database_connection = turbodbc.connect(
driver='ODBC Driver 17 for SQL Server',
server=myhost,
database=mydb,
uid=myuser,
pwd=mypassword
)
return database_connection

为 turbodbc 准备 sql 命令和数据:

def turbo_write(mydb, df, table):
"""Use turbodbc to insert data into sql."""
start = time.time()
# preparing columns
colunas = '('
colunas += ', '.join(df.columns)
colunas += ')'


# preparing value place holders
val_place_holder = ['?' for col in df.columns]
sql_val = '('
sql_val += ', '.join(val_place_holder)
sql_val += ')'


# writing sql query for turbodbc
sql = f"""
INSERT INTO {mydb}.dbo.{table} {colunas}
VALUES {sql_val}
"""


# writing array of values for turbodbc
valores_df = [df[col].values for col in df.columns]


# cleans the previous head insert
with connection.cursor() as cursor:
cursor.execute(f"delete from {mydb}.dbo.{table}")
connection.commit()


# inserts data, for real
with connection.cursor() as cursor:
try:
cursor.executemanycolumns(sql, valores_df)
connection.commit()
except Exception:
connection.rollback()
print('something went wrong')


stop = time.time() - start
return print(f'finished in {stop} seconds')

使用 turbodbc 写数据——我在3秒钟内写了10000行(77列) :

turbo_write(mydb, df.sample(10000), table)

熊猫方法比较-我得到了相同的10000行(77列)在198秒..。

table = 'pd_testing'


def pandas_comparisson(df, table):
"""Load data using pandas."""
start = time.time()
df.to_sql(table, con=pd_connection, index=False)
stop = time.time() - start
return print(f'finished in {stop} seconds')


pandas_comparisson(df.sample(10000), table)

环境及条件

Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges

请检查 https://erickfis.github.io/loose-code/在此代码中的更新!

SQLServerINSERT 性能: pyodbc 与 turbodbc

当使用 to_sql将熊猫数据框架上传到 SQLServer 时,turbodbc 肯定比没有 fast_executemany的 pyodbc 快。然而,当对 pyodbc 启用 fast_executemany时,两种方法的性能基本相同。

测试环境:

[ venv1 _ pyodbc ]
2.0.25

[ venv2 _ turbodbc ]
Turbodbc3.0.0
Sql 炼金术 -turbodbc 0.1.0

[两者都有]
Python 3.6.4在 Windows 上的64位
SQLAlchemy 1.3.0 b1
熊猫0.23.4
1.15.4

测试代码:

# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')


# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
[[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
columns=[f'col{y:03}' for y in range(num_cols)]
)


t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

对每个环境运行12(12)次测试,放弃每个环境的单个最佳和最差时间。结果(以秒为单位) :

   rank  pyodbc  turbodbc
----  ------  --------
1    22.8      27.5
2    23.4      28.1
3    24.6      28.2
4    25.2      28.5
5    25.7      29.3
6    26.9      29.9
7    27.0      31.4
8    30.1      32.1
9    33.6      32.5
10    39.8      32.9
----  ------  --------
average    27.9      30.0

我在使用 PostgreSQL 时遇到了同样的问题。他们现在只是发布 熊猫版本0.24.0和有一个新的参数在 to_sql函数称为 method,这解决了我的问题。

from sqlalchemy import create_engine


engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")

上传速度是我的100倍。 如果要发送大量数据,我还建议设置 chunksize参数。

我只是想补充一下@J. K. 的回答。

如果你正在使用这种方法:

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True

你会得到这个错误:

错误: (pyodbc. Error)(‘ HY010’,’[ HY010] [ Microsoft ][ SQL Server 本机客户端11.0]函数序列错误(0) (SQLParamData)’)[ SQL: ‘ INSERT INTO... (...) VALUES (? ,?)’] 【参数: (... ,...) ,(... ,...)】(关于这个错误的背景: Http://sqlalche.me/e/dbapi

对字符串值进行如下编码: 'yourStringValue'.encode('ascii')

这能解决你的问题。

我只是修改引擎线,帮助我加快插入100倍。

旧法典

import json
import maya
import time
import pandas
import pyodbc
import pandas as pd
from sqlalchemy import create_engine


retry_count = 0
retry_flag = True


hostInfoDf = pandas.read_excel('test.xlsx', sheet_name='test')
print("Read Ok")


engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")


while retry_flag and retry_count < 5:
try:
df.to_sql("table_name",con=engine,if_exists="replace",index=False,chunksize=5000,schema="dbo")
retry_flag = False
except:
retry_count = retry_count + 1
time.sleep(30)

改良引擎线

从..

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

去..

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)

问我任何查询相关的 Python 到 SQL 的连接,我会很乐意帮助你。