使用 Stream API 处理 JDBC ResultSet
1、概览
通常,我们使用遍历从 JDBC ResultSet
中迭代检索到的数据,不过在某些情况下,我更喜欢用 record Stream
。
本文将带你了解使用 Stream API 处理 JDBC 结果集的几种方法。
2、使用 Spliterators
首先是纯 JDK 方法,使用 Spliterators
创建流。
首先,为实体定义一个 Model:
public record CityRecord(String city, String country) {
}
在 CityRecord
中,我们存储了有关 city
(城市)及其 country
(国家)的信息。
接下来,创建一个能与数据库交互并返回 Stream<CityRecord>
的 Repository:
public class JDBCStreamAPIRepository {
private static final String QUERY = "SELECT name, country FROM cities";
private final Logger logger = LoggerFactory.getLogger(JDBCStreamAPIRepository.class);
public Stream<CityRecord> getCitiesStreamUsingSpliterator(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<CityRecord>(
Long.MAX_VALUE, Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super CityRecord> action) {
try {
if(!resultSet.next()) return false;
action.accept(createCityRecord(resultSet));
return true;
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
}
}, false);
}
private CityRecord createCityRecord(ResultSet resultSet) throws SQLException {
return new CityRecord(resultSet.getString(1), resultSet.getString(2));
}
}
我们创建了一个 PreparedStatement
来检索 cities
表中的所有项目,并指定了 FetchSize 以控制内存消耗。我们使用 AbstractSpliterator
生成一个 Stream,只要 ResultSet
有更多的值,就会生成新的 Record
。此外,我们还使用 createCityRecord
方法将 Record
映射为 CityRecord
中。
最后,为 Repository 编写一个测试:
public class JDBCResultSetWithStreamAPIUnitTest {
private static Connection connection = null;
private static final String JDBC_URL = "jdbc:h2:mem:testDatabase";
private static final String USERNAME = "dbUser";
private static final String PASSWORD = "dbPassword";
JDBCStreamAPIRepository jdbcStreamAPIRepository = new JDBCStreamAPIRepository();
@BeforeEach
void setup() throws Exception {
connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
initialDataSetup();
}
private void initialDataSetup() throws SQLException {
Statement statement = connection.createStatement();
String ddlQuery = "CREATE TABLE cities (name VARCHAR(50), country VARCHAR(50))";
statement.execute(ddlQuery);
List<String> sqlQueryList = Arrays.asList(
"INSERT INTO cities VALUES ('London', 'United Kingdom')",
"INSERT INTO cities VALUES ('Sydney', 'Australia')",
"INSERT INTO cities VALUES ('Bucharest', 'Romania')"
);
for (String query : sqlQueryList) {
statement.execute(query);
}
}
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingSpliterator_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingSpliterator(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
}
如上,创建 H2 数据库的连接,并在测试前为 cities
表准备了一些条目。最后,验证 Repository 是否能以 Stream 的形式从表中返回所有预期条目。
3、使用 JOOQ
JOOQ 是一个用于处理关系数据库的流行 ORM 框架。它已经提供了从结果集检索 Stream 的方法。
首先,添加必要的 依赖:
<dependency>
<groupId>org.jooq</groupId>
<artifactId>jooq</artifactId>
<version>3.19.11</version>
</dependency>
接下来,为 JDBCStreamAPIRepository
添加一个新方法:
public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return DSL.using(connection)
.fetchStream(resultSet)
.map(r -> new CityRecord(r.get("NAME", String.class),
r.get("COUNTRY", String.class)))];
}
我们使用 ResultQuery
类中的 fetchStream()
方法从 ResultSet
中创建 record Stream。此外,我们还将 JOOQ record 映射到 CityRecord
实例,然后再从该方法中返回。
调用新方法,并验证它是否运行正常:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJOOQ(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
不出所料,我们在 Stream 中检索到了数据库中的所有城市记录。
4、使用 jdbc-stream
或者,我们可以使用一个名为 jdbc-stream 的轻量级库,从 ResultSet
创建一个流。
添加 依赖:
<dependency>
<groupId>com.github.juliomarcopineda</groupId>
<artifactId>jdbc-stream</artifactId>
<version>0.1.1</version>
</dependency>
现在,为 JDBCStreamAPIRepository
添加一个新方法:
public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return JdbcStream.stream(resultSet)
.map(r -> {
try {
return createCityRecord(resultSet);
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
}
使用 JdbcStream
从 ResultSet
中创建 Stream。在底层,它使用 Spliterators
并以与我们自己的实现相同的逻辑构建 Stream。
现在,测试新的 Repository:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJdbcStream(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
结果符合预期,jdbc-stream
获取了所有预期的条目。
5、资源关闭
在使用 JDBC 时,必须记得要关闭使用的所有资源,以避免连接泄漏。通常的做法是在 Connection
、PreparedStatement
和 ResultSet
中使用 try-with-resources
语法。但是,这种方法并不适合使用 Stream。如果我们从 Repository 方法返回一个 Stream,我们的所有资源都将已关闭,流上的任何操作都无法访问它们。
为了避免这个问题,我们需要使用 Stream 的 onClose()
方法关闭所有资源。此外,我们还必须确保在完成对流的处理后关闭该 Stream。
修改 Repository 方法,加入资源关闭逻辑:
public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return JdbcStream.stream(resultSet)
.map(r -> {
try {
return createCityRecord(resultSet);
} catch (SQLException e) {
throw new RuntimeException(e);
}
})
.onClose(() -> closeResources(connection, resultSet, preparedStatement));
}
private void closeResources(Connection connection, ResultSet resultSet, PreparedStatement preparedStatement) {
try {
resultSet.close();
preparedStatement.close();
connection.close();
logger.info("Resources closed");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
我们添加了 closeResources()
方法,并将其添加到 onClose()
流处理上。
现在,修改客户端代码,确保流在使用后关闭:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJdbcStream(connection);
List<CityRecord> cities = cityRecords.toList();
cityRecords.close();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
如上,我们会在处理完所有项目后关闭 Stream。此外,我们还可以看到一条日志信息,表明所有资源都已关闭:
[main] INFO com.baeldung.resultset.streams.JDBCStreamAPIRepository -- Resources closed
6、总结
本文介绍了使用 Stream API 操作 JDBC ResultSet 的几个方式。在处理无法一次性加载到内存中的大数据集时,这种方法尤其有用。此外,如果我们在应用中遵循函数式风格,那么 Stream 与我们的逻辑非常吻合。
Ref:https://www.baeldung.com/stream-api-jdbc-resultset