使用 Webflux R2dbc 和 Postgres 构建响应式 Spring Boot 应用
在本文中,你将学习如何使用 Spring WebFlux、R2DBC 和 Postgres 数据库实现和测试响应式(Reactive) Spring Boot 应用程序。我们将使用最新版本的 Spring Boot 3 创建两个用 Kotlin 编写的简单应用程序。我们的应用程序通过 HTTP 公开一些 REST 端点。为了测试它们之间的通信以及与 Postgres 数据库的集成,我们将使用 Testcontainers 和 Netty Mock Server。
源码
你可以可以克隆我的 GitHub repository。它包含 employee-service
和 organization-service
两个应用程序。之后,你只需按照我的说明操作即可。
依赖
第一步,我们将添加几个与 Kotlin 相关的依赖。除了标准库,我们还可以加入 Kotlin 对 Jackson(JSON 序列化/反序列化)的支持:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
我们还需要包含两个 Spring Boot Starter。为了创建响应式 Spring @Controller
,我们需要使用 Spring WebFlux 模块。有了 Spring Boot Data R2DBC Starter,我们就能以响应方式使用 Spring Data Repository。最后,我们还需要加入 R2DBC 提供的 Postgres 驱动程序。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
我们的项目中有几个测试依赖。我们需要包含标准的 Spring Boot Test Starter、支持 JUnit 5、Postgres 和 R2DBC 的 Testcontainers,以及用于模拟响应式 API 的 Mock Server Netty 模块。还添加了 spring-boot-testcontainers
模块,以利用 Spring Boot 和 Testcontainers 之间的内置集成。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<version>5.15.0</version>
<scope>test</scope>
</dependency>
最后一个依赖关系是可选的。我们可以在应用程序中加入 Spring Boot Actuator。它将 R2DBC 连接状态添加到健康检查中,并将若干指标与连接池状态结合起来。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
实现 Spring Reactive 应用程序
下面是第一个应用程序 - employee-service
的 model 类:
class Employee(val name: String,
val salary: Int,
val organizationId: Int) {
@Id var id: Int? = null
}
这是 repository 接口。它需要继承 R2dbcRepository
接口。与标准 Spring Data Repository 一样,我们可以定义多个 find 方法。不过,它们使用 Reactor Mono
或 Flux
封装返回对象,而不是实体。
interface EmployeeRepository: R2dbcRepository<Employee, Int> {
fun findByOrganizationId(organizationId: Int): Flux<Employee>
}
下面是 @RestController
的实现。我们需要注入 EmployeeRepository
Bean。然后,我们使用 repository Bean 以响应式的方式与数据库交互。我们的端点也会返回由 Reactor Mono
和 Flux
封装的对象。有三个 find 端点和一个 POST 端点:
@RestController
@RequestMapping("/employees")
class EmployeeController {
@Autowired
lateinit var repository : EmployeeRepository
@GetMapping // (1)
fun findAll() : Flux<Employee> = repository.findAll()
@GetMapping("/{id}") // (2)
fun findById(@PathVariable id: Int) : Mono<Employee> =
repository.findById(id)
@GetMapping("/organization/{organizationId}") // (3)
fun findByOrganizationId(@PathVariable organizationId: Int):
Flux<Employee> = repository.findByOrganizationId(organizationId)
@PostMapping // (4)
fun add(@RequestBody employee: Employee) : Mono<Employee> =
repository.save(employee)
}
- 检索所有 employee。
- 根据 employee
id
检索。 - 根据 organization
id
检索所有 employee。 - 添加新的 employee。
我们还需要在 Spring Boot application.yml
中配置数据库连接设置:
spring:
application:
name: employee-service
r2dbc:
url: r2dbc:postgresql://localhost:5432/spring
username: spring
password: spring123
这是我们的 main class。我们希望应用程序在启动时在数据库中创建一个表。使用 R2DBC 时,我们需要准备一段代码,以便使用 schema.sql
文件填充 schema。
@SpringBootApplication
class EmployeeApplication {
@Bean
fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
val initializer = ConnectionFactoryInitializer()
initializer.setConnectionFactory(connectionFactory)
initializer.setDatabasePopulator(
ResourceDatabasePopulator(ClassPathResource("schema.sql")))
return initializer
}
}
fun main(args: Array<String>) {
runApplication<EmployeeApplication>(*args)
}
然后将 schema.sql 文件放到 src/main/resources
目录中即可。
CREATE TABLE employee (
id SERIAL PRIMARY KEY,
name VARCHAR(255),
salary INT,
organization_id INT
);
让我们切换到 organization-service
。实现过程非常相似。这是我们的 domain model 类:
class Organization(var name: String) {
@Id var id: Int? = null
}
我们的应用程序要与 employee-service
进行通信。因此,我们需要定义 WebClient
Bean。它将从 application properties 中获取目标服务的地址。
@SpringBootApplication
class OrganizationApplication {
@Bean
fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
val initializer = ConnectionFactoryInitializer()
initializer.setConnectionFactory(connectionFactory)
initializer.setDatabasePopulator(
ResourceDatabasePopulator(ClassPathResource("schema.sql")))
return initializer
}
@Value("\${employee.client.url}")
private lateinit var employeeUrl: String
@Bean
fun webClient(builder: WebClient.Builder): WebClient {
return builder.baseUrl(employeeUrl).build()
}
}
fun main(args: Array<String>) {
runApplication<OrganizationApplication>(*args)
}
There is also the repository interface OrganizationRepository
. Our @RestController
uses a repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service
. As the response from the GET /employees/{id}/with-employees
it returns the OrganizationDTO
.
还有一个 repository 接口 OrganizationRepository
。我们的 @RestController
使用 repository Bean 与数据库交互,并使用 WebClient
Bean 调用 employee-service
暴露的端点。GET /employees/{id}/with-employees
的响应会返回 OrganizationDTO
。
@RestController
@RequestMapping("/organizations")
class OrganizationController {
@Autowired
lateinit var repository : OrganizationRepository
@Autowired
lateinit var client : WebClient
@GetMapping
fun findAll() : Flux<Organization> = repository.findAll()
@GetMapping("/{id}")
fun findById(@PathVariable id : Int): Mono<Organization> =
repository.findById(id)
@GetMapping("/{id}/with-employees")
fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
val employees : Flux<Employee> = client.get().uri("/employees/organization/$id")
.retrieve().bodyToFlux(Employee::class.java)
val org : Mono<Organization> = repository.findById(id)
return org.zipWith(employees.collectList()).log()
.map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
}
@PostMapping
fun add(@RequestBody employee: Organization) : Mono<Organization> =
repository.save(employee)
}
下面是我们的 DTO 实现:
data class OrganizationDTO(var id: Int?, var name: String) {
var employees : MutableList<Employee> = ArrayList()
constructor(employees: MutableList<Employee>) : this(null, "") {
this.employees = employees
}
constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
this.employees = employees
}
}
集成测试
完成实现后,我们就可以准备几个集成测试了。正如我在开头提到的,我们将使用 Testcontainers 在测试期间运行 Postgres 容器。我们的测试运行应用程序,并利用自动配置的 WebTestClient
实例调用 API 端点 (1)。我们需要在测试前启动 Postgres 容器。因此,我们需要在 companion object
部分定义容器 bean (2)。有了 @ServiceConnection
注解,我们就不必手动设置属性了,Spring Boot 会帮我们完成 (3)。
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class EmployeeControllerTests {
@Autowired
private lateinit var webTestClient: WebTestClient // (1)
companion object { // (2)
@Container
@ServiceConnection // (3)
val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
withDatabaseName("spring")
withUsername("spring")
withPassword("spring123")
}
}
@Test
@Order(1)
fun shouldStart() {
}
@Test
@Order(2)
fun shouldAddEmployee() {
webTestClient.post().uri("/employees")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(Employee("Test", 1000, 1))
.exchange()
.expectStatus().is2xxSuccessful
.expectBody()
.jsonPath("$.id").isNotEmpty
}
@Test
@Order(3)
fun shouldFindEmployee() {
webTestClient.get().uri("/employees/1")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().is2xxSuccessful
.expectBody()
.jsonPath("$.id").isNotEmpty
}
@Test
@Order(3)
fun shouldFindEmployees() {
webTestClient.get().uri("/employees")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().is2xxSuccessful
.expectBody().jsonPath("$.length()").isEqualTo(1)
.jsonPath("$[0].id").isNotEmpty
}
}
organization-service
的测试类要复杂一些。这是因为我们需要模拟与 employee-service
的通信。为此,我们使用了 ClientAndServer
对象 (1)。它在所有测试之前启动一次 (2),在测试之后停止 (3)。我们模拟 organization-service
调用的 GET /employees/organization/{id}
端点 (4)。然后,我们调用 organization-service GET /organizations/{id}/with-employees
端点 (5)。最后,我们将验证它是否在 JSON 响应中返回 employee 列表。
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class OrganizationControllerTests {
@Autowired
private lateinit var webTestClient: WebTestClient
companion object {
@Container
@ServiceConnection
val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
withDatabaseName("spring")
withUsername("spring")
withPassword("spring123")
}
private var mockServer: ClientAndServer? = null // (1)
@BeforeAll
@JvmStatic
internal fun beforeAll() { // (2)
mockServer = ClientAndServer.startClientAndServer(8090);
}
@AfterAll
@JvmStatic
internal fun afterAll() { // (3)
mockServer!!.stop()
}
}
@Test
@Order(1)
fun shouldStart() {
}
@Test
@Order(2)
fun shouldAddOrganization() {
webTestClient.post().uri("/organizations")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(Organization("Test"))
.exchange()
.expectStatus().is2xxSuccessful
.expectBody()
.jsonPath("$.id").isNotEmpty
}
@Test
@Order(3)
fun shouldFindOrganization() {
webTestClient.get().uri("/organizations/1")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().is2xxSuccessful
.expectBody()
.jsonPath("$.id").isNotEmpty
}
@Test
@Order(3)
fun shouldFindOrganizations() {
webTestClient.get().uri("/organizations")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().is2xxSuccessful
.expectBody().jsonPath("$.length()").isEqualTo(1)
.jsonPath("$[0].id").isNotEmpty
}
@Test
@Order(3)
fun shouldFindOrganizationWithEmployees() { // (4)
mockServer!!.`when`(request()
.withMethod("GET")
.withPath("/employees/organization/1"))
.respond(response()
.withStatusCode(200)
.withContentType(MediaType.APPLICATION_JSON)
.withBody(createEmployees()))
webTestClient.get().uri("/organizations/1/with-employees")
.accept(MediaType.APPLICATION_JSON) // (5)
.exchange()
.expectStatus().is2xxSuccessful
.expectBody()
.jsonPath("$.id").isNotEmpty
.jsonPath("$.employees.length()").isEqualTo(2)
.jsonPath("$.employees[0].id").isEqualTo(1)
.jsonPath("$.employees[1].id").isEqualTo(2)
}
private fun createEmployees(): String {
val employees: List<Employee> = listOf<Employee>(
Employee(1, "Test1", 10000, 1),
Employee(2, "Test2", 20000, 1)
)
return jacksonObjectMapper().writeValueAsString(employees)
}
}
你可以在自己电脑上运行这些测试,验证所有测试是否已成功完成。clone repository 后,你需要运行 Docker 并使用以下 Maven 命令构建应用程序:
$ mvn clean package
我们还可以在 CircleCI 上为应用程序准备构建定义(build definition)。由于我们需要运行 Testcontainers,因此需要一台运行有 Docker 守护进程的机器。下面是 .circle/config.yml
文件中为 CircleCI 构建 pipeline 的配置:
# .circleci/config.yml
version: 2.1
jobs:
build:
docker:
- image: 'cimg/openjdk:20.0'
steps:
- checkout
- run:
name: Analyze on SonarCloud
command: mvn verify sonar:sonar -DskipTests
executors:
machine_executor_amd64:
machine:
image: ubuntu-2204:2022.04.2
environment:
architecture: "amd64"
platform: "linux/amd64"
orbs:
maven: circleci/maven@1.4.1
workflows:
maven_test:
jobs:
- maven/test:
executor: machine_executor_amd64
- build:
context: SonarCloud
下面是在 CircleCI 上的构建结果:
如果你正在运行 Docker,也可以使用 Postgres 容器启动我们的 Spring Boot 响应式应用程序。这要归功于 spring-boot-testcontainers
模块。有一个专用的 @TestConfiguration
类可用于在开发模式下运行 Postgres:
@TestConfiguration
class PostgresContainerDevMode {
@Bean
@ServiceConnection
fun postgresql(): PostgreSQLContainer<*>? {
return PostgreSQLContainer("postgres:14.0")
.withUsername("spring")
.withPassword("spring123")
}
}
现在,我们需要定义 test main class,它将使用 PostgresContainerDevMode
类中提供的配置。
class EmployeeApplicationTest
fun main(args: Array<String>) {
fromApplication<EmployeeApplication>()
.with(PostgresContainerDevMode::class)
.run(*args)
}
要在 Docker 上运行 Dev Postgres 应用程序,只需执行以下 Maven 命令:
$ mvn spring-boot:test-run
参考:https://piotrminkowski.com/2023/07/28/reactive-spring-boot-with-webflux-r2dbc-and-postgres/