在 Spring Boot 应用中使用 Resilience4j

1、概览

Resilience4j 是一个轻量级的容错库,提供了诸如熔断、重试、限流、超时、隔板等功能,可以帮助应用程序在面对故障和不稳定条件时保持可用性和可靠性。

在本教程中,我们将学习如何在 Spring Boot 应用程序中使用 Resilience4j。

2、项目设置

2.1、Maven 依赖

首先,我们需要添加 spring-boot-starter-web starter 来创建一个简单的 web 应用:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

接下来,我们需要 resilience4j-spring-boot2spring-boot-starter-aop 依赖,以便在 Spring Boot 中通过注解使用 Resilience-4j 库的功能:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

此外,我们还需要添加 spring-boot-starter-actuator 依赖,以便通过暴露一组端点来监控应用的当前状态:

<dependency>
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

最后,还要添加 wiremock-jre8 依赖,用于帮助我们使用 mock HTTP server 测试 REST API:

<dependency>
    <groupId>com.github.tomakehurst</groupId>
    <artifactId>wiremock-jre8</artifactId>
    <scope>test</scope>
</dependency>

2.2、RestController 和外部 API 调用

在使用 Resilience4j 库的不同功能时,我们的 web 应用需要与外部 API 进行交互。因此,我们需要添加一个 RestTemplate bean,用于 API 调用:

@Bean
public RestTemplate restTemplate() {
    return new RestTemplateBuilder().rootUri("http://localhost:9090")
      .build();
}

然后,我们定义 ExternalAPICaller 组件,并注入 restTemplate Bean:

@Component
public class ExternalAPICaller {
    private final RestTemplate restTemplate;

    @Autowired
    public ExternalAPICaller(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }
}

接下来,我们将定义 ResilientAppController 类,该类将暴露 REST API 端点,并在内部使用 ExternalAPICaller Bean 调用外部 API:

@RestController
@RequestMapping("/api/")
public class ResilientAppController {
    private final ExternalAPICaller externalAPICaller;
}

2.3、Actuator 端点

我们可以通过 Spring Boot Actuator 暴露 health 端点,以便随时了解应用的确切状态。

application.properties 文件中添加配置,并启用端点:

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

management.health.circuitbreakers.enabled=true
management.health.ratelimiters.enabled=true

我们还会在此 application.properties 文件中添加额外的配置。

2.4、单元测试

在实际场景中,我们的 Web 应用将调用一个外部服务。然而,我们可以通过使用 WireMockExtension 类启动一个模拟的正在运行的外部服务。

EXTERNAL_SERVICE 定义为 ResilientAppControllerUnitTest 类中的静态成员:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ResilientAppControllerUnitTest {

    @RegisterExtension
    static WireMockExtension EXTERNAL_SERVICE = WireMockExtension.newInstance()
      .options(WireMockConfiguration.wireMockConfig()
      .port(9090))
      .build();
}

然后,添加一个 TestRestTemplate 实例来调用 API:

@Autowired
private TestRestTemplate restTemplate;

2.5、异常处理

Resilience4j 库会根据 context 中的容错模式抛出异常,从而保护服务资源。不过,这些异常应转化为对客户端有意义 HTTP 响应码。

定义 ApiExceptionHandler 类来处理不同的异常:

@ControllerAdvice
public class ApiExceptionHandler {
}

在探索不同容错模式的过程中,我们将在该类中添加对应的 exception handler。

3、熔断 - Circuit breaker

熔断器(Circuit Breaker,也有叫“断路器”的)模式通过限制上游服务在部分或完全停机期间调用下游服务,从而保护下游服务。

首先,让我们暴露 /api/circuit-breaker 端点并添加 @CircuitBreaker 注解:

@GetMapping("/circuit-breaker")
@CircuitBreaker(name = "CircuitBreakerService")
public String circuitBreakerApi() {
    return externalAPICaller.callApi();
}

我们还需要在 ExternalAPICaller 类中实现 callApi() 方法,用于调用外部端点 /api/external

public String callApi() {
    return restTemplate.getForObject("/api/external", String.class);
}

接下来,我们将在 application.properties 文件中添加熔断器的配置:

