How to close a SQLAlchemy session?

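Following our comments in "How to close sqlalchemy connection in MySQL", I am checking the connections that SQLAlchemy creates to my database, and I cannot manage to close them without exiting Python.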

If I run this code in a Python console, it keeps the session open until I exit Python:

from sqlalchemy.orm import sessionmaker
from models import OneTable, get_engine


engine = get_engine(database="mydb")
session = sessionmaker(bind=engine)()


results = session.query(OneTable.company_name).all()


# some work with the data #


session.close()

The only workaround I have found to close it is to call engine.dispose() at the end.

Following my comments in the link above, my questions are:

  • Why is engine.dispose() necessary to close sessions?
  • Isn't session.close() enough?

There's a central confusion here over the word "session". I can't be certain, but it appears you may be confusing the SQLAlchemy Session with a MySQL @@session, which refers to the scope between when you first make a connection to MySQL and when you disconnect.

These two concepts are not the same. A SQLAlchemy Session generally represents the scope of one or more transactions, upon a particular database connection.

Therefore, the answer to your question as literally asked, "how to properly close a SQLAlchemy session", is to call session.close().

However, the rest of your question indicates that when a particular Session is closed, you'd like the actual DBAPI connection to be closed as well.

What this basically means is that you wish to disable connection pooling. As other answers mention, that is easy enough: use NullPool.
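As a minimal sketch of what disabling pooling could look like: the snippet below assumes an in-memory SQLite URL as a stand-in for the real MySQL URL, and a direct create_engine() call in place of the asker's get_engine() helper (whose internals are not shown in the question). The poolclass=NullPool argument is the only part that matters.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

# Assumption: an in-memory SQLite URL stands in for the real MySQL URL.
engine = create_engine("sqlite://", poolclass=NullPool)
Session = sessionmaker(bind=engine)

session = Session()
try:
    value = session.execute(text("SELECT 1")).scalar()
finally:
    # With NullPool there is no pool to return the connection to, so the
    # DBAPI connection is closed here instead of being kept for reuse.
    session.close()
```

The trade-off is that every new Session then pays the cost of opening a fresh database connection, which is what pooling normally avoids.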

session.close() will give the connection back to the connection pool of Engine and doesn't close the connection.

engine.dispose() will close all connections of the connection pool.

The Engine will not use a connection pool if you set poolclass=NullPool. In that case the underlying database connection is closed directly after session.close().
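The difference between the three statements above can be observed with pool event listeners. This is only a sketch under stated assumptions: it uses a temporary SQLite file rather than MySQL, the helper name count_open_after_close is made up for illustration, and the pool class is passed explicitly because the default pool for SQLite differs between SQLAlchemy versions.

```python
import os
import tempfile

from sqlalchemy import create_engine, event, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool, QueuePool


def count_open_after_close(poolclass):
    """Return open DBAPI connections after session.close() and after engine.dispose()."""
    # Assumption: a throwaway SQLite file stands in for the real database.
    db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
    engine = create_engine(f"sqlite:///{db_path}", poolclass=poolclass)

    open_connections = {"n": 0}

    def on_connect(dbapi_connection, connection_record):
        # Fired when the pool opens a new DBAPI connection.
        open_connections["n"] += 1

    def on_close(dbapi_connection, connection_record):
        # Fired when the pool really closes a DBAPI connection.
        open_connections["n"] -= 1

    event.listen(engine, "connect", on_connect)
    event.listen(engine, "close", on_close)

    session = sessionmaker(bind=engine)()
    session.execute(text("SELECT 1"))
    session.close()
    after_close = open_connections["n"]

    engine.dispose()
    after_dispose = open_connections["n"]
    return after_close, after_dispose


pooled = count_open_after_close(QueuePool)
unpooled = count_open_after_close(NullPool)
```

With QueuePool, one connection should still be open after session.close() and none after engine.dispose(); with NullPool, none should remain open after session.close().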

In LogicBank, I had a series of unittest tests. Each test copied a sqlite database prior to running, like this:

copyfile(src=nw_source, dst=nw_loc)

Each test ran individually, but failed in discover mode. It became apparent that somehow the database copy was not happening.

At first it seemed the unittest tests might not be running serially. Not so: unittest tests do, in fact, run serially. So that was not the problem (noting it here to perhaps save somebody some time).

After a remarkable amount of thrashing, it appears the cause was that the database was not completely closed after the prior test. Somehow that interfered with the copy above. Mine is not to wonder why...

Thanks to the posts above, I resolved it like this:

from datetime import datetime

import sqlalchemy
import sqlalchemy.orm


def tearDown(file: str, started_at: str, engine: sqlalchemy.engine.base.Engine, session: sqlalchemy.orm.session.Session):
    """
    close session & engine, banner

    :param file: caller, usually __file__
    :param started_at: eg, str(datetime.now())
    :param engine: eg, from nw.logic import session, engine
    :param session: eg, from nw.logic import session, engine
    :return:
    """
    # Return the session's connection to the pool first...
    session.close()
    # ...then close every pooled connection.
    engine.dispose()  # NOTE: close required before dispose!

    print("\n")
    print("**********************")
    print("** Test complete, SQLAlchemy session/engine closed for: " + file)
    print("** Started: " + started_at + " Ended: " + str(datetime.now()))
    print("**********************")
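For readers who want the same pattern inside a test class, here is a rough sketch, not the LogicBank code itself. The class name CopyDatabaseTest, the table t and the file names are hypothetical; each test gets a fresh copy of a small SQLite file, and tearDown releases the database before the next copy.

```python
import os
import shutil
import tempfile
import unittest

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


class CopyDatabaseTest(unittest.TestCase):
    def setUp(self):
        self.workdir = tempfile.mkdtemp()
        source = os.path.join(self.workdir, "source.db")
        # Build a tiny source database, then release it before copying.
        seed_engine = create_engine(f"sqlite:///{source}")
        with seed_engine.begin() as conn:
            conn.execute(text("CREATE TABLE t (x INTEGER)"))
            conn.execute(text("INSERT INTO t VALUES (1)"))
        seed_engine.dispose()

        # Each test works on its own copy of the source file.
        self.db_path = os.path.join(self.workdir, "copy.db")
        shutil.copyfile(source, self.db_path)
        self.engine = create_engine(f"sqlite:///{self.db_path}")
        self.session = sessionmaker(bind=self.engine)()

    def tearDown(self):
        # Close the session first, then dispose of the engine's pool,
        # so no connection still holds the file when it is removed.
        self.session.close()
        self.engine.dispose()
        shutil.rmtree(self.workdir)

    def test_reads_copied_row(self):
        value = self.session.execute(text("SELECT x FROM t")).scalar()
        self.assertEqual(value, 1)
```

Putting the cleanup in tearDown means it runs after each test whether the test passes or fails, so the next test's copy does not collide with a connection left open by the previous one.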