JPA: what is the proper pattern for iterating over large result sets?

假设我有一个有数百万行的表。使用 JPA,对该表进行查询迭代的正确方法是什么,以便使 I don't have all an in-memory List包含数百万个对象?

例如,我怀疑如果桌子太大,下面的内容就会爆炸:

List<Model> models = entityManager().createQuery("from Model m", Model.class).getResultList();


for (Model model : models)
{
System.out.println(model.getId());
}

分页(循环和手动更新 setFirstResult()/setMaxResult())真的是最好的解决方案吗?

Edit : 我所针对的主要用例是一种批处理作业。如果要跑很长时间也没关系。没有 Web 客户端参与; 我只需要为每一行“做一些事情”,一次一个(或一些小 N)。我只是不想让它们同时出现在记忆里。

109885 次浏览

我自己也想知道,这似乎很重要:

  • how big your dataset is (rows)
  • 您正在使用的 JPA 实现
  • 对每一行进行什么样的处理。

我已经编写了一个迭代器,以便于交换两种方法(findAll vs findEntry)。

我建议你两个都试试。

Long count = entityManager().createQuery("select count(o) from Model o", Long.class).getSingleResult();
ChunkIterator<Model> it1 = new ChunkIterator<Model>(count, 2) {


@Override
public Iterator<Model> getChunk(long index, long chunkSize) {
//Do your setFirst and setMax here and return an iterator.
}


};


Iterator<Model> it2 = List<Model> models = entityManager().createQuery("from Model m", Model.class).getResultList().iterator();




public static abstract class ChunkIterator<T>
extends AbstractIterator<T> implements Iterable<T>{
private Iterator<T> chunk;
private Long count;
private long index = 0;
private long chunkSize = 100;


public ChunkIterator(Long count, long chunkSize) {
super();
this.count = count;
this.chunkSize = chunkSize;
}


public abstract Iterator<T> getChunk(long index, long chunkSize);


@Override
public Iterator<T> iterator() {
return this;
}


@Override
protected T computeNext() {
if (count == 0) return endOfData();
if (chunk != null && chunk.hasNext() == false && index >= count)
return endOfData();
if (chunk == null || chunk.hasNext() == false) {
chunk = getChunk(index, chunkSize);
index += chunkSize;
}
if (chunk == null || chunk.hasNext() == false)
return endOfData();
return chunk.next();
}


}

我最终没有使用我的块迭代器(因此它可能没有经过测试)。顺便说一下,你将需要谷歌收集,如果你想使用它。

It depends upon the kind of operation you have to do. Why are you looping over a million of row? Are you updating something in batch mode? Are you going to display all records to a client? Are you computing some statistics upon the retrieved entities?

如果您要向客户端显示一百万条记录,请重新考虑您的用户界面。在这种情况下,适当的解决方案是对结果进行分页并使用 setFirstResult()setMaxResult()

如果您已经启动了大量记录的更新,那么您最好保持更新的简单性并使用 Query.executeUpdate()。或者,您可以使用工作管理器上的消息驱动 Bean 以异步模式执行更新。

如果在检索到的实体上计算一些统计信息,则可以利用 JPA 规范定义的分组函数。

For any other case, please be more specific :)

老实说,我建议离开 JPA,坚持使用 JDBC (但当然要使用 JdbcTemplate支持类或类似的东西)。JPA (以及其他 ORM 提供者/规范)的设计不是为了在一个事务中操作多个对象,因为它们假定所有加载的内容都应该保留在第一级缓存中(因此需要在 JPA 中使用 clear())。

此外,我推荐更多的低级解决方案,因为 ORM 的开销(反射只是冰山一角)可能是如此重要,以至于在纯 ResultSet上迭代,即使使用一些轻量级支持,如上述 JdbcTemplate,也会快得多。

JPA is simply not designed to perform operations on a large amount of entities. You might play with flush()/clear() to avoid OutOfMemoryError, but consider this once again. You gain very little paying the price of huge resource consumption.

使用 Pagination概念检索结果

没有“正确”的做法,这不是 JPA 或 JDO 或任何其他 ORM 打算做的,直接的 JDBC 将是您的最佳选择,因为您可以配置它来一次带回少量行,并在使用它们时刷新它们,这就是为什么存在服务器端游标。

ORM 工具不是为批量处理而设计的,它们的设计目的是让您操作对象,并尝试使存储数据的 RDBMS 尽可能透明,大多数在透明部分失败,至少在一定程度上。在这种规模下,由于对象实例化的开销,无法处理数十万行(Object) ,更不用说使用任何 ORM 处理数百万行,并且无法让它在任何合理的时间内执行,这是显而易见的。

