Webflux

在 WebFlux 中拦截请求并添加自定义请求头

1、概览 Filter(拦截器/过滤器)是 Spring 提供的一个机制,可以在 Controller 处理请求或向客户端返回响应之前拦截并处理请求。 本文将带你了解如何使用 WebFlux 拦截客户端请求以及如何添加自定义 Header。 2、Maven 依赖 添加 spring-boot-starter-webflux 响应式 Web 依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <version>3.1.5</version> </dependency> 3、拦截请求 Spring WebFlux Filter 可分为 WebFilter 和 HandlerFilterFunction 两类。可使用这些过滤器拦截 Web 请求,并添加新的自定义 Header 或修改现有 Header。 3.1、使用 WebFilter WebFilter 以链式拦截方式处理 Web 请求。WebFilter 在全局范围内生效,一旦启用,将拦截所有的请求和响应。 首先,定义基于注解的 Controller: @GetMapping(value= "/trace-annotated") public Mono<String> trace(@RequestHeader(name = "traceId") final String traceId) { return Mono.just("TraceId: ".concat(traceId)); } 然后,拦截 Web 请求,使用 TraceWebFilter 实现添加一个新的 Header traceId: @Component public class TraceWebFilter implements WebFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { exchange.

Spring WebFlux Multipart 文件上传

1、概览 Spring WebFlux 是一个响应式 Web 框架,它提供了一个非阻塞 Event Loop,可异步处理 I/O 操作。此外,它还使用 Mono 和 Flux Reactive Stream Publisher 在订阅时发布数据。 这种响应式方式可帮助应用处理大量请求和数据,而无需分配大量资源。 本文将带你了解如何在 Spring WebFlux 中处理 Multipart 文件上传。 2、项目设置 创建一个简单的响应式 Spring Boot 项目,将 Multipart 文件上传到一个目录。 为简单起见,使用项目的根目录来存储文件。在生产中,可以使用 云厂商 OSS、Minio 存储等文件系统。 2.1、Maven 依赖 首先,在 pom.xml 中添加 spring-boot-starter-webflux 依赖,以启动 Spring WebFlux 应用: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <version>3.2.0</version> </dependency> 它提供核心 Spring WebFlux API 和嵌入式 Netty 服务器,用于构建响应式 Web 应用。 另外,还要在 pom.xml 文件中添加 spring-boot-starter-data-r2dbc 和 H2 数据库依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>com.

Reactor WebFlux 与虚拟线程(Virtual Thread)的对比

1、概览 本文将带你了解 Java 19 的 虚拟线程 和 Reactor Webflux 的基本工作原理以及它们的优缺点。 2、代码示例 假如我们开发的是一个电商后台,有如下 “负责计算和发布添加到购物车中的商品价格” 的函数。 class ProductService { private final String PRODUCT_ADDED_TO_CART_TOPIC = "product-added-to-cart"; private final ProductRepository repository; private final DiscountService discountService; private final KafkaTemplate<String, ProductAddedToCartEvent> kafkaTemplate; // 构造函数 public void addProductToCart(String productId, String cartId) { Product product = repository.findById(productId) .orElseThrow(() -> new IllegalArgumentException("not found!")); Price price = product.basePrice(); if (product.category().isEligibleForDiscount()) { BigDecimal discount = discountService.discountForProduct(productId); price.setValue(price.getValue().subtract(discount)); } var event = new ProductAddedToCartEvent(productId, price.

Spring WebFlux 中的并发

1、简介 本文将带你了解 Spring WebFlux 响应式应用中的并发。 2、响应式编程的动机 一个典型的 Web 应用由多个复杂的交互部分组成。其中许多交互在本质上是阻塞性的,例如那些涉及数据库调用以获取或更新数据的交互。而其他几个部分则是独立的,可以 并发 执行,也可能是 并行 执行。 并发是多个任务在同一时间段内交替执行,而并行是多个任务同时执行。并发关注的是任务的调度和切换,以提高系统的效率和响应性,而并行关注的是任务的同时执行,以提高计算速度和性能。 例如,用户对 Web 服务器的两个请求可以由不同的线程处理。在多核平台上,这对整体响应时间有明显的好处。因此,这种并发模型被称 thread-per-request(每个请求一个线程)模型: 如上图,每个线程一次处理一个请求。 虽然基于线程的并发为我们解决了部分问题,但却无法解决单个线程内的大部分交互仍然是阻塞的这一事实。此外,在 Java 中使用原生线程来实现并发时,还需要付出上下文切换的巨大代价。 与此同时,随着 Web 应用面临的请求越来越多,thread-per-request 模式开始无法满足人们的期望。 因此,我们需要一种并发模型,它可以帮助我们用相对较少的线程数处理越来越多的请求。这也是采用响应式编程的主要动机之一。 3、响应式编程中的并发 响应式编程可以帮助我们根据数据流和变化的传播来构建程序。在完全无阻塞的环境中,这可以让我们实现更高的并发性和更好的资源利用率。 然而,响应式编程是否完全摒弃了基于线程的并发?虽然这种说法有些激烈,但响应式编程肯定与使用线程实现并发的方法截然不同。响应式编程带来的根本区别在于 异步。 换句话说,程序流程从一连串同步操作转变为异步事件流。 例如,在响应式模型下,对数据库的读取调用不会在获取数据时阻塞调用线程。调用会立即返回一个发布者(Publisher),其他人可以订阅该发布者。订阅者(Subscriber)可以在事件发生后对其进行处理,甚至可以自己进一步生成事件: 最重要的是,响应式编程并不强调应该生成和消耗哪个线程事件。相反,它强调的是将程序构造成异步事件流。 这里的发布者和订阅者不需要属于同一个线程。这有助于我们更好地利用可用线程,从而提高整体并发性。 4、Event Loop Event Loop(事件循环)模型是一种用于服务器的响应式异步编程模型: 上图是一个事件循环的抽象设计,展示了响应式异步编程的思想: Event Loop 在单个线程中连续运行,我们可以根据可用内核的数量设置多个 Event Loop。 Event Loop 按顺序处理来自事件队列的事件,并在向平台注册回调后立即返回。 Platform(平台)可以触发操作的完成,如数据库调用或外部服务调用。 Event Loop 可在 operation(操作)完成通知时触发 callback(回调),并将结果发回给原始调用者。 包括 Node.js、Netty 和 Ngnix 在内的许多平台都实现了 Event Loop 模型。与 Apache HTTP 服务器、Tomcat 或 JBoss 等传统平台相比,它们具有更好的可扩展性。 5、用 Spring WebFlux 进行响应式编程 对响应式编程及其并发模型有了足够的了解后,来看看 Spring WebFlux。它是 Spring 在 5.

Spring Webflux 教程

1、概览 Spring 5 引入了 Spring WebFlux 框架,为 Web 应用提供响应式编程支持。 本文将带你了解如何使用响应式 Web 组件 RestController 和 WebClient 创建一个小型响应式 REST 应用,以及如何使用 Spring Security 来保护响应式端点。 2、Spring WebFlux 框架 Spring WebFlux 内部使用 Project Reactor 及其 Publisher(发布者)实现、Flux 和 Mono。 WebFlux 支持两种编程模式: 基于注解的响应式组件 函数式路由和处理 本文重点介绍基于注解的响应式组件。 3、依赖 首先从 spring-boot-starter-webflux 依赖开始。 它会传递依赖其他所有的依赖: spring-boot 和 spring-boot-starter,用于基本的 Spring Boot 应用设置 spring-webflux 框架 响应式流(Reactive Stream)所需的 reactor-core 以及 reactor-netty <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <version>3.1.2</version> </dependency> 可从 Maven Central 下载最新的 spring-boot-starter-webflux。 4、响应式 REST 应用 现在,使用 Spring WebFlux 构建一个非常简单的响应式 REST EmployeeManagement 应用:

WebClient 超时设置

1、概览 WebClient 是一个响应式的 HTTP 客户端,它基于 Reactor 项目提供了函数式 API。 本文将带你了解 WebClient 的超时设置,学习如何正确地设置不同的超时,既包括整个应用程序的全局超时,也包括特定请求的超时。 2、WebClient 和 HTTP 客户端 WebClient 还需要一个 HTTP 客户端库才能正常工作。Spring 为其中一些提供了 内置支持,默认使用的是 Reactor Netty。 大多数配置,包括超时,都可以通过这些客户端完成。 3、通过 HTTP 客户端配置超时 如前所述,在应用中设置不同 WebClient 超时的最简单方法是通过底层 HTTP 客户端进行全局设置。这也是最有效的方法。 由于 Netty 是 Spring WebFlux 的默认客户端库,本文将以 Reactor Netty HttpClient 类 作为示例。 3.1、响应超时 响应超时是指发送请求后等待接收响应的时间。可以使用 responseTimeout() 方法为客户端配置它: HttpClient client = HttpClient.create() .responseTimeout(Duration.ofSeconds(1)); 在本例中,配置的超时时间为 1 秒。Netty 默认不设置响应超时。 然后,就可以使用 HttpClient 来构建 Spring WebClient。 WebClient webClient = WebClient.builder() .clientConnector(new ReactorClientHttpConnector(httpClient)) .build(); 之后,WebClient 会继承底层 HttpClient 提供的所有配置,适用于发送的所有请求。

Spring WebFlux 中的重试

1、概览 在云上构建分布式应用时,需要考虑到服务故障,这通常会涉及到重试。 Spring WebFlux 提供了一些失败后重试的工具。 本文将会带你了解如何在 Spring WebFlux 添加和配置重试功能。 2、用例 本文使用一个 MockWebServer 来模拟外部系统暂时不可用,然后又变为可用的情况。 连接到 REST 服务的组件: @Test void givenExternalServiceReturnsError_whenGettingData_thenRetryAndReturnResponse() { mockExternalService.enqueue(new MockResponse() .setResponseCode(SERVICE_UNAVAILABLE.code())); mockExternalService.enqueue(new MockResponse() .setResponseCode(SERVICE_UNAVAILABLE.code())); mockExternalService.enqueue(new MockResponse() .setResponseCode(SERVICE_UNAVAILABLE.code())); mockExternalService.enqueue(new MockResponse() .setBody("stock data")); StepVerifier.create(externalConnector.getData("ABC")) .expectNextMatches(response -> response.equals("stock data")) .verifyComplete(); verifyNumberOfGetRequests(4); } 3、添加重试 Mono 和 Flux API 中内置了两个关键的 retry 操作。 3.1、使用 retry retry 可以防止应用立即返回错误,并重新订阅指定次数: public Mono<String> getData(String stockId) { return webClient.get() .uri(PATH_BY_ID, stockId) .retrieve() .bodyToMono(String.class) .retry(3); } 无论 Web 客户端返回什么错误,都会重试最多三次。 3.2、使用 retryWhen retryWhen 方法可用来创建一个可配置的重试策略:

在 Spring Webflux 中使用 @Cacheable 注解缓存结果

1、概览 本文将带你了解如何在 Spring WebFlux 中使用 @Cacheable 注解实现缓存,以及一些常见的问题和解决办法。 2、@Cacheable 和响应式类型 在本文撰稿时,@Cacheable 还不能和响应式框架无缝整合。主要问题在于,目前还没有非阻塞式的缓存实现(JSR-107 缓存 API 是阻塞式的)。只有 Redis 提供了响应式驱动。 虽然,仍然可以在方法上使用 @Cacheable。这会缓存封装对象(Mono 或 Flux),但不会缓存方法的实际结果。 2.1、项目设置 创建一个使用响应式 MongoDB 驱动的 Spring WebFlux 项目。并且用 Testcontainers 代替真实运行的 MongoDB 进行测试。 测试类使用 @SpringBootTest 进行注解,如下: final static MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10")); @DynamicPropertySource static void mongoDbProperties(DynamicPropertyRegistry registry) { mongoDBContainer.start(); registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl); } 这几行代码会启动 MongoDB 实例,并将 URI 传递给 Spring Boot 以自动配置 Mongo Repository。 创建带有保存和获取 Item 方法的 ItemService 类: @Service public class ItemService { private final ItemRepository repository; public ItemService(ItemRepository repository) { this.

使用 WebFlux 开发一个 CURD 应用

WebFlux 最为人所诟病的是数据库的支持问题,毕竟数据是一个应用的生命,我们接触的大部分应用程序都是有数据库的,而 WebFlux 在这一方面的支持行一直比较弱,这也是大家总是吐槽它的原因。 不过从 Spring 5 开始,这一问题得到了一定程度的缓解。 Spring 官方在 Spring5 发布了响应式 Web 框架 Spring WebFlux 之后急需能够满足异步响应的数据库交互 API,不过由于缺乏标准和驱动,Pivotal 团队开始自己研究响应式关系型数据库连接 Reactive Relational Database Connectivity,并提出了 R2DBC 规范 API 用来评估可行性并讨论数据库厂商是否有兴趣支持响应式的异步非阻塞驱动程序。最早只有 PostgreSQL 、H2、MSSQL 三家数据库厂商,不过现在 MySQL 也加入进来了,这是一个极大的利好。目前 R2DBC 的最新版本是 0.9.0.RELEASE。 本系列接下来的文章中将会和大家演示 R2DBC 的用法,但是今天我们还是先来看看 WebFlux + MongoDB 的用法,毕竟这是 WebFlux 较早支持的数据库之一,各种 API 都比较成熟,我们一步一步来。 1、项目创建 方便起见,我们这里就直接创建 Spring Boot 项目,首先创建一个 Spring Boot 项目,引入 MongoDB 依赖和 WebFlux 依赖,如下: 注意我们这里选择的 MongoDB 依赖是 Spring Data Reactive MongoDB,千万别选错了。 项目创建完成后,我们先在 application.properties 中对 MongoDB 进行简单配置,如下:

解决 Spring WebFlux DataBufferLimitException

1、简介 本文将将会介绍在 Spring WebFlux 应用中出现 DataBufferLimitException 的原因,以及解决办法。 2、原因 在讲解决方案前,先解释一下这个异常的原因。 2.1、DataBufferLimitException 是啥? Spring WebFlux 限制 了编解码器(codec)中 Buffer 的内存大小,以避免应用出现内存问题。默认情况下,配置为 262,144 字节。如果超出限制,就会导致 DataBufferLimitException 异常。 2.2、编解码器是啥? spring-web 和 spring-core 模块通过非阻塞 I/O 和 Reactive Stream 背压提供了将字节内容序列化和反序列化为更高级对象的支持。编解码器 提供了一种替代 Java 序列化的方法。其中一个 优点 是,通常对象不需要实现 Serializable 接口。 3、服务端 先看服务端的 DataBufferLimitException 是如何发生的。 3.1、问题复现 尝试向 Spring WebFlux 服务器发送大小为 390 KB 的 JSON 请求体以触发异常。 使用 curl 命令向服务器发送 POST 请求: curl --location --request POST 'http://localhost:8080/1.0/process' \ --header 'Content-Type: application/json' \ --data-binary '@/tmp/390KB.json' 你可以看到,抛出了 DataBufferLimitException 异常: