为什么迭代大型 Django 查询集会消耗大量内存?

该表包含大约1000万行。

for event in Event.objects.all():
print event

这将导致内存使用量稳步增加到4GB 左右,此时行将快速打印。第一行打印之前的长时间延迟让我感到惊讶——我原以为它几乎会立刻打印出来。

我还尝试了 Event.objects.iterator(),它的表现也是一样的。

我不明白 Django 加载到内存中的是什么,也不明白它为什么要这么做。我期望 Django 在数据库级迭代结果,这意味着结果将以大致恒定的速率打印(而不是在长时间等待后一次打印所有结果)。

我误会了什么?

(我不知道这是否相关,但我正在使用 PostgreSQL。)

60733 次浏览

这是医生说的: Http://docs.djangoproject.com/en/dev/ref/models/querysets/

在执行查询集计算之前,实际上不会发生任何数据库活动。

因此,当运行 print event时,查询将触发(这是根据您的命令进行的全表扫描)并载入结果。你要求所有的对象,没有办法得到第一个对象没有得到所有的。

但是如果你这样做:

Event.objects.all()[300:900]

Http://docs.djangoproject.com/en/dev/topics/db/queries/#limiting-querysets

然后,它将在内部向 sql 添加偏移量和限制。

内特 · C 很接近了,但还不够。

来自 那些文件:

可以通过以下方式计算 QuerySet:

  • 迭代。QuerySet 是可迭代的,并且在您第一次迭代它时执行其数据库查询。例如,这将打印数据库中所有条目的标题:

    for e in Entry.objects.all():
    print e.headline
    

So your ten million rows are retrieved, all at once, when you first enter that loop and get the iterating form of the queryset. The wait you experience is Django loading the database rows and creating objects for each one, before returning something you can actually iterate over. Then you have everything in memory, and the results come spilling out.

From my reading of the docs, iterator() does nothing more than bypass QuerySet's internal caching mechanisms. I think it might make sense for it to a do a one-by-one thing, but that would conversely require ten-million individual hits on your database. Maybe not all that desirable.

Iterating over large datasets efficiently is something we still haven't gotten quite right, but there are some snippets out there you might find useful for your purposes:

对于大量的记录,数据库光标数据库光标的表现甚至更好。在 Django 中确实需要原始 SQL,Django 游标与 SQL 游标不同。

Nate C 建议的 LIMIT-OffSET 方法可能已经足够适合您的情况了。对于大量数据,它比游标慢,因为它必须一遍又一遍地运行相同的查询,并且必须跳过越来越多的结果。

可能不是最快或者最有效的,但是作为一个现成的解决方案,为什么不使用 django core 的 Paginator 和 Page 对象呢:

Https://docs.djangoproject.com/en/dev/topics/pagination/

就像这样:

from django.core.paginator import Paginator
from djangoapp.models import model


paginator = Paginator(model.objects.all(), 1000) # chunks of 1000, you can
# change this to desired chunk size


for page in range(1, paginator.num_pages + 1):
for row in paginator.page(page).object_list:
# here you can do whatever you want with the row
print "done processing page %s" % page

Django 没有很好的从数据库获取大型项目的解决方案。

import gc
# Get the events in reverse order
eids = Event.objects.order_by("-id").values_list("id", flat=True)


for index, eid in enumerate(eids):
event = Event.object.get(id=eid)
# do necessary work with event
if index % 100 == 0:
gc.collect()
print("completed 100 items")

Value _ list 可用于获取数据库中的所有 id,然后分别获取每个对象。一段时间后,大型对象将在内存中创建,并且在 for 循环退出之前不会被垃圾收集。以上代码在每消耗100个项目后进行手动垃圾收集。

在迭代查询集之前,会消耗大量的内存,因为整个查询的所有数据库行都会一次性处理成对象,而且根据行数的不同,可能会进行大量的处理。

您可以将查询集分块成更小的可消化位。我把这种模式称为“勺喂”。下面是我在管理命令中使用的带有进度条的实现,首先是 pip3 install tqdm

from tqdm import tqdm




def spoonfeed(qs, func, chunk=1000, start=0):
"""
Chunk up a large queryset and run func on each item.


Works with automatic primary key fields.


chunk -- how many objects to take on at once
start -- PK to start from


>>> spoonfeed(Spam.objects.all(), nom_nom)
"""
end = qs.order_by('pk').last()
progressbar = tqdm(total=qs.count())
if not end:
return
while start < end.pk:
for o in qs.filter(pk__gt=start, pk__lte=start+chunk):
func(o)
progressbar.update(1)
start += chunk
progressbar.close()

为了使用它,你需要编写一个函数来对你的对象进行操作:

def set_population(town):
town.population = calculate_population(...)
town.save()

然后在查询集上运行该函数:

spoonfeed(Town.objects.all(), set_population)

Django 的默认行为是在计算查询时缓存 QuerySet 的整个结果。您可以使用 QuerySet 的迭代器方法来避免这种缓存:

for event in Event.objects.all().iterator():
print event

Https://docs.djangoproject.com/en/stable/ref/models/querysets/#iterator

Iterator ()方法计算查询集,然后直接读取结果,而不在 QuerySet 级别执行缓存。这种方法在迭代大量只需要访问一次的对象时,可以获得更好的性能和显著的内存减少。请注意,缓存仍然是在数据库级别完成的。

对我来说,使用 iterator ()可以减少内存使用,但是它仍然比我预期的要高。使用 mpaf 建议的分页器方法使用的内存要少得多,但是对于我的测试用例来说要慢2-3倍。

from django.core.paginator import Paginator


def chunked_iterator(queryset, chunk_size=10000):
paginator = Paginator(queryset, chunk_size)
for page in range(1, paginator.num_pages + 1):
for obj in paginator.page(page).object_list:
yield obj


for event in chunked_iterator(Event.objects.all()):
print event

这里有一个包括 len 和 count 的解决方案:

class GeneratorWithLen(object):
"""
Generator that includes len and count for given queryset
"""
def __init__(self, generator, length):
self.generator = generator
self.length = length


def __len__(self):
return self.length


def __iter__(self):
return self.generator


def __getitem__(self, item):
return self.generator.__getitem__(item)


def next(self):
return next(self.generator)


def count(self):
return self.__len__()


def batch(queryset, batch_size=1024):
"""
returns a generator that does not cache results on the QuerySet
Aimed to use with expected HUGE/ENORMOUS data sets, no caching, no memory used more than batch_size


:param batch_size: Size for the maximum chunk of data in memory
:return: generator
"""
total = queryset.count()


def batch_qs(_qs, _batch_size=batch_size):
"""
Returns a (start, end, total, queryset) tuple for each batch in the given
queryset.
"""
for start in range(0, total, _batch_size):
end = min(start + _batch_size, total)
yield (start, end, total, _qs[start:end])


def generate_items():
queryset.order_by()  # Clearing... ordering by id if PK autoincremental
for start, end, total, qs in batch_qs(queryset):
for item in qs:
yield item


return GeneratorWithLen(generate_items(), total)

用法:

events = batch(Event.objects.all())
len(events) == events.count()
for event in events:
# Do something with the Event

对于这种任务,我通常使用原始的 MySQL 原始查询而不是 Django ORM。

MySQL 支持流模式,因此我们可以安全、快速地遍历所有记录,而不会出现内存不足的错误。

import MySQLdb
db_config = {}  # config your db here
connection = MySQLdb.connect(
host=db_config['HOST'], user=db_config['USER'],
port=int(db_config['PORT']), passwd=db_config['PASSWORD'], db=db_config['NAME'])
cursor = MySQLdb.cursors.SSCursor(connection)  # SSCursor for streaming mode
cursor.execute("SELECT * FROM event")
while True:
record = cursor.fetchone()
if record is None:
break
# Do something with record here


cursor.close()
connection.close()

参考:

  1. 从 MySQL 中检索数百万行
  2. MySQL 结果集流是如何执行的,而不是一次获取整个 JDBC ResultSet

这里有很多过时的结果。不确定是什么时候添加的,但是 Django 的 QuerySet.iterator()方法 使用具有块大小的服务器端游标来流化数据库的结果。因此,如果您使用 postgres,现在应该为您处理的框。