使用 Webflux R2dbc 和 Postgres 构建响应式 Spring Boot 应用

在本文中,你将学习如何使用 Spring WebFlux、R2DBC 和 Postgres 数据库实现和测试响应式(Reactive) Spring Boot 应用程序。我们将使用最新版本的 Spring Boot 3 创建两个用 Kotlin 编写的简单应用程序。我们的应用程序通过 HTTP 公开一些 REST 端点。为了测试它们之间的通信以及与 Postgres 数据库的集成,我们将使用 Testcontainers 和 Netty Mock Server。

源码

你可以可以克隆我的 GitHub repository。它包含 employee-serviceorganization-service 两个应用程序。之后,你只需按照我的说明操作即可。

依赖

第一步,我们将添加几个与 Kotlin 相关的依赖。除了标准库,我们还可以加入 Kotlin 对 Jackson(JSON 序列化/反序列化)的支持:

<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-reflect</artifactId>
</dependency>

我们还需要包含两个 Spring Boot Starter。为了创建响应式 Spring @Controller,我们需要使用 Spring WebFlux 模块。有了 Spring Boot Data R2DBC Starter,我们就能以响应方式使用 Spring Data Repository。最后,我们还需要加入 R2DBC 提供的 Postgres 驱动程序。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>

我们的项目中有几个测试依赖。我们需要包含标准的 Spring Boot Test Starter、支持 JUnit 5、Postgres 和 R2DBC 的 Testcontainers,以及用于模拟响应式 API 的 Mock Server Netty 模块。还添加了 spring-boot-testcontainers 模块,以利用 Spring Boot 和 Testcontainers 之间的内置集成。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>r2dbc</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.mock-server</groupId>
  <artifactId>mockserver-netty</artifactId>
  <version>5.15.0</version>
  <scope>test</scope>
</dependency>

最后一个依赖关系是可选的。我们可以在应用程序中加入 Spring Boot Actuator。它将 R2DBC 连接状态添加到健康检查中,并将若干指标与连接池状态结合起来。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

实现 Spring Reactive 应用程序

下面是第一个应用程序 - employee-service 的 model 类:

class Employee(val name: String, 
               val salary: Int, 
               val organizationId: Int) {
    @Id var id: Int? = null
}

这是 repository 接口。它需要继承 R2dbcRepository 接口。与标准 Spring Data Repository 一样,我们可以定义多个 find 方法。不过,它们使用 Reactor MonoFlux 封装返回对象,而不是实体。

interface EmployeeRepository: R2dbcRepository<Employee, Int> {
    fun findByOrganizationId(organizationId: Int): Flux<Employee>
}

下面是 @RestController 的实现。我们需要注入 EmployeeRepository Bean。然后,我们使用 repository Bean 以响应式的方式与数据库交互。我们的端点也会返回由 Reactor MonoFlux 封装的对象。有三个 find 端点和一个 POST 端点:

@RestController
@RequestMapping("/employees")
class EmployeeController {

   @Autowired
   lateinit var repository : EmployeeRepository

   @GetMapping // (1)
   fun findAll() : Flux<Employee> = repository.findAll()

   @GetMapping("/{id}") // (2)
   fun findById(@PathVariable id: Int) : Mono<Employee> = 
      repository.findById(id)

   @GetMapping("/organization/{organizationId}") // (3)
   fun findByOrganizationId(@PathVariable organizationId: Int): 
      Flux<Employee> = repository.findByOrganizationId(organizationId)

   @PostMapping // (4)
   fun add(@RequestBody employee: Employee) : Mono<Employee> = 
      repository.save(employee)

}
  1. 检索所有 employee。
  2. 根据 employee id 检索。
  3. 根据 organization id 检索所有 employee。
  4. 添加新的 employee。

我们还需要在 Spring Boot application.yml 中配置数据库连接设置:

spring:
  application:
    name: employee-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123

这是我们的 main class。我们希望应用程序在启动时在数据库中创建一个表。使用 R2DBC 时,我们需要准备一段代码,以便使用 schema.sql 文件填充 schema。

@SpringBootApplication
class EmployeeApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

}

fun main(args: Array<String>) {
   runApplication<EmployeeApplication>(*args)
}

然后将 schema.sql 文件放到 src/main/resources 目录中即可。

CREATE TABLE employee (
  id SERIAL PRIMARY KEY, 
  name VARCHAR(255), 
  salary INT, 
  organization_id INT
);

让我们切换到 organization-service。实现过程非常相似。这是我们的 domain model 类:

class Organization(var name: String) {
    @Id var id: Int? = null
}

我们的应用程序要与 employee-service 进行通信。因此,我们需要定义 WebClient Bean。它将从 application properties 中获取目标服务的地址。

@SpringBootApplication
class OrganizationApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

   @Value("\${employee.client.url}")
   private lateinit var employeeUrl: String

   @Bean
   fun webClient(builder: WebClient.Builder): WebClient {
      return builder.baseUrl(employeeUrl).build()
   }
}

fun main(args: Array<String>) {
    runApplication<OrganizationApplication>(*args)
}

There is also the repository interface OrganizationRepository. Our @RestController uses a repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service. As the response from the GET /employees/{id}/with-employees it returns the OrganizationDTO.

还有一个 repository 接口 OrganizationRepository。我们的 @RestController 使用 repository Bean 与数据库交互,并使用 WebClient Bean 调用 employee-service 暴露的端点。GET /employees/{id}/with-employees 的响应会返回 OrganizationDTO

@RestController
@RequestMapping("/organizations")
class OrganizationController {

   @Autowired
   lateinit var repository : OrganizationRepository
   @Autowired
   lateinit var client : WebClient

   @GetMapping
   fun findAll() : Flux<Organization> = repository.findAll()

