使用 OpenFeign 和 CompletableFuture 并行处理多个 HTTP 请求

1、简介

在处理分布式系统时,调用外部服务并保持低延迟是一项至关重要的任务。

本文将带你了解如何使用 OpenFeignCompletableFuture 来并行处理多个 HTTP 请求,处理错误,并设置网络和线程超时。

2、示例项目

为了说明并行请求的用法,我们要实现一个功能,允许客户在网站上购买物品。首先,该服务发出一个请求,根据客户所在国家获取可用的付款方式。其次,它发送一个请求给客户生成有关购买的报告。购买报告不包括有关付款方式的信息。

先添加 spring-cloud-starter-openfeign 依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

3、创建调用外部依赖的客户端

使用 @FeignClient 注解创建两个指向 localhost:8083 的客户端:

@FeignClient(name = "paymentMethodClient", url = "http://localhost:8083")
public interface PaymentMethodClient {

    @RequestMapping(method = RequestMethod.GET, value = "/payment_methods")
    String getAvailablePaymentMethods(@RequestParam(name = "site_id") String siteId);
}

第一个客户端名为 paymentMethodClient。它调用 GET /payment_methods,使用代表客户所在国家/地区的 site_id 请求参数获取可用的付款方式。

第二个客户端如下:

@FeignClient(name = "reportClient", url = "http://localhost:8083")
public interface ReportClient {

    @RequestMapping(method = RequestMethod.POST, value = "/reports")
    void sendReport(@RequestBody String reportRequest);
}

我们将其命名为 reportClient,它调用 POST /reports 生成购买报告。

4、创建并行请求 Executor

根据需求,我们要依次调用两个客户端,在这种情况下,该 API 的总响应时间至少是两个请求的响应时间之和。

值得注意的是,报告中不包含付款方式的信息,因此这两个请求是独立的。因此,我们可以将任务并行化,将 API 的总响应时间缩短到与最慢请求的响应时间大致相同。

4.1、创建并行 Executor

使用 CompletableFutures 创建将两个请求并行化的服务:

@Service
public class PurchaseService {

    private final PaymentMethodClient paymentMethodClient;
    private final ReportClient reportClient;

    // 全参构造函数

    public String executePurchase(String siteId) throws ExecutionException, InterruptedException {
        CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> 
          paymentMethodClient.getAvailablePaymentMethods(siteId));
        CompletableFuture.runAsync(() -> reportClient.sendReport("Purchase Order Report"));

        return String.format("Purchase executed with payment method %s", paymentMethodsFuture.get());
    }
}

executePurchase() 方法首先发布一个并行任务,使用 supplyAsync() 获取可用的付款方式。然后,提交另一个并行任务,使用 runAsync() 生成报告。最后,使用 get() 获取支付方法结果并返回完整结果。

之所以选择 supplyAsync()runAsync() 这两个任务,是因为这两个方法的性质不同。supplyAsync() 方法会返回 GET 调用的结果。而,runAsync() 不会返回任何结果,因此更适合用于生成报告。

另一个区别是,runAsync() 在调用代码时会立即启动一个新线程,而无需线程池进行任何任务调度。相比之下,supplyAsync() 任务可能会被调度或延迟,这取决于线程池是否调度了其他任务。

使用 WireMock 进行集成测试:

@BeforeEach
public void startWireMockServer() {
    wireMockServer = new WireMockServer(8083);
    configureFor("localhost", 8083);
    wireMockServer.start();

    stubFor(post(urlEqualTo("/reports"))
      .willReturn(aResponse().withStatus(HttpStatus.OK.value())));
}

@AfterEach
public void stopWireMockServer() {
    wireMockServer.stop();
}

@Test
void givenRestCalls_whenBothReturnsOk_thenReturnCorrectResult() throws ExecutionException, InterruptedException {
    stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
      .willReturn(aResponse().withStatus(HttpStatus.OK.value()).withBody("credit_card")));

    String result = purchaseService.executePurchase("BR");

    assertNotNull(result);
    assertEquals("Purchase executed with payment method credit_card", result);
}

在上面的测试中,我们首先使用 @BeforeEach@AfterEach 注解配置了一个 WireMockServer,使其在 localhost:8083 启动,并在完成后关闭。