resilience4j.circuitbreaker.instances.CircuitBreakerService.failure-rate-threshold=50
resilience4j.circuitbreaker.instances.CircuitBreakerService.minimum-number-of-calls=5
resilience4j.circuitbreaker.instances.CircuitBreakerService.automatic-transition-from-open-to-half-open-enabled=true
resilience4j.circuitbreaker.instances.CircuitBreakerService.wait-duration-in-open-state=5s
resilience4j.circuitbreaker.instances.CircuitBreakerService.permitted-number-of-calls-in-half-open-state=3
resilience4j.circuitbreaker.instances.CircuitBreakerService.sliding-window-size=10
resilience4j.circuitbreaker.instances.CircuitBreakerService.sliding-window-type=count_based

该配置将允许在熔断器关闭状态(closed)下对服务进行 50% 的失败调用,之后它将打开熔断器(open),并开始使用 CallNotPermittedException 拒绝请求。因此,最好在 ApiExceptionHandler 类中为该异常添加一个 exception handler:

@ExceptionHandler({CallNotPermittedException.class})
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
public void handleCallNotPermittedException() {
}

最后,我们将使用 EXTERNAL_SERVICE 模拟下游服务宕机的情况,从而测试 /api/circuit-breaker API 端点:

@Test
public void testCircuitBreaker() {
    EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
      .willReturn(serverError()));

    IntStream.rangeClosed(1, 5)
      .forEach(i -> {
          ResponseEntity response = restTemplate.getForEntity("/api/circuit-breaker", String.class);
          assertThat(response.getStatusCode()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);
      });

    IntStream.rangeClosed(1, 5)
      .forEach(i -> {
          ResponseEntity response = restTemplate.getForEntity("/api/circuit-breaker", String.class);
          assertThat(response.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);
      });
    
    EXTERNAL_SERVICE.verify(5, getRequestedFor(urlEqualTo("/api/external")));
}

我们可以看到,前五次调用都失败了,因为下游服务停机了。之后,熔断器切换到打开(open)状态,随后的五次尝试都以 503 HTTP 状态代码被拒绝,但并未实际调用底层 API。

4、重试 - Retry

重试模式通过从短暂的问题中恢复,为系统提供了弹性。让我们首先创建 /api/retry API 端点,添加@Retry 注解。

@GetMapping("/retry")
@Retry(name = "retryApi", fallbackMethod = "fallbackAfterRetry")
public String retryApi() {
    return externalAPICaller.callApi();
}

我们可以选择在所有重试尝试都失败时提供一个 fallback 机制。这里,我们提供 fallbackAfterRetry 作为 fallback 方法:

public String fallbackAfterRetry(Exception ex) {
    return "all retries have exhausted";
}

接下来,我们将更新 application.properties 文件,添加控制重试行为的配置:

resilience4j.retry.instances.retryApi.max-attempts=3
resilience4j.retry.instances.retryApi.wait-duration=1s
resilience4j.retry.metrics.legacy.enabled=true
resilience4j.retry.metrics.enabled=true

如上所示,我们计划最多重试三次,每次延迟 1s

最后,我们来测试 /api/retry API 端点的重试行为:

@Test
public void testRetry() {
    EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
      .willReturn(ok()));
    ResponseEntity<String> response1 = restTemplate.getForEntity("/api/retry", String.class);
    EXTERNAL_SERVICE.verify(1, getRequestedFor(urlEqualTo("/api/external")));

    EXTERNAL_SERVICE.resetRequests();

    EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
      .willReturn(serverError()));
    ResponseEntity<String> response2 = restTemplate.getForEntity("/api/retry", String.class);
    Assert.assertEquals(response2.getBody(), "all retries have exhausted");
    EXTERNAL_SERVICE.verify(3, getRequestedFor(urlEqualTo("/api/external")));
}

我们可以看到,在第一种情况下,没有出现任何问题,因此一次尝试就行。然而,当出现问题时,需要进行三次尝试,之后 API 会通过 fallback 机制做出响应。

5、超时 - Time Limiter

我们可以使用超时模式为对外部系统的异步调用设置超时阈值。

添加 /api/time-limiter API 端点,添加 @TimeLimiter 注解。在内部调用“延迟” API:

@GetMapping("/time-limiter")
@TimeLimiter(name = "timeLimiterApi")
public CompletableFuture<String> timeLimiterApi() {
    return CompletableFuture.supplyAsync(externalAPICaller::callApiWithDelay);
}

然后,我们将通过在 callApiWithDelay() 方法中添加 sleep 时间来模拟外部 API 调用的延迟:

public String callApiWithDelay() {
    String result = restTemplate.getForObject("/api/external", String.class);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException ignore) {
    }
    return result;
}

接下来,我们需要在 application.properties 文件中提供 timeLimiterApi 的配置:

