在 Spring Data JPA 中使用 Stream(流式)查询

简介

本文将带你了解在 Spring Data JPA 中使用 Stream(流式)查询的最佳方式。

当需要获取较大的结果集时,使用 Java Stream 的好处是可以逐步迭代查询结果集,避免一次性获取所有数据可能导致的内存溢出异常。

JPA Stream 方法

2.2 版起,你可以使用 JPA 的 getResultStream 方法以 Stream 的形式处理结果集。

getResultStream 使用 JDBC ResultSet 对给定查询返回的记录进行流式处理。特别是在处理大结果集的时候,这种方法很有效率。

Spring Data JPA Stream 查询方法

如果要对查询结果集进行流式处理,则需要在 Spring Data JPA 查询方法中返回 Java Stream 类型,如下例所示:

@Repository
public interface PostRepository extends BaseJpaRepository<Post, Long> {
 
    @Query("""
        select p
        from Post p
        where date(p.createdOn) >= :sinceDate
        """
    )
    @QueryHints(
        @QueryHint(name = AvailableHints.HINT_FETCH_SIZE, value = "25")
    )
    // 返回 Stream
    Stream<Post> streamByCreatedOnSince(@Param("sinceDate") LocalDate sinceDate);
}

对于 PostgreSQL 和 MySQL,指定 FETCH_SIZE JPA QueryHint 是必要的,它指示 JDBC 驱动每次迭代的时候最多预取 25 条记录。否则,PostgreSQL 和 MySQL JDBC 驱动会在遍历底层 ResultSet 之前预取所有查询结果。

有了 streamByCreatedOnSince 方法后,我们现在来实现一个 updatePostCache 业务方法,该方法将获取最新的 Post 实体并更新内存缓存。

@Transactional(readOnly = true)
public void updatePostCache() {
    LocalDate yesterday = LocalDate.now().minusDays(1);
     
    try(Stream<Post> postStream = postRepository.streamByCreatedOnSince(yesterday)) {   
        postStream.forEach(
            post -> executorService.submit(() ->
                postCache.put(post.getId(), post)
            )
        );
    }
}

注意,updatePostCache service 方法注解了 @Transactional(readOnly = true),因为需要在整个 Stream 遍历的过程中保持数据库连接处于打开状态。

总结

Spring Data JPA 提供了对流式(Stream)查询方法的支持,但在使用此功能之前有几件事需要注意:

  • 首先,需要确保不要一次性检索所有数据。
  • 其次,需要确保在遍历 Stream 之前不释放数据库连接。

Ref:https://vladmihalcea.com/spring-data-jpa-stream/