使用 ResultSet 的 java.util.stream

我只有几个表,其中包含大量数据(大约1亿条记录)。所以我不能将这些数据存储在内存中,但是我想使用 java.util.stream类对这个 结果集进行流处理,并将这个流传递给另一个类。我读过有关 Stream.ofStream.Builder操作符的内容,但它们是内存中的缓冲流。那么有没有办法解决这个问题呢? 先谢谢你。

更新 # 1

好吧,我谷歌了一下,找到了 Jooq图书馆。我不确定,但看起来它可能适用于我的测试用例。总而言之,我只有几个表格有大量的数据。我想流我的结果集和转移这个流到另一个方法。就像这样:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {


Stream<Record> record = null;
try (Connection connection = dataSource.getConnection()) {
String sql = "select * from " + table;


try (PreparedStatement pSt = connection.prepareStatement(sql)) {
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
//
record = DSL.using(connection)
.fetch(resultSet).stream();
}
} catch (SQLException sqlEx) {
logger.error(sqlEx);
}


return record;
}

谁能告诉我,我走的路对吗? 谢谢。

更新 # 2

我在 Jooq上做了一些实验,现在可以说上面的决定不适合我。这个代码 record = DSL.using(connection).fetch(resultSet).stream();花费了太多的时间

67685 次浏览

我不知道有哪个著名的图书馆能为你做到这一点。

也就是说,这篇文章展示了如何用 Iterator (ResultSetIterator)包装结果集,并将其作为第一个参数传递给 Spliterators.spliteratorUnknownSize(),以创建 Spliterator

然后 StreamSupport可以使用 Spliterator 在它上面创建一个 Stream。

他们建议实施 ResultSetIterator课程:

public class ResultSetIterator implements Iterator {


private ResultSet rs;
private PreparedStatement ps;
private Connection connection;
private String sql;


public ResultSetIterator(Connection connection, String sql) {
assert connection != null;
assert sql != null;
this.connection = connection;
this.sql = sql;
}


public void init() {
try {
ps = connection.prepareStatement(sql);
rs = ps.executeQuery();


} catch (SQLException e) {
close();
throw new DataAccessException(e);
}
}


@Override
public boolean hasNext() {
if (ps == null) {
init();
}
try {
boolean hasMore = rs.next();
if (!hasMore) {
close();
}
return hasMore;
} catch (SQLException e) {
close();
throw new DataAccessException(e);
}


}


private void close() {
try {
rs.close();
try {
ps.close();
} catch (SQLException e) {
//nothing we can do here
}
} catch (SQLException e) {
//nothing we can do here
}
}


@Override
public Tuple next() {
try {
return SQL.rowAsTuple(sql, rs);
} catch (DataAccessException e) {
close();
throw e;
}
}
}

然后:

public static Stream stream(final Connection connection,
final String sql,
final Object... parms) {
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
new ResultSetIterator(connection, sql), 0), false);
}

你首先要明白的是

try (Connection connection = dataSource.getConnection()) {
…
try (PreparedStatement pSt = connection.prepareStatement(sql)) {
…
return stream;
}
}

因为当您离开 try块时,资源已经关闭,而 Stream的处理甚至还没有开始。

资源管理构造“ try with resources”适用于方法内部块范围内使用的资源,但是您正在创建一个返回资源的工厂方法。因此,必须确保关闭返回的流将关闭资源,而调用方负责关闭 Stream


此外,您还需要一个函数,它从 ResultSet的一行中生成一个项。假设,你有一个像

Record createRecord(ResultSet rs) {
…
}

你可以创建一个 Stream<Record>基本上就像

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
}
}, false);

但是,要正确地执行此操作,您必须合并异常处理和资源关闭。您可以使用 Stream.onClose注册一个将在 Stream关闭时执行的操作,但它必须是一个不能抛出检查过的异常的 Runnable。类似地,tryAdvance方法不允许引发已检查的异常。由于我们不能简单地在这里嵌套 try(…)块,所以当已经有一个挂起的异常时,在 close中抛出的抑制异常的程序逻辑并不是免费的。

为了帮助我们解决这个问题,我们引入了一种新的类型,它可以包装关闭操作,这些操作可能会抛出已检查的异常,并将它们包装在未检查的异常中。通过实现 AutoCloseable本身,它可以利用 try(…)结构来安全地连接关闭操作:

interface UncheckedCloseable extends Runnable, AutoCloseable {
default void run() {
try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
}
static UncheckedCloseable wrap(AutoCloseable c) {
return c::close;
}
default UncheckedCloseable nest(AutoCloseable c) {
return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
}
}

这样一来,整个行动就变成了:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
throws SQLException {


UncheckedCloseable close=null;
try {
Connection connection = dataSource.getConnection();
close=UncheckedCloseable.wrap(connection);
String sql = "select * from " + table;
PreparedStatement pSt = connection.prepareStatement(sql);
close=close.nest(pSt);
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
close=close.nest(resultSet);
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
try {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
}
}, false).onClose(close);
} catch(SQLException sqlEx) {
if(close!=null)
try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
throw sqlEx;
}
}