然后,在测试方法中,我们使用了两个 stub,当我们调用这两个 feign 客户端时,它们都会响应 200 HTTP 状态。最后,我们使用 assertEquals() 来断言并行 executor 的结果是正确的。

4.2、使用 exceptionally() 处理外部错误

如果 GET /payment_methods 请求以 404 HTTP 状态失败,表明该国家没有可用的支付方法,该怎么办?在这种情况下需要做一些处理,例如,返回一个默认值。

要在 CompletableFuture 中处理错误,需要在 paymentMethodsFuture 中添加以下 exceptionally() 代码块:

CompletableFuture <String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
  .exceptionally(ex -> {
      if (ex.getCause() instanceof FeignException && 
             ((FeignException) ex.getCause()).status() == 404) {
          return "cash";
      });

现在,如果得到 404,就会返回名为 cash(现金)的默认付款方式:

@Test
void givenRestCalls_whenPurchaseReturns404_thenReturnDefault() throws ExecutionException, InterruptedException {
    stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
        .willReturn(aResponse().withStatus(HttpStatus.NOT_FOUND.value())));

    String result = purchaseService.executePurchase("BR");

    assertNotNull(result);
    assertEquals("Purchase executed with payment method cash", result);
}

5、为并行任务和网络请求添加超时

在调用外部依赖项时,我们无法确定请求的运行时间。因此,如果请求耗时过长,就应该放弃该请求。考虑到这一点,我们可以添加两种类型:FeignClientCompletableFuture 超时。

5.1、为 Feign 客户端添加网络超时

这种类型的超时适用于单个网络请求。

我们可以使用 Spring Boot 自动配置为 FeignClient 配置超时:

feign.client.config.paymentMethodClient.readTimeout: 200
feign.client.config.paymentMethodClient.connectTimeout: 100

在上述 application.properties 文件中,我们为 PaymentMethodClient 设置了读取和连接超时。数值以毫秒为单位。

连接超时会告诉 feign 客户端在达到阈值后中断 TCP 握手连接尝试。同样,当连接正常建立,但协议无法从 Socket 读取数据时,读取超时会中断请求。

然后,我们就可以在并行 executor 中的 exceptionally() 块内处理这类错误:

if (ex.getCause() instanceof RetryableException) {
    // 处理 TCP 超时
    throw new RuntimeException("TCP call network timeout!");
}

添加另一个测试场景,验证行为的正确性:

@Test
void givenRestCalls_whenPurchaseRequestWebTimeout_thenReturnDefault() {
    stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
      .willReturn(aResponse().withFixedDelay(250)));

    Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));

    assertEquals("java.lang.RuntimeException: REST call network timeout!", error.getMessage());
}

如上,我们使用 withFixedDelay() 方法,用 250 毫秒来模拟 TCP 超时。

5.2、添加线程超时

线程超时会停止整个 CompletableFuture,而不仅仅是一次请求尝试。例如,对于 feign 户端重试,在评估超时阈值时,原始请求和重试尝试的时间也会计算在内。

要配置线程超时,我们可以稍微修改一下我们的支付方法 CompletableFuture

CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
  .orTimeout(400, TimeUnit.MILLISECONDS)
  .exceptionally(ex -> {
       // 异常处理
   });

然后,我们就可以在 exceptionally() 代码块中处理线程超时错误:

if (ex instanceof TimeoutException) {
    // 处理线程超时异常
    throw new RuntimeException("Thread timeout!", ex);
}

测试:

@Test
void givenRestCalls_whenPurchaseCompletableFutureTimeout_thenThrowNewException() {
    stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
        .willReturn(aResponse().withFixedDelay(450)));

    Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));

    assertEquals("java.lang.RuntimeException: Thread timeout!", error.getMessage());
}

我们为 /payments_method 添加了更长的延迟,这样它就能通过网络超时阈值,但在线程超时时会失败。

6、总结

本文介绍了如何使用 CompletableFutureFeignClient 并行执行多个外部请求,还介绍了如何设置网络超时和线程超时,以及如何优雅地处理超时异常。


Ref:https://www.baeldung.com/feign-client-completablefuture-spring-boot