resilience4j.timelimiter.metrics.enabled=true
resilience4j.timelimiter.instances.timeLimiterApi.timeout-duration=2s
resilience4j.timelimiter.instances.timeLimiterApi.cancel-running-future=true

我们可以看到,阈值被设置为 2s。之后,Resilience4j 库内部会以超时异常(TimeoutException)取消异步操作。因此,我们要在 ApiExceptionHandler 类中为该异常添加一个 exception handler,以返回 408 HTTP 状态代码的 API 响应:

@ExceptionHandler({TimeoutException.class})
@ResponseStatus(HttpStatus.REQUEST_TIMEOUT)
public void handleTimeoutException() {
}

最后,我们测试 /api/time-limiter API 端点配置的超时模式:

@Test
public void testTimeLimiter() {
    EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external").willReturn(ok()));
    ResponseEntity<String> response = restTemplate.getForEntity("/api/time-limiter", String.class);

    assertThat(response.getStatusCode()).isEqualTo(HttpStatus.REQUEST_TIMEOUT);
    EXTERNAL_SERVICE.verify(1, getRequestedFor(urlEqualTo("/api/external")));
}

不出所料,由于下游 API 调用被设置为需要 5 秒以上才能完成,我们看到 API 调用出现了超时。

6、隔板 - Bulkhead

隔板(bulkhead)模式限制了外部服务的最大并发调用次数。

让我们先添加带有 @Bulkhead 注解的 /api/bulkhead API 端点:

@GetMapping("/bulkhead")
@Bulkhead(name="bulkheadApi")
public String bulkheadApi() {
    return externalAPICaller.callApi();
}

接下来,我们要在 application.properties 文件中定义控制隔板功能的配置:

resilience4j.bulkhead.metrics.enabled=true
resilience4j.bulkhead.instances.bulkheadApi.max-concurrent-calls=3
resilience4j.bulkhead.instances.bulkheadApi.max-wait-duration=1

如上,我们希望将并发调用的最大次数限制为 3 次,这样,如果隔板已满,每个线程只能等待 1 毫秒。之后,请求将被 BulkheadFullException 异常拒绝。我们还要向客户端返回一个有意义的 HTTP 状态代码,因此我们添加一个 exception handler:

@ExceptionHandler({ BulkheadFullException.class })
@ResponseStatus(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED)
public void handleBulkheadFullException() {
}

最后,我们将通过并行调用五个请求来测试隔板行为:

@Test
void testBulkheadEvents() throws Exception {
  EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
      .willReturn(ok()));
  Map<Integer, Integer> responseStatusCount = new ConcurrentHashMap<>();
  ExecutorService executorService = Executors.newFixedThreadPool(5);
  CountDownLatch latch = new CountDownLatch(5);

  IntStream.rangeClosed(1, 5)
      .forEach(i -> executorService.execute(() -> {
          ResponseEntity response = restTemplate.getForEntity("/api/bulkhead", String.class);
          int statusCode = response.getStatusCodeValue();
          responseStatusCount.merge(statusCode, 1, Integer::sum);
          latch.countDown();
      }));
  latch.await();
  executorService.shutdown();

  assertEquals(2, responseStatusCount.keySet().size());
  LOGGER.info("Response statuses: " + responseStatusCount.keySet());
  assertTrue(responseStatusCount.containsKey(BANDWIDTH_LIMIT_EXCEEDED.value()));
  assertTrue(responseStatusCount.containsKey(OK.value()));
  EXTERNAL_SERVICE.verify(3, getRequestedFor(urlEqualTo("/api/external")));
}

我们可以看到,只有三个请求成功,而其他请求则被拒绝,并显示了 BANDWIDTH_LIMIT_EXCEEDED HTTP 状态代码。

7、限速 - Rate Limiter

限速器模式限制对资源的请求速率。

首先,让我们添加带有 @RateLimiter 注解的 /api/rate-limiter API 端点:

@GetMapping("/rate-limiter")
@RateLimiter(name = "rateLimiterApi")
public String rateLimitApi() {
    return externalAPICaller.callApi();
}

接下来,在 application.properties 文件中定义限速器的配置:

resilience4j.ratelimiter.metrics.enabled=true
resilience4j.ratelimiter.instances.rateLimiterApi.register-health-indicator=true
resilience4j.ratelimiter.instances.rateLimiterApi.limit-for-period=5
resilience4j.ratelimiter.instances.rateLimiterApi.limit-refresh-period=60s
resilience4j.ratelimiter.instances.rateLimiterApi.timeout-duration=0s
resilience4j.ratelimiter.instances.rateLimiterApi.allow-health-indicator-to-fail=true
resilience4j.ratelimiter.instances.rateLimiterApi.subscribe-for-events=true
resilience4j.ratelimiter.instances.rateLimiterApi.event-consumer-buffer-size=50