该方法在上述实用工具类的一个实例中包装所有资源(ConnectionStatementResultSet)的必要关闭操作。如果在初始化过程中发生异常,则立即执行关闭操作,并将异常传递给调用方。如果流构造成功,则通过 onClose注册关闭操作。

因此,调用者必须确保正确的关闭,如

try(Stream<Record> s=tableAsStream(dataSource, table)) {
// stream operation
}

请注意,通过 RuntimeException传递的 SQLException也被添加到 tryAdvance方法中。因此,现在可以毫无问题地将 throws SQLException添加到 createRecord方法中。

JooQ

我要回答你问题的 < em > jOOQ 部分。到 jOOQ 3.8为止,现在已经有了很多与 jOOQ 与 Stream 相结合的额外特性。这个 jOOQ 页面还记录了其他用法.

你的建议用法:

你试过了:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

实际上,这对于大型结果集并不起作用,因为 fetch(ResultSet)将整个结果集提取到内存中,然后对其调用 Collection.stream()

更好(懒惰)的用法:

相反,你可以这样写:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
...
}

... 这本质上是为了方便这个:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
Stream<Record> stream = cursor.stream();
...
}

参见 DSLContext.fetchStream(ResultSet)

当然,您也可以让 jOOQ 执行您的 SQL 字符串,而不是与 JDBC 纠缠:

try (Stream<Record> stream =
DSL.using(dataSource)
.resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
.fetchSize(5000)
.fetchStream()) {
...
}

可怕的 SELECT *

正如评论中批评的那样,它们的 jOOQ 使用似乎很慢,因为 jOOQ 使用 fetchLazy()急切地将 LOB 数据提取到内存 尽管如此中。“惰性”一词对应于懒惰地(一个接一个地)获取记录,而不是懒惰地获取列数据。假设您实际上使用 想要来投影整个行,那么一次就可以完全获取一条记录。

如果你不需要一些沉重的行,不要投影它们! SELECT *在 SQL 中几乎总是一个坏主意。缺点:

  • 它会在数据库服务器、网络和客户机中造成更多的 I/O 和内存开销。
  • 它防止覆盖索引使用
  • 它可以防止联接消除转换

更多信息请点击这里。

关于使用资源进行尝试

请注意,由 jOOQ 生成的 Stream是“资源丰富的”,即它包含对开放的 ResultSet(和 PreparedStatement)的引用。因此,如果您真的希望在方法之外返回该流,请确保它已正确关闭!

下面是 Abacus-jdbc提供的最简单的示例。

final DataSource ds = JdbcUtil.createDataSource(url, user, password);
final SQLExecutor sqlExecutor = new SQLExecutor(ds);
sqlExecutor.stream(sql, parameters).filter(...).map(...).collect(...) // lazy execution&loading and auto-close Statement/Connection

或者:

JdbcUtil.prepareQuery(ds, sql)
.stream(ResultRecord.class) // or RowMapper.MAP/...
.filter(...).map(...).collect(...)  // lazy execution&loading and auto-close Statement/Connection

这完全是延迟加载和自动闭包。记录将通过 fetch size从 db 加载(如果未指定,则为默认值) ,并且在收集结果/记录后,语句和连接将自动关闭。

披露: 我是 AbacusUtil 的开发者。

我只是做了总结,提供了一个真实的例子,说明如何在不使用3rd 的情况下流化 ResultSet 和执行简单的 SQL 查询 请按此浏览详情

Blockquote: Java8提供了 Stream 系列并且易于操作。流水线的使用方式使代码清晰、智能。 然而,ResultSet 仍然沿用非常传统的处理方式。根据 ResultSet 的实际使用情况,如果将其转换为 Stream,将非常有帮助。< br/>

.... UncheckedConsumer 需要将 SQLException 转换为 runtimeException 以清除 Lamda。 < br/>

使用我的图书馆可以这样做:

附加专家依赖项:

<dependency>
<groupId>com.github.buckelieg</groupId>
<artifactId>db-fn</artifactId>
<version>0.3.4</version>
</dependency>

在代码中使用库:

Function<Stream<I>, O> processor = stream -> //process input stream
try (DB db = new DB("jdbc:postgresql://host:port/database?user=user&password=pass")) {
processor.apply(
db.select("SELECT * FROM my_table t1 JOIN my_table t2 ON t1.id = t2.id")
.fetchSize(5000)
.execute(rs -> /*ResultSet mapper*/)
);
}

详见 给你

一些称为 尤乔姆框架的 工具的公共模块提供了使用 RowIterator类的简单解决方案。 使用示例:

    PreparedStatement ps = dbConnection.prepareStatement("SELECT * FROM myTable");
new RowIterator(ps).toStream().forEach((RsConsumer)(resultSet) -> {
int value = resultSet.getInt(1);
});

Maven 对 Tools 库的依赖(50KB) :

    <dependency>
<groupId>org.ujorm</groupId>
<artifactId>ujo-tools</artifactId>
<version>1.93</version>
</dependency>

有关更多信息,请参见 JUnit 测试