使用 Stream API 处理 JDBC ResultSet

1、概览

通常,我们使用遍历从 JDBC ResultSet 中迭代检索到的数据,不过在某些情况下,我更喜欢用 record Stream

本文将带你了解使用 Stream API 处理 JDBC 结果集的几种方法。

2、使用 Spliterators

首先是纯 JDK 方法,使用 Spliterators 创建流。

首先,为实体定义一个 Model:

public record CityRecord(String city, String country) {
}

CityRecord 中,我们存储了有关 city(城市)及其 country(国家)的信息。

接下来,创建一个能与数据库交互并返回 Stream<CityRecord> 的 Repository:

public class JDBCStreamAPIRepository {

    private static final String QUERY = "SELECT name, country FROM cities";
    private final Logger logger = LoggerFactory.getLogger(JDBCStreamAPIRepository.class);

    public Stream<CityRecord> getCitiesStreamUsingSpliterator(Connection connection)
            throws SQLException {

        PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
        connection.setAutoCommit(false);
        preparedStatement.setFetchSize(5000);
        ResultSet resultSet = preparedStatement.executeQuery();

        return StreamSupport.stream(new Spliterators.AbstractSpliterator<CityRecord>(
          Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super CityRecord> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createCityRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false);
    }

    private CityRecord createCityRecord(ResultSet resultSet) throws SQLException {
        return new CityRecord(resultSet.getString(1), resultSet.getString(2));
    }
}

我们创建了一个 PreparedStatement 来检索 cities 表中的所有项目,并指定了 FetchSize 以控制内存消耗。我们使用 AbstractSpliterator 生成一个 Stream,只要 ResultSet 有更多的值,就会生成新的 Record。此外,我们还使用 createCityRecord 方法将 Record 映射为 CityRecord 中。

最后,为 Repository 编写一个测试:

public class JDBCResultSetWithStreamAPIUnitTest {
    private static Connection connection = null;
    private static final String JDBC_URL = "jdbc:h2:mem:testDatabase";
    private static final String USERNAME = "dbUser";
    private static final String PASSWORD = "dbPassword";

    JDBCStreamAPIRepository jdbcStreamAPIRepository = new JDBCStreamAPIRepository();

    @BeforeEach
    void setup() throws Exception {
        connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
        initialDataSetup();
    }

    private void initialDataSetup() throws SQLException {
        Statement statement = connection.createStatement();
        String ddlQuery = "CREATE TABLE cities (name VARCHAR(50), country VARCHAR(50))";
        statement.execute(ddlQuery);

        List<String> sqlQueryList = Arrays.asList(
          "INSERT INTO cities VALUES ('London', 'United Kingdom')",
          "INSERT INTO cities VALUES ('Sydney', 'Australia')",
          "INSERT INTO cities VALUES ('Bucharest', 'Romania')"
        );

        for (String query : sqlQueryList) {
            statement.execute(query);
        }
    }

    @Test
    void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingSpliterator_thenExpectedRecordsShouldBeReturned() throws SQLException {

        Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
          .getCitiesStreamUsingSpliterator(connection);
        List<CityRecord> cities = cityRecords.toList();

        assertThat(cities)
          .containsExactly(
            new CityRecord("London", "United Kingdom"),
            new CityRecord("Sydney", "Australia"),
            new CityRecord("Bucharest", "Romania"));
    }
}

如上,创建 H2 数据库的连接,并在测试前为 cities 表准备了一些条目。最后,验证 Repository 是否能以 Stream 的形式从表中返回所有预期条目。

3、使用 JOOQ

JOOQ 是一个用于处理关系数据库的流行 ORM 框架。它已经提供了从结果集检索 Stream 的方法。

首先,添加必要的 依赖

<dependency>
    <groupId>org.jooq</groupId>
    <artifactId>jooq</artifactId>
    <version>3.19.11</version>
</dependency>

接下来,为 JDBCStreamAPIRepository 添加一个新方法:

public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection)
        throws SQLException {

    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();

    return DSL.using(connection)
      .fetchStream(resultSet)
      .map(r -> new CityRecord(r.get("NAME", String.class),
        r.get("COUNTRY", String.class)))];
}

