使用 WebClient 执行同步请求
1、简介
本文将会带你了解如何使用 WebClient
执行同步请求。
在响应式编程日益普及的同时,在哪些情况下这种阻塞式请求仍然是适当和必要的?
2、Spring 中的 HTTP 客户端库概述
首先,回顾一下目前可用的客户端库。
在 Spring Framework 3.0 中引入 RestTemplate
时,其简单的 HTTP 请求模板方法 API 广受欢迎。然而,其同步性质和许多重载方法在高流量应用程序中导致了复杂性和性能瓶颈。
在 Spring 5.0 中,WebClient
被引入作为一种更高效、响应式的选择,用于非阻塞请求。尽管它是 Reactive Stack Web 框架的一部分,但它支持用于同步和异步通信的 Fluent 风格的 API。
在 Spring Framework 6.1 中,RestClient
提供了另一种执行 REST 调用的选项。它将 WebClient
的 Fluent API 与 RestTemplate
的基础设施结合在一起,包括消息转换器、请求工厂和拦截器。
尽管 RestClient
针对同步请求进行了优化,但如果我们的应用还需要异步或流式传输功能,则 WebClient
更为适用。通过在阻塞和非阻塞 API 调用中使用 WebClient
,我们可以保持代码库的一致性,避免混用不同的客户端库。
3、阻塞与非阻塞 API 调用
在介绍各种 HTTP 客户端时,我们使用了同步和异步、阻塞和非阻塞等术语。这些术语与上下文有关,有时可能代表同一概念的不同名称。
在方法调用方面,WebClient
根据发送和接收 HTTP 请求和响应的方式,支持同步和异步交互。如果 WebClient
等待前一个请求完成后才继续处理后一个请求,则是以阻塞方式进行的,而结果是同步返回的。
另一方面,我们可以通过执行立即返回的非阻塞调用来实现异步交互。在等待另一个系统的响应时,其他处理工作可以继续进行,一旦准备就绪,就会以异步方式提供结果。
4、什么情况下使用同步请求
如前所述,WebClient
是 Spring Webflux 框架的一部分,默认情况下一切都是响应式的。不过,该库提供异步和同步操作支持,因此适用于响应式和 Servlet Stack Web 应用。
在测试或原型设计等需要即时反馈的情况下,以阻塞方式使用 WebClient
是合适的。这种方法允许我们在考虑性能优化之前先关注功能。
许多现有应用程序仍在使用像 RestTemplate
这样的阻塞客户端。由于 RestTemplate
从 Spring 5.0 开始进入维护模式,重构遗留代码库将需要更新依赖关系,并有可能过渡到非阻塞架构。在这种情况下,我们可以暂时以阻塞方式使用 WebClient
。
即使在新项目中,某些应用部分也可以设计为同步工作流。这可能包括这样的场景,如对各种外部系统的顺序 API 调用,其中一次调用的结果是进行下一次调用所必需的。WebClient
可以处理阻塞和非阻塞调用,而不需要使用不同的客户端。
稍后我们将看到,同步执行和异步执行之间的切换相对简单。只要有可能,我们就应该避免使用阻塞调用,尤其是在使用 Reactive Stack 时。
5、WebClient 同步调用 API
在发送 HTTP 请求时,WebClient
会从 Reactor Core 库中返回 Mono
或 Flux
两种响应式数据类型之一。这些返回类型代表数据流,其中 Mono
对应单个值或空结果,而 Flux
则指零或多个值的数据流。异步和非阻塞 API 可让调用者决定何时以及如何订阅,从而保持代码的响应性。
不过,如果我们想模拟同步行为,可以调用 block()
方法。它会阻塞当前操作以获取结果。
更准确地说,block()
方法会触发对响应流的新订阅,启动从源到消费者的数据流。在内部,它会使用 CountDownLatch
等待流完成,这将暂停当前线程,直到操作完成,即 Mono
或 Flux
发出结果。block()
方法将非阻塞操作转换为传统的阻塞操作,使调用线程等待结果。
6、实例
来看一个实际的例子。假设我们有一个简单的 API 网关应用,位于客户端应用和两个后端应用( Customer 和 Billing 系统)之间。第一个应用保存客户信息,而第二个提供计费(账单)细节。不同的客户端通过 API 网关进行交互,如下是向客户端公开的接口,用于检索客户信息,包括他们的账单细节:
@GetMapping("/{id}")
CustomerInfo getCustomerInfo(@PathVariable("id") Long customerId) {
return customerInfoService.getCustomerInfo(customerId);
}
下面是模型类:
public class CustomerInfo {
private Long customerId;
private String customerName;
private Double balance;
// Getter / Setter 省略
}
API 网关通过为内部与 Customer 和 Billing 应用通信提供单个端点来简化流程。然后,它会聚合来自两个系统的数据。
考虑到我们在整个系统中使用了同步 API 的情况。然而,我们最近升级了我们的 Customer 和 Billing 系统,以处理异步和非阻塞操作。
Customer API:
@GetMapping("/{id}")
Mono<Customer> getCustomer(@PathVariable("id") Long customerId) throws InterruptedException {
TimeUnit.SECONDS.sleep(SLEEP_DURATION.getSeconds());
return Mono.just(customerService.getBy(customerId));
}
Billing API:
@GetMapping("/{id}")
Mono<Billing> getBilling(@PathVariable("id") Long customerId) throws InterruptedException {
TimeUnit.SECONDS.sleep(SLEEP_DURATION.getSeconds());
return Mono.just(billingService.getBy(customerId));
}
在实际场景中,这些 API 会成为单独组件的一部分。然而,为了简化起见,我们将它们组织到代码中的不同包中。另外,为了进行测试,我们引入了延迟以模拟网络延迟:
public static final Duration SLEEP_DURATION = Duration.ofSeconds(2);
与两个后端系统不同,我们的 API Gateway 应用必须公开同步、阻塞 API,以避免破坏客户端约定。因此,这里没有任何变化。
业务逻辑位于 CustomerInfoService
中。首先,我们使用 WebClient
从 Customer 系统中检索数据:
Customer customer = webClient.get()
.uri(uriBuilder -> uriBuilder.path(CustomerController.PATH_CUSTOMER)
.pathSegment(String.valueOf(customerId))
.build())
.retrieve()
.onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
.map(ApiGatewayException::new))
.bodyToMono(Customer.class)
.block();
接下来是 Billing 系统:
Billing billing = webClient.get()
.uri(uriBuilder -> uriBuilder.path(BillingController.PATH_BILLING)
.pathSegment(String.valueOf(customerId))
.build())
.retrieve()
.onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
.map(ApiGatewayException::new))
.bodyToMono(Billing.class)
.block();
最后,用两个组件的响应,构建成一个响应:
new CustomerInfo(customer.getId(), customer.getName(), billing.getBalance());
如果其中一个 API 调用失败,onStatus()
方法内定义的错误处理将把 HTTP 错误状态映射到 ApiGatewayException
。在这里,我们使用的是传统方法,而不是通过 Mono.error()
方法采用响应式的替代方案。于我们的客户期望使用同步 API,因此我们抛出的异常会传播给调用者。
尽管 Customer 和 Billing 系统具有异步性质,但 WebClient
的 block()
方法使我们能够汇总这两个来源的数据,并以透明方式向客户返回综合结果。
6.1、优化多个 API 调用
此外,由于我们要连续调用不同系统两次,我们可以通过避免单独阻塞每个响应来优化这个过程。我们可以执行以下操作:
private CustomerInfo getCustomerInfoBlockCombined(Long customerId) {
Mono<Customer> customerMono = webClient.get()
.uri(uriBuilder -> uriBuilder.path(CustomerController.PATH_CUSTOMER)
.pathSegment(String.valueOf(customerId))
.build())
.retrieve()
.onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
.map(ApiGatewayException::new))
.bodyToMono(Customer.class);
Mono<Billing> billingMono = webClient.get()
.uri(uriBuilder -> uriBuilder.path(BillingController.PATH_BILLING)
.pathSegment(String.valueOf(customerId))
.build())
.retrieve()
.onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
.map(ApiGatewayException::new))
.bodyToMono(Billing.class);
return Mono.zip(customerMono, billingMono, (customer, billing) -> new CustomerInfo(customer.getId(), customer.getName(), billing.getBalance()))
.block();
}
zip()
是一个将多个 Mono
实例合并为一个 Mono
的方法。当所有给定的 Mono
实例都生成了值,然后根据指定函数(在本例中是创建 CustomerInfo
对象)进行聚合,一个新的 Mono
实例就完成了。这种方法效率更高,因为它允许我们同时等待两个服务的合并结果。
为了验证我们是否提高了性能,让我们在两种情况下进行测试:
@Autowired
private WebTestClient testClient;
@Test
void givenApiGatewayClient_whenBlockingCall_thenResponseReceivedWithinDefinedTimeout() {
Long customerId = 10L;
assertTimeout(Duration.ofSeconds(CustomerController.SLEEP_DURATION.getSeconds() + BillingController.SLEEP_DURATION.getSeconds()), () -> {
testClient.get()
.uri(uriBuilder -> uriBuilder.path(ApiGatewayController.PATH_CUSTOMER_INFO)
.pathSegment(String.valueOf(customerId))
.build())
.exchange()
.expectStatus()
.isOk();
});
}
最开始,测试失败了。然而,在切换到等待合并结果后,测试在 Customer 和 Billing 系统调用的组合持续时间内完成。这表明我们通过聚合来自两个服务的响应来提高了性能。即使我们使用阻塞同步方法,我们仍然可以遵循最佳实践来优化性能。这有助于确保系统保持高效和可靠。
7、总结
本文介绍了如何使用 WebClient
进行同步通信,WebClient
是为响应式编程设计的工具,但也能进行阻塞调用。
Ref:https://www.baeldung.com/java-webclient-synchronous-requests