Memory-efficient built-in SqlAlchemy iterator/generator?

I have a ~10M record MySQL table that I interface with using SqlAlchemy. I have found that queries on large subsets of this table consume too much memory, even though I thought I was using a built-in generator that intelligently fetches bite-sized chunks of the dataset:

for thing in session.query(Things):
    analyze(thing)

To avoid this, I find I have to build my own iterator that bites off in chunks:

lastThingID = None
while True:
    # query is the base session.query(Things); querySize is the chunk size
    q = query.order_by(Things.id.desc())
    if lastThingID is not None:
        q = q.filter(Things.id < lastThingID)
    things = q.limit(querySize).all()
    if not things:
        break
    for thing in things:
        lastThingID = thing.id
        analyze(thing)

Is this normal, or is there something I'm missing regarding SA's built-in generators?

The answer to this question seems to indicate that this memory consumption is not to be expected.

AFAIK, the first variant still gets all the tuples from the table (with one SQL query) but builds the ORM representation for each entity when iterating. So it is more efficient than building a list of all entities before iterating, but you still have to fetch all the (raw) data into memory.

Thus, using LIMIT on huge tables sounds like a good idea to me.

Most DBAPI implementations fully buffer rows as they are fetched - so usually, before the SQLAlchemy ORM even gets a hold of one result, the whole result set is in memory.

But then, the way Query works is that it fully loads the given result set by default before returning your objects to you. The rationale here regards queries that are more than simple SELECT statements. For example, with joins to other tables that may return the same object identity multiple times in one result set (common with eager loading), the full set of rows needs to be in memory so that the correct results can be returned; otherwise collections and such might be only partially populated.

So Query offers an option to change this behavior through yield_per(). This call will cause the Query to yield rows in batches, where you give it the batch size. As the docs state, this is only appropriate if you aren't doing any kind of eager loading of collections, so it's basically for when you really know what you're doing. Also, if the underlying DBAPI pre-buffers rows, there will still be that memory overhead, so the approach only scales slightly better than not using it.
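
For reference, a minimal sketch of what that looks like (assuming a Thing mapped class and an existing session, and no eager-loaded collections on Thing):

# Build objects 1000 rows at a time instead of loading the whole result up front.
for thing in session.query(Thing).yield_per(1000):
    analyze(thing)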

I hardly ever use yield_per(); instead, I use a better version of the LIMIT approach you suggest above using window functions. LIMIT and OFFSET have a huge problem that very large OFFSET values cause the query to get slower and slower, as an OFFSET of N causes it to page through N rows - it's like doing the same query fifty times instead of one, each time reading a larger and larger number of rows. With a window-function approach, I pre-fetch a set of "window" values that refer to chunks of the table I want to select. I then emit individual SELECT statements that each pull from one of those windows at a time.

The window function approach is on the wiki and I use it with great success.
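
Here is a rough sketch of that idea (not the wiki recipe itself; the Thing model, its integer id primary key, the session, and the window size are assumptions on my part):

import sqlalchemy as sa

def windowed_query(session, window_size=1000):
    """Yield Thing objects chunk by chunk, using a window function to
    pre-compute the id values that start each chunk."""
    # Pass 1: number all rows by id and keep every window_size-th id as a
    # window boundary (row_number() can't go in WHERE directly, hence the subquery).
    rownum = sa.func.row_number().over(order_by=Thing.id).label("rownum")
    subq = session.query(Thing.id, rownum).subquery()
    boundaries = [
        row.id
        for row in session.query(subq.c.id).filter(subq.c.rownum % window_size == 1)
    ]
    boundaries.append(None)  # sentinel: the last window is open-ended

    # Pass 2: emit one plain SELECT per window, bounded by an id range.
    for lower, upper in zip(boundaries[:-1], boundaries[1:]):
        q = session.query(Thing).filter(Thing.id >= lower)
        if upper is not None:
            q = q.filter(Thing.id < upper)
        for thing in q:
            yield thing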

Also note: not all databases support window functions; you need Postgresql, Oracle, or SQL Server. IMHO using at least Postgresql is definitely worth it - if you're using a relational database, you might as well use the best.

I've been looking into efficient traversal/paging with SQLAlchemy and would like to update this answer.

I think you can use the slice call to properly limit the scope of a query, and you can reuse it efficiently.

Example:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start, stop = window_size * window_idx, window_size * (window_idx + 1)
    things = query.slice(start, stop).all()
    if not things:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1

In the spirit of Joel's answer, I use the following:

WINDOW_SIZE = 1000

def qgen(query):
    start = 0
    while True:
        stop = start + WINDOW_SIZE
        things = query.slice(start, stop).all()
        if len(things) == 0:
            break
        for thing in things:
            yield thing
        start += WINDOW_SIZE
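
For example, with the session and model from the question (note that, as with any LIMIT/OFFSET-style paging, the query should have a deterministic order_by for the slices to be stable):

for thing in qgen(session.query(Things).order_by(Things.id)):
    analyze(thing)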

Using LIMIT/OFFSET is bad because the database has to scan past all {OFFSET} rows first, so the larger the OFFSET, the slower the query gets. Using a windowed query also gave me poor results on a large table with a large amount of data (you wait too long for the first results, which was not good in my case for a chunked web response).

The best approach is given here: https://stackoverflow.com/a/27169302/450103. In my case I solved the problem simply by using an index on a datetime field and fetching the next query with datetime >= previous_datetime. Silly, because I had used that index in other cases before, but I thought a windowed query would be better for fetching all the data. In my case I was wrong.
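
A minimal sketch of that keyset-style approach, assuming a Thing model with an indexed created_at datetime column whose values are unique (otherwise add the primary key as a tie-breaker); the names and batch size here are mine, not from the linked answer:

batch_size = 10000
last_seen = None
while True:
    q = session.query(Thing)
    if last_seen is not None:
        # keyset condition: resume strictly after the last row already processed
        q = q.filter(Thing.created_at > last_seen)
    batch = q.order_by(Thing.created_at).limit(batch_size).all()
    if not batch:
        break
    for thing in batch:
        analyze(thing)
    last_seen = batch[-1].created_at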

I am not a database expert, but when using SQLAlchemy as a simple Python abstraction layer (i.e., not using the ORM Query object) I've come up with a satisfying solution to query a 300M-row table without exploding memory usage...

Here is a dummy example:

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)

Then, I use the SQLAlchemy fetchmany() method to iterate over the results in an infinite while loop:

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        pass  # Do your stuff here...

proxy.close()

This method allowed me to do all kind of data aggregation without any dangerous memory overhead.

NOTE: the stream_results=True execution option works with Postgres and the psycopg2 adapter, but I guess it won't work with every DBAPI, nor with every database driver...

There is an interesting use case in this blog post that inspired my method above.

If you're working with Postgres or an RDBMS that supports cursors, it is very simple to iterate efficiently over a lot of rows:

with db_session() as session:
    for partition in session.stream(select(Thing.id)).partitions(1000):
        for item in partition:
            analyze(item)

This creates a forward cursor that fetches the result in batches of 1000 rows, which results in minimal memory usage on the server and on the client.
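
Note that session.stream() is the asyncio (AsyncSession) API. With a plain synchronous Session on recent SQLAlchemy (late 1.4 or 2.0), a roughly equivalent sketch uses the yield_per execution option, which also requests a streaming/server-side cursor on drivers that support it; Thing, db_session() and the batch size are placeholders:

from sqlalchemy import select

with db_session() as session:  # db_session() assumed to return a regular (sync) Session
    result = session.execute(
        select(Thing.id),
        execution_options={"yield_per": 1000},  # fetch and buffer 1000 rows at a time
    )
    for partition in result.partitions():  # each partition holds up to 1000 rows
        for item in partition:
            analyze(item)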