通过此配置,我们希望将 API 调用速率限制为 5 个请求/分钟,且无需等待。达到允许速率的阈值后,请求将被 RequestNotPermitted 异常拒绝。因此,我们要在 ApiExceptionHandler 类中定义一个 exception handler,用于将其转换为有意义的 HTTP 状态响应代码:

@ExceptionHandler({ RequestNotPermitted.class })
@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
public void handleRequestNotPermitted() {
}

最后,我们将使用 50 个请求测试我们的速率受限 API 端点:

@Test
public void testRatelimiter() {
    EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
      .willReturn(ok()));
    Map<Integer, Integer> responseStatusCount = new ConcurrentHashMap<>();

    IntStream.rangeClosed(1, 50)
      .parallel()
      .forEach(i -> {
          ResponseEntity<String> response = restTemplate.getForEntity("/api/rate-limiter", String.class);
          int statusCode = response.getStatusCodeValue();
          responseStatusCount.put(statusCode, responseStatusCount.getOrDefault(statusCode, 0) + 1);
      });

    assertEquals(2, responseStatusCount.keySet().size());
    assertTrue(responseStatusCount.containsKey(TOO_MANY_REQUESTS.value()));
    assertTrue(responseStatusCount.containsKey(OK.value()));
    EXTERNAL_SERVICE.verify(5, getRequestedFor(urlEqualTo("/api/external")));
}

不出所料,只有五个请求成功,而其他所有请求都以 TOO_MANY_REQUESTS HTTP 状态码失败。

8、Actuator 端点

我们配置了 actuator 端点,用于监控应用。通过这些端点,我们可以确定应用程序在各种容错模式下随着时间推移的变化情况。

首先,可以通过 GET 请求 /actuator 端点,获取所有暴露的端点:

http://localhost:8080/actuator/
{
    "_links" : {
        "self" : {...},
        "bulkheads" : {...},
        "circuitbreakers" : {...},
        "ratelimiters" : {...},
        ...
    }
}

我们可以看到一个 JSON 响应,其中包含 bulkheadscircuitbreakersratelimiters 等字段。提供了与容错模式相关的信息。

接下来,我们来看看与重试(retry)相关的字段:

"retries": {
  "href": "http://localhost:8080/actuator/retries",
  "templated": false
},
"retryevents": {
  "href": "http://localhost:8080/actuator/retryevents",
  "templated": false
},
"retryevents-name": {
  "href": "http://localhost:8080/actuator/retryevents/{name}",
  "templated": true
},
"retryevents-name-eventType": {
  "href": "http://localhost:8080/actuator/retryevents/{name}/{eventType}",
  "templated": true
}

接下来,我们可以检查应用,查看配置了“重试”的实例列表:

http://localhost:8080/actuator/retries
{
    "retries" : [ "retryApi" ]
}

如上,我们可以在 retries 列表中看到配置的 retryApi 实例。

最后,我们将通过浏览器向 /api/retry API 端点发起 GET 请求,并通过 /actuator/retryevents 端点观察重试事件:

{
    "retryEvents": [
    {
        "retryName": "retryApi",
        "type": "RETRY",
        "creationTime": "2022-10-16T10:46:31.950822+05:30[Asia/Kolkata]",
        "errorMessage": "...",
        "numberOfAttempts": 1
    },
    {
        "retryName": "retryApi",
        "type": "RETRY",
        "creationTime": "2022-10-16T10:46:32.965661+05:30[Asia/Kolkata]",
        "errorMessage": "...",
        "numberOfAttempts": 2
    },
    {
        "retryName": "retryApi",
        "type": "ERROR",
        "creationTime": "2022-10-16T10:46:33.978801+05:30[Asia/Kolkata]",
        "errorMessage": "...",
        "numberOfAttempts": 3
    }
  ]
}

由于下游服务宕机,我们可以看到三次重试尝试,两次尝试之间的等待时间为 1 秒。正如我们配置的一样。

9、总结

在本文中,我们学习了如何在 Sprint Boot 应用程序中使用 Resilience4j 库。此外,我们还重点介绍了几种容错模式,如熔断、限速、超时、隔板和重试。


参考:https://www.baeldung.com/spring-boot-resilience4j