我们使用 ResultQuery 类中的 fetchStream() 方法从 ResultSet 中创建 record Stream。此外,我们还将 JOOQ record 映射到 CityRecord 实例,然后再从该方法中返回。

调用新方法,并验证它是否运行正常:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException {

    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
      .getCitiesStreamUsingJOOQ(connection);
    List<CityRecord> cities = cityRecords.toList();

    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

不出所料,我们在 Stream 中检索到了数据库中的所有城市记录。

4、使用 jdbc-stream

或者,我们可以使用一个名为 jdbc-stream 的轻量级库,从 ResultSet 创建一个流。

添加 依赖

<dependency>
    <groupId>com.github.juliomarcopineda</groupId>
    <artifactId>jdbc-stream</artifactId>
    <version>0.1.1</version>
</dependency>

现在,为 JDBCStreamAPIRepository 添加一个新方法:

public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
        throws SQLException {

    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();

    return JdbcStream.stream(resultSet)
      .map(r -> {
          try {
              return createCityRecord(resultSet);
          } catch (SQLException e) {
              throw new RuntimeException(e);
          }
      });
}

使用 JdbcStreamResultSet 中创建 Stream。在底层,它使用 Spliterators 并以与我们自己的实现相同的逻辑构建 Stream。

现在,测试新的 Repository:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {

    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
            .getCitiesStreamUsingJdbcStream(connection);
    List<CityRecord> cities = cityRecords.toList();

    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

结果符合预期,jdbc-stream 获取了所有预期的条目。

5、资源关闭

在使用 JDBC 时,必须记得要关闭使用的所有资源,以避免连接泄漏。通常的做法是在 ConnectionPreparedStatementResultSet 中使用 try-with-resources 语法。但是,这种方法并不适合使用 Stream。如果我们从 Repository 方法返回一个 Stream,我们的所有资源都将已关闭,流上的任何操作都无法访问它们。

为了避免这个问题,我们需要使用 Stream 的 onClose() 方法关闭所有资源。此外,我们还必须确保在完成对流的处理后关闭该 Stream。

修改 Repository 方法,加入资源关闭逻辑:

public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
        throws SQLException {

    PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(5000);
    ResultSet resultSet = preparedStatement.executeQuery();

    return JdbcStream.stream(resultSet)
      .map(r -> {
          try {
              return createCityRecord(resultSet);
          } catch (SQLException e) {
              throw new RuntimeException(e);
          }
      })
      .onClose(() -> closeResources(connection, resultSet, preparedStatement));
}

private void closeResources(Connection connection, ResultSet resultSet, PreparedStatement preparedStatement) {
    try {
        resultSet.close();
        preparedStatement.close();
        connection.close();
        logger.info("Resources closed");
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

我们添加了 closeResources() 方法,并将其添加到 onClose() 流处理上。

现在,修改客户端代码,确保流在使用后关闭:

@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {

    Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
            .getCitiesStreamUsingJdbcStream(connection);
    List<CityRecord> cities = cityRecords.toList();
    cityRecords.close();

    assertThat(cities)
      .containsExactly(
        new CityRecord("London", "United Kingdom"),
        new CityRecord("Sydney", "Australia"),
        new CityRecord("Bucharest", "Romania"));
}

如上,我们会在处理完所有项目后关闭 Stream。此外,我们还可以看到一条日志信息,表明所有资源都已关闭:

[main] INFO com.baeldung.resultset.streams.JDBCStreamAPIRepository -- Resources closed

6、总结

本文介绍了使用 Stream API 操作 JDBC ResultSet 的几个方式。在处理无法一次性加载到内存中的大数据集时,这种方法尤其有用。此外,如果我们在应用中遵循函数式风格,那么 Stream 与我们的逻辑非常吻合。


Ref:https://www.baeldung.com/stream-api-jdbc-resultset