使用 OpenFeign 和 CompletableFuture 并行处理多个 HTTP 请求
1、简介
在处理分布式系统时,调用外部服务并保持低延迟是一项至关重要的任务。
本文将带你了解如何使用 OpenFeign
和 CompletableFuture
来并行处理多个 HTTP 请求,处理错误,并设置网络和线程超时。
2、示例项目
为了说明并行请求的用法,我们要实现一个功能,允许客户在网站上购买物品。首先,该服务发出一个请求,根据客户所在国家获取可用的付款方式。其次,它发送一个请求给客户生成有关购买的报告。购买报告不包括有关付款方式的信息。
先添加 spring-cloud-starter-openfeign 依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
3、创建调用外部依赖的客户端
使用 @FeignClient
注解创建两个指向 localhost:8083
的客户端:
@FeignClient(name = "paymentMethodClient", url = "http://localhost:8083")
public interface PaymentMethodClient {
@RequestMapping(method = RequestMethod.GET, value = "/payment_methods")
String getAvailablePaymentMethods(@RequestParam(name = "site_id") String siteId);
}
第一个客户端名为 paymentMethodClient
。它调用 GET /payment_methods
,使用代表客户所在国家/地区的 site_id
请求参数获取可用的付款方式。
第二个客户端如下:
@FeignClient(name = "reportClient", url = "http://localhost:8083")
public interface ReportClient {
@RequestMapping(method = RequestMethod.POST, value = "/reports")
void sendReport(@RequestBody String reportRequest);
}
我们将其命名为 reportClient
,它调用 POST /reports
生成购买报告。
4、创建并行请求 Executor
根据需求,我们要依次调用两个客户端,在这种情况下,该 API 的总响应时间至少是两个请求的响应时间之和。
值得注意的是,报告中不包含付款方式的信息,因此这两个请求是独立的。因此,我们可以将任务并行化,将 API 的总响应时间缩短到与最慢请求的响应时间大致相同。
4.1、创建并行 Executor
使用 CompletableFutures
创建将两个请求并行化的服务:
@Service
public class PurchaseService {
private final PaymentMethodClient paymentMethodClient;
private final ReportClient reportClient;
// 全参构造函数
public String executePurchase(String siteId) throws ExecutionException, InterruptedException {
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() ->
paymentMethodClient.getAvailablePaymentMethods(siteId));
CompletableFuture.runAsync(() -> reportClient.sendReport("Purchase Order Report"));
return String.format("Purchase executed with payment method %s", paymentMethodsFuture.get());
}
}
executePurchase()
方法首先发布一个并行任务,使用 supplyAsync()
获取可用的付款方式。然后,提交另一个并行任务,使用 runAsync()
生成报告。最后,使用 get()
获取支付方法结果并返回完整结果。
之所以选择 supplyAsync()
和 runAsync()
这两个任务,是因为这两个方法的性质不同。supplyAsync()
方法会返回 GET 调用的结果。而,runAsync()
不会返回任何结果,因此更适合用于生成报告。
另一个区别是,runAsync()
在调用代码时会立即启动一个新线程,而无需线程池进行任何任务调度。相比之下,supplyAsync()
任务可能会被调度或延迟,这取决于线程池是否调度了其他任务。
使用 WireMock
进行集成测试:
@BeforeEach
public void startWireMockServer() {
wireMockServer = new WireMockServer(8083);
configureFor("localhost", 8083);
wireMockServer.start();
stubFor(post(urlEqualTo("/reports"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value())));
}
@AfterEach
public void stopWireMockServer() {
wireMockServer.stop();
}
@Test
void givenRestCalls_whenBothReturnsOk_thenReturnCorrectResult() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value()).withBody("credit_card")));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method credit_card", result);
}
在上面的测试中,我们首先使用 @BeforeEach
和 @AfterEach
注解配置了一个 WireMockServer
,使其在 localhost:8083 启动,并在完成后关闭。
然后,在测试方法中,我们使用了两个 stub,当我们调用这两个 feign 客户端时,它们都会响应 200 HTTP 状态。最后,我们使用 assertEquals()
来断言并行 executor 的结果是正确的。
4.2、使用 exceptionally() 处理外部错误
如果 GET /payment_methods 请求以 404 HTTP 状态失败,表明该国家没有可用的支付方法,该怎么办?在这种情况下需要做一些处理,例如,返回一个默认值。
要在 CompletableFuture
中处理错误,需要在 paymentMethodsFuture
中添加以下 exceptionally()
代码块:
CompletableFuture <String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.exceptionally(ex -> {
if (ex.getCause() instanceof FeignException &&
((FeignException) ex.getCause()).status() == 404) {
return "cash";
});
现在,如果得到 404,就会返回名为 cash
(现金)的默认付款方式:
@Test
void givenRestCalls_whenPurchaseReturns404_thenReturnDefault() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.NOT_FOUND.value())));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method cash", result);
}
5、为并行任务和网络请求添加超时
在调用外部依赖项时,我们无法确定请求的运行时间。因此,如果请求耗时过长,就应该放弃该请求。考虑到这一点,我们可以添加两种类型:FeignClient
和 CompletableFuture
超时。
5.1、为 Feign 客户端添加网络超时
这种类型的超时适用于单个网络请求。
我们可以使用 Spring Boot 自动配置为 FeignClient
配置超时:
feign.client.config.paymentMethodClient.readTimeout: 200
feign.client.config.paymentMethodClient.connectTimeout: 100
在上述 application.properties
文件中,我们为 PaymentMethodClient
设置了读取和连接超时。数值以毫秒为单位。
连接超时会告诉 feign 客户端在达到阈值后中断 TCP 握手连接尝试。同样,当连接正常建立,但协议无法从 Socket 读取数据时,读取超时会中断请求。
然后,我们就可以在并行 executor 中的 exceptionally()
块内处理这类错误:
if (ex.getCause() instanceof RetryableException) {
// 处理 TCP 超时
throw new RuntimeException("TCP call network timeout!");
}
添加另一个测试场景,验证行为的正确性:
@Test
void givenRestCalls_whenPurchaseRequestWebTimeout_thenReturnDefault() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(250)));
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: REST call network timeout!", error.getMessage());
}
如上,我们使用 withFixedDelay()
方法,用 250 毫秒来模拟 TCP 超时。
5.2、添加线程超时
线程超时会停止整个 CompletableFuture
,而不仅仅是一次请求尝试。例如,对于 feign 户端重试,在评估超时阈值时,原始请求和重试尝试的时间也会计算在内。
要配置线程超时,我们可以稍微修改一下我们的支付方法 CompletableFuture
:
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.orTimeout(400, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
// 异常处理
});
然后,我们就可以在 exceptionally()
代码块中处理线程超时错误:
if (ex instanceof TimeoutException) {
// 处理线程超时异常
throw new RuntimeException("Thread timeout!", ex);
}
测试:
@Test
void givenRestCalls_whenPurchaseCompletableFutureTimeout_thenThrowNewException() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(450)));
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: Thread timeout!", error.getMessage());
}
我们为 /payments_method
添加了更长的延迟,这样它就能通过网络超时阈值,但在线程超时时会失败。
6、总结
本文介绍了如何使用 CompletableFuture
和 FeignClient
并行执行多个外部请求,还介绍了如何设置网络超时和线程超时,以及如何优雅地处理超时异常。
Ref:https://www.baeldung.com/feign-client-completablefuture-spring-boot