使用SQLAlchemy ORM进行批量插入

有没有办法让SQLAlchemy执行批量插入,而不是插入每个单独的对象?即,

正在执行:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

而非:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

我刚刚将一些代码转换为使用SQLAlchemy而不是原始SQL,虽然现在使用起来要好得多,但现在似乎变慢了(高达10倍),我想知道这是否是原因。

也许我可以更有效地使用会话来改善这种情况。目前,我已经__了ABC__0,并在添加了一些内容后,对ABC__1进行了__。虽然这似乎会导致数据变得陈旧,如果数据库在其他地方发生变化,就像即使我做了一个新的查询,我仍然会得到旧的结果。

谢谢你的帮助!

234196 次浏览

据我所知,没有办法让ORM发出批量插入。我认为根本原因是SQLAlchemy需要跟踪每个对象的标识(即新的主键),而批量插入会干扰这一点。例如,假定您的foo表包含id列,并映射到Foo类:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

由于SQLAlchemy在没有发出另一个查询的情况下获得了x.id的值,因此我们可以推断它直接从INSERT语句中获得了该值。如果您不需要通过相同的实例对创建的对象进行后续访问,则可以跳过插入的ORM层:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy无法将这些新行与任何现有对象进行匹配,因此您必须重新查询它们以进行任何后续操作。

就陈旧数据而言,记住会话没有内置的方法来了解数据库何时在会话之外发生更改是很有帮助的。为了通过现有实例访问外部修改的数据,这些实例必须标记为过期了。默认情况下,这发生在session.commit()上,但也可以通过调用session.expire_all()session.expire(instance)来手动完成。示例(SQL省略):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()在ABC__1__到期,因此第一个打印语句隐式打开一个新事务并重新查询x的属性。如果注释掉第一个print语句,您将注意到第二个语句现在获得了正确的值,因为新查询直到更新后才发出。

从事务隔离的角度来看,这是有意义的-您应该只获取事务之间的外部修改。如果这给您带来了麻烦,我建议您澄清或重新考虑您的应用程序的事务边界,而不是立即使用session.expire_all()

从版本0.8开始,SQLAlchemy中添加了直接支持

根据文件connection.execute(table.insert().values(data))应该可以。(注意,这是,与connection.execute(table.insert(), data)相同,它通过调用executemany来导致许多单独的行插入)。在本地连接以外的任何情况下,性能差异都可能是巨大的。

这是一种方法:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

这将像这样插入:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

参考:SQLAlchemy常见问题包括各种提交方法的基准。

SQLAlchemy介绍了在版本1.0.0

批量操作-SQLAlchemy文档

通过这些操作,您现在可以执行批量插入或更新。

例如,您可以:

s = Session()
objects = [
User(name="u1"),
User(name="u2"),
User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

在这里,将进行批量插入。

SQLAlchemy文档对可用于批量插入的各种技术的性能进行了写下来

ORM基本上不适用于高性能批量插入- 这就是SQLAlchemy除了提供 ORM作为一级组件。

对于快速批量插入的用例,SQL生成和 ORM在其上构建的执行系统是核心的一部分。 直接使用这个系统,我们可以生产一个插件, 与直接使用原始数据库API的竞争。

或者,SQLAlchemy ORM提供批量操作套件 方法,这些方法提供了到工作单元的子部分的挂钩。 处理,以便发出核心级插入和更新构造。 基于ORM的自动化程度较低。

下面

的示例说明了几种不同的基于时间的测试 插入行的方法,从自动化程度最高到最低。 在CPython 2.7中,运行时观察到:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

脚本:

import time
import sqlite3


from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker


Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None




class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))




def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
global engine
engine = create_engine(dbname, echo=False)
DBSession.remove()
DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)




def test_sqlalchemy_orm(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in xrange(n):
customer = Customer()
customer.name = 'NAME ' + str(i)
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print(
"SQLAlchemy ORM: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")




def test_sqlalchemy_orm_pk_given(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in xrange(n):
customer = Customer(id=i+1, name="NAME " + str(i))
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print(
"SQLAlchemy ORM pk given: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")




def test_sqlalchemy_orm_bulk_insert(n=100000):
init_sqlalchemy()
t0 = time.time()
n1 = n
while n1 > 0:
n1 = n1 - 10000
DBSession.bulk_insert_mappings(
Customer,
[
dict(name="NAME " + str(i))
for i in xrange(min(10000, n1))
]
)
DBSession.commit()
print(
"SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")




def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in xrange(n)]
)
print(
"SQLAlchemy Core: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")




def init_sqlite3(dbname):
conn = sqlite3.connect(dbname)
c = conn.cursor()
c.execute("DROP TABLE IF EXISTS customer")
c.execute(
"CREATE TABLE customer (id INTEGER NOT NULL, "
"name VARCHAR(255), PRIMARY KEY(id))")
conn.commit()
return conn




def test_sqlite3(n=100000, dbname='sqlite3.db'):
conn = init_sqlite3(dbname)
c = conn.cursor()
t0 = time.time()
for i in xrange(n):
row = ('NAME ' + str(i),)
c.execute("INSERT INTO customer (name) VALUES (?)", row)
conn.commit()
print(
"sqlite3: Total time for " + str(n) +
" records " + str(time.time() - t0) + " sec")


if __name__ == '__main__':
test_sqlalchemy_orm(100000)
test_sqlalchemy_orm_pk_given(100000)
test_sqlalchemy_orm_bulk_insert(100000)
test_sqlalchemy_core(100000)
test_sqlite3(100000)

SQLAlchemy介绍了在版本1.0.0

批量操作-SQLAlchemy文档

通过这些操作,您现在可以执行批量插入或更新。

例如(如果您希望简单表插入的开销最低),您可以使用Session.bulk_insert_mappings()

loadme = [(1, 'a'),
(2, 'b'),
(3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]


s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

或者,如果您愿意,可以跳过loadme元组,并将字典直接写入dicts(但我发现将所有冗长的数据排除在外并在循环中加载字典列表更容易)。

Piere的回答是正确的,但是有一个问题是,bulk_save_objects在默认情况下不返回对象的主键,如果您担心这个问题的话。将return_defaults设置为True以获得此行为。

文档在这里

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
assert foo.id is not None
session.commit()

我通常使用add_all

from app import session
from models import User


objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()

到目前为止,我在SQLAlchemy文档中找到了最好的答案:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow.

有一个可能解决方案的基准测试的完整示例。

如文档中所示:

批量_保存_对象不是最好的解决方案,但它的性能是正确的。

就可读性而言,我认为第二好的实现是SQLAlchemy核心:

def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in xrange(n)]
)

文档文章中给出了此函数的上下文。

条条大路通罗马,但其中一些需要翻山越岭,需要渡船,但如果你想快速到达那里,只需走高速公路。


在这种情况下,高速公路将使用PSYCOPG2执行_批处理()功能。文档说它是最好的:

executemany()的当前实施(使用极其宽容的轻描淡写的说法)表现不佳。这些函数可用于加速对一组参数重复执行语句。通过减少服务器往返次数,性能可以比使用executemany()提高几个数量级。

在我自己的测试中,execute_batch()将ABC__2__为executemany(),并提供了配置页面_大小的选项,以便进一步调整(如果您希望从驱动程序中挤出最后2-3%的性能)。

如果您正在使用SQLAlchemy,则可以轻松启用相同的功能,方法是在使用create_engine()实例化引擎时,将use_batch_mode=True设置为参数

SQLAlchemy支持批量插入

bulk_list = [
Foo(
bar=1,
),
Foo(
bar=2,
),
Foo(
bar=3,
),
]
db.session.bulk_save_objects(bulk_list)
db.session.commit()