   @GetMapping("/{id}")
   fun findById(@PathVariable id : Int): Mono<Organization> = 
      repository.findById(id)

   @GetMapping("/{id}/with-employees")
   fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
      val employees : Flux<Employee> = client.get().uri("/employees/organization/$id")
             .retrieve().bodyToFlux(Employee::class.java)
      val org : Mono<Organization> = repository.findById(id)
      return org.zipWith(employees.collectList()).log()
             .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

   @PostMapping
   fun add(@RequestBody employee: Organization) : Mono<Organization> = 
      repository.save(employee)

}

下面是我们的 DTO 实现:

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}

集成测试

完成实现后,我们就可以准备几个集成测试了。正如我在开头提到的,我们将使用 Testcontainers 在测试期间运行 Postgres 容器。我们的测试运行应用程序,并利用自动配置的 WebTestClient 实例调用 API 端点 (1)。我们需要在测试前启动 Postgres 容器。因此,我们需要在 companion object 部分定义容器 bean (2)。有了 @ServiceConnection 注解,我们就不必手动设置属性了,Spring Boot 会帮我们完成 (3)

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class EmployeeControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient // (1)

   companion object { // (2)

      @Container
      @ServiceConnection // (3)
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddEmployee() {
      webTestClient.post().uri("/employees")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Employee("Test", 1000, 1))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindEmployee() {
      webTestClient.get().uri("/employees/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindEmployees() {
      webTestClient.get().uri("/employees")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

}

organization-service 的测试类要复杂一些。这是因为我们需要模拟与 employee-service 的通信。为此,我们使用了 ClientAndServer 对象 (1)。它在所有测试之前启动一次 (2),在测试之后停止 (3)。我们模拟 organization-service 调用的 GET /employees/organization/{id} 端点 (4)。然后,我们调用 organization-service GET /organizations/{id}/with-employees 端点 (5)。最后,我们将验证它是否在 JSON 响应中返回 employee 列表。

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class OrganizationControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient

   companion object {

      @Container
      @ServiceConnection
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

      private var mockServer: ClientAndServer? = null // (1)

      @BeforeAll
      @JvmStatic
      internal fun beforeAll() { // (2)
         mockServer = ClientAndServer.startClientAndServer(8090);
      }

      @AfterAll
      @JvmStatic
      internal fun afterAll() { // (3)
         mockServer!!.stop()
      }
   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddOrganization() {
        webTestClient.post().uri("/organizations")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Organization("Test"))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganization() {
        webTestClient.get().uri("/organizations/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganizations() {
        webTestClient.get().uri("/organizations")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganizationWithEmployees() { // (4)
        mockServer!!.`when`(request()
               .withMethod("GET")
               .withPath("/employees/organization/1"))
            .respond(response()
                .withStatusCode(200)
                .withContentType(MediaType.APPLICATION_JSON)
                .withBody(createEmployees()))

      webTestClient.get().uri("/organizations/1/with-employees")
          .accept(MediaType.APPLICATION_JSON) // (5)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
          .jsonPath("$.employees.length()").isEqualTo(2)
          .jsonPath("$.employees[0].id").isEqualTo(1)
          .jsonPath("$.employees[1].id").isEqualTo(2)
   }

   private fun createEmployees(): String {
      val employees: List<Employee> = listOf<Employee>(
         Employee(1, "Test1", 10000, 1),
         Employee(2, "Test2", 20000, 1)
      )
      return jacksonObjectMapper().writeValueAsString(employees)
   }
}

你可以在自己电脑上运行这些测试,验证所有测试是否已成功完成。clone repository 后,你需要运行 Docker 并使用以下 Maven 命令构建应用程序:

$ mvn clean package

我们还可以在 CircleCI 上为应用程序准备构建定义(build definition)。由于我们需要运行 Testcontainers,因此需要一台运行有 Docker 守护进程的机器。下面是 .circle/config.yml 文件中为 CircleCI 构建 pipeline 的配置:

# .circleci/config.yml
version: 2.1

jobs:
  build:
    docker:
      - image: 'cimg/openjdk:20.0'
    steps:
      - checkout
      - run:
          name: Analyze on SonarCloud
          command: mvn verify sonar:sonar -DskipTests

executors:
  machine_executor_amd64:
    machine:
      image: ubuntu-2204:2022.04.2
    environment:
      architecture: "amd64"
      platform: "linux/amd64"

orbs:
  maven: circleci/maven@1.4.1

workflows:
  maven_test:
    jobs:
      - maven/test:
          executor: machine_executor_amd64
      - build:
          context: SonarCloud

下面是在 CircleCI 上的构建结果:

Build results on CircleCI

如果你正在运行 Docker,也可以使用 Postgres 容器启动我们的 Spring Boot 响应式应用程序。这要归功于 spring-boot-testcontainers 模块。有一个专用的 @TestConfiguration 类可用于在开发模式下运行 Postgres:

@TestConfiguration
class PostgresContainerDevMode {

    @Bean
    @ServiceConnection
    fun postgresql(): PostgreSQLContainer<*>? {
        return PostgreSQLContainer("postgres:14.0")
            .withUsername("spring")
            .withPassword("spring123")
    }
}

现在,我们需要定义 test main class,它将使用 PostgresContainerDevMode 类中提供的配置。

class EmployeeApplicationTest

fun main(args: Array<String>) {
    fromApplication<EmployeeApplication>()
       .with(PostgresContainerDevMode::class)
       .run(*args)
}

要在 Docker 上运行 Dev Postgres 应用程序,只需执行以下 Maven 命令:

$ mvn spring-boot:test-run

参考:https://piotrminkowski.com/2023/07/28/reactive-spring-boot-with-webflux-r2dbc-and-postgres/