Use the appropriate tool. Straight JDBC and Stored Procedures definitely have a place in 2011, especially at what they are better at doing versus these ORM frameworks.

不管你怎么做,把一百万个任何东西,甚至是一个简单的 List<Integer>都不会是非常有效率的。正确的方法是使用一个简单的 SELECT id FROM table,设置为 SERVER SIDE(依赖于供应商) ,光标设置为 FORWARD_ONLY READ-ONLY,并对其进行迭代。

如果你真的需要通过调用网络服务器来处理数百万个 id,那么你也必须同时进行一些并发处理,这样才能在合理的时间内运行。使用 JDBC 光标,每次在 ConcurrentLinkedQueue中放入一些光标,然后有一个小的线程池(# CPU/Core + 1)来拉取和处理它们,这是在内存“正常”的机器上完成任务的唯一方法,因为您已经耗尽了内存。

也看看这个 回答

Page 537 of 使用 Hibernate 的 Java 持久性 gives a solution using ScrollableResults, but alas it's only for Hibernate.

So it seems that using setFirstResult/setMaxResults and manual iteration really is necessary. Here's my solution using JPA:

private List<Model> getAllModelsIterable(int offset, int max)
{
return entityManager.createQuery("from Model m", Model.class).setFirstResult(offset).setMaxResults(max).getResultList();
}

然后,像这样使用它:

private void iterateAll()
{
int offset = 0;


List<Model> models;
while ((models = Model.getAllModelsIterable(offset, 100)).size() > 0)
{
entityManager.getTransaction().begin();
for (Model model : models)
{
log.info("do something with model: " + model.getId());
}


entityManager.flush();
entityManager.clear();
em.getTransaction().commit();
offset += models.size();
}
}

在直接的 JPA 中无法做到这一点,但是 Hibernate 支持无状态会话和可滚动的结果集。

We routinely process 数十亿 of rows with its help.

下面是到文档的链接: http://docs.jboss.org/hibernate/core/3.3/reference/en/html/batch.html#batch-statelesssession

我尝试了这里给出的答案,但是 JBoss 5.1 + MySQL Connector/J5.1.15 + Hibernate 3.3.2无法处理这些问题。我们刚刚从 JBoss4.x 迁移到 JBoss5.1,所以我们现在仍然坚持使用它,因此我们可以使用的最新 Hibernate 是3.3。

添加一些额外的参数就可以完成这项工作,像这样的代码在没有 OOME 的情况下运行:

        StatelessSession session = ((Session) entityManager.getDelegate()).getSessionFactory().openStatelessSession();


Query query = session
.createQuery("SELECT a FROM Address a WHERE .... ORDER BY a.id");
query.setFetchSize(Integer.valueOf(1000));
query.setReadOnly(true);
query.setLockMode("a", LockMode.NONE);
ScrollableResults results = query.scroll(ScrollMode.FORWARD_ONLY);
while (results.next()) {
Address addr = (Address) results.get(0);
// Do stuff
}
results.close();
session.close();

关键行是 createQuery 和捲动之间的查询参数。如果没有它们,“滚动”调用将尝试将所有内存加载到内存中,并且要么永远不会完成,要么运行到 OutOfMemory 错误。

我很惊讶地看到,在这里的答案中,存储过程的使用并没有更加突出。在过去,当我不得不做类似的事情时,我创建一个存储过程,它以小块的形式处理数据,然后睡一会儿,然后继续。睡眠的原因是为了不让数据库过载,因为数据库可能也被用于更实时的查询类型,例如连接到一个网站。如果没有其他人在使用数据库,那么您可以省略睡眠。如果您需要确保处理每条记录一次且仅一次,那么您将需要创建一个额外的表(或字段)来存储已处理的记录,以便在重新启动时具有弹性。

这里节省的性能是显著的,可能比你在 JPA/Hibernate/AppServer 领域所能做的任何数量级都要快,而且你的数据库服务器很可能有自己的服务器端光标类型的机制来有效地处理大型结果集。性能节省来自不必将数据从数据库服务器发送到应用程序服务器,在应用程序服务器上处理数据,然后将数据发送回去。

使用存储过程有一些显著的缺点,这可能完全排除了这一点,但是如果您在个人工具箱中具备了这种技能,并且可以在这种情况下使用它,那么您就可以相当快地解决这些问题。

如果您使用 EclipseLink I’使用此方法获得 Iterable 结果

private static <T> Iterable<T> getResult(TypedQuery<T> query)
{
//eclipseLink
if(query instanceof JpaQuery) {
JpaQuery<T> jQuery = (JpaQuery<T>) query;
jQuery.setHint(QueryHints.RESULT_SET_TYPE, ResultSetType.ForwardOnly)
.setHint(QueryHints.SCROLLABLE_CURSOR, true);


final Cursor cursor = jQuery.getResultCursor();
return new Iterable<T>()
{
@SuppressWarnings("unchecked")
@Override
public Iterator<T> iterator()
{
return cursor;
}
};
}
return query.getResultList();
}

封闭法

static void closeCursor(Iterable<?> list)
{
if (list.iterator() instanceof Cursor)
{
((Cursor) list.iterator()).close();
}
}

你可以用另一个“把戏”。只加载您感兴趣的实体的标识符集合。假设标识符的类型为 long = 8字节,那么10 ^ 6个这样的标识符的列表大约为8Mb。如果它是一个批处理过程(一次一个实例) ,那么它是可以忍受的。然后迭代完成任务。

还有一句话——无论如何都应该以块的形式执行此操作——特别是在修改记录时,否则数据库中的 回滚段回滚段回滚段将会增长。

在设置 firstResult/maxRows 策略时,对于远离顶部的结果,非常非常会比较慢。

还要考虑到数据库可能是在 read commited isolation中运行的,因此要避免幻象读取加载标识符,然后逐个(或10乘10或其他)加载实体。

冬眠有4种不同的方式来达到你想要的效果。每一个都有设计折衷、限制和结果。我建议探索每一个并决定哪一个适合你的情况。

  1. 使用带滚动()的无状态会话
  2. 在每次迭代之后使用 session. clear ()。当需要附加其他实体时,然后在单独的会话中加载它们。实际上,第一个会话是模拟无状态会话,但保留有状态会话的所有特性,直到分离对象。
  3. 使用 iterate ()或 list () ,但是在第一个查询中只获取 id,然后在每次迭代的单独会话中执行 session.load 并在迭代结束时关闭会话。
  4. 使用 Queryy.iterate ()和 EntityManager.detach () aka Session.evect () ;

详述@Tomasz Nurkiewicz 的回答。您可以访问 DataSource,这反过来又可以为您提供连接

@Resource(name = "myDataSource",
lookup = "java:comp/DefaultDataSource")
private DataSource myDataSource;

在你的代码中

try (Connection connection = myDataSource.getConnection()) {
// raw jdbc operations
}

这将允许您为某些特定的大批处理操作(如导入/导出)绕过 JPA,但是如果需要,您仍然可以访问其他 JPA 操作的实体管理器。

下面是一个简单直接的 JPA 示例(在 Kotlin 中) ,它展示了如何对任意大的结果集进行分页,一次读取100个项目的块,而不使用游标(每个游标消耗数据库上的资源)。它使用键集分页。

有关键字集分页的概念,请参见 https://use-the-index-luke.com/no-offset,有关不同的分页方法及其缺点的比较,请参见 https://www.citusdata.com/blog/2016/03/30/five-ways-to-paginate/

/*
create table my_table(
id int primary key, -- index will be created
my_column varchar
)
*/


fun keysetPaginationExample() {
var lastId = Integer.MIN_VALUE
do {


val someItems =
myRepository.findTop100ByMyTableIdAfterOrderByMyTableId(lastId)


if (someItems.isEmpty()) break


lastId = someItems.last().myTableId


for (item in someItems) {
process(item)
}


} while (true)
}

使用 JPA 和 NativeQuery 每次使用偏移量获取大小元素的示例

public List<X> getXByFetching(int fetchSize) {
int totalX = getTotalRows(Entity);
List<X> result = new ArrayList<>();
for (int offset = 0; offset < totalX; offset = offset + fetchSize) {
EntityManager entityManager = getEntityManager();
String sql = getSqlSelect(Entity) + " OFFSET " + offset + " ROWS";
Query query = entityManager.createNativeQuery(sql, X.class);
query.setMaxResults(fetchSize);
result.addAll(query.getResultList());
entityManager.flush();
entityManager.clear();
return result;
}

最后,在 JPA 2.2和 Hibernate (至少在 v5.4.30中)中找到了您想要的答案,它使用了前面答案中提到的 Scrollable 实现。

您的代码现在可以看起来像这样:

entityManager().createQuery("from Model m", Model.class)
.getResultStream();
.forEach(model -> System.out.println(model.getId());