在 Spring Boot 应用中使用 Resilience4j
1、概览
Resilience4j 是一个轻量级的容错库,提供了诸如熔断、重试、限流、超时、隔板等功能,可以帮助应用程序在面对故障和不稳定条件时保持可用性和可靠性。
在本教程中,我们将学习如何在 Spring Boot 应用程序中使用 Resilience4j。
2、项目设置
2.1、Maven 依赖
首先,我们需要添加 spring-boot-starter-web starter 来创建一个简单的 web 应用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
接下来,我们需要 resilience4j-spring-boot2
和 spring-boot-starter-aop
依赖,以便在 Spring Boot 中通过注解使用 Resilience-4j 库的功能:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
此外,我们还需要添加 spring-boot-starter-actuator
依赖,以便通过暴露一组端点来监控应用的当前状态:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
最后,还要添加 wiremock-jre8
依赖,用于帮助我们使用 mock HTTP server 测试 REST API:
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
</dependency>
2.2、RestController 和外部 API 调用
在使用 Resilience4j
库的不同功能时,我们的 web 应用需要与外部 API 进行交互。因此,我们需要添加一个 RestTemplate
bean,用于 API 调用:
@Bean
public RestTemplate restTemplate() {
return new RestTemplateBuilder().rootUri("http://localhost:9090")
.build();
}
然后,我们定义 ExternalAPICaller
组件,并注入 restTemplate
Bean:
@Component
public class ExternalAPICaller {
private final RestTemplate restTemplate;
@Autowired
public ExternalAPICaller(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
}
接下来,我们将定义 ResilientAppController
类,该类将暴露 REST API 端点,并在内部使用 ExternalAPICaller
Bean 调用外部 API:
@RestController
@RequestMapping("/api/")
public class ResilientAppController {
private final ExternalAPICaller externalAPICaller;
}
2.3、Actuator 端点
我们可以通过 Spring Boot Actuator 暴露 health
端点,以便随时了解应用的确切状态。
在 application.properties
文件中添加配置,并启用端点:
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
management.health.circuitbreakers.enabled=true
management.health.ratelimiters.enabled=true
我们还会在此
application.properties
文件中添加额外的配置。
2.4、单元测试
在实际场景中,我们的 Web 应用将调用一个外部服务。然而,我们可以通过使用 WireMockExtension
类启动一个模拟的正在运行的外部服务。
将 EXTERNAL_SERVICE
定义为 ResilientAppControllerUnitTest
类中的静态成员:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ResilientAppControllerUnitTest {
@RegisterExtension
static WireMockExtension EXTERNAL_SERVICE = WireMockExtension.newInstance()
.options(WireMockConfiguration.wireMockConfig()
.port(9090))
.build();
}
然后,添加一个 TestRestTemplate
实例来调用 API:
@Autowired
private TestRestTemplate restTemplate;
2.5、异常处理
Resilience4j 库会根据 context 中的容错模式抛出异常,从而保护服务资源。不过,这些异常应转化为对客户端有意义 HTTP 响应码。
定义 ApiExceptionHandler
类来处理不同的异常:
@ControllerAdvice
public class ApiExceptionHandler {
}
在探索不同容错模式的过程中,我们将在该类中添加对应的 exception handler。
3、熔断 - Circuit breaker
熔断器(Circuit Breaker,也有叫“断路器”的)模式通过限制上游服务在部分或完全停机期间调用下游服务,从而保护下游服务。
首先,让我们暴露 /api/circuit-breaker
端点并添加 @CircuitBreaker
注解:
@GetMapping("/circuit-breaker")
@CircuitBreaker(name = "CircuitBreakerService")
public String circuitBreakerApi() {
return externalAPICaller.callApi();
}
我们还需要在 ExternalAPICaller
类中实现 callApi()
方法,用于调用外部端点 /api/external
:
public String callApi() {
return restTemplate.getForObject("/api/external", String.class);
}
接下来,我们将在 application.properties
文件中添加熔断器的配置:
resilience4j.circuitbreaker.instances.CircuitBreakerService.failure-rate-threshold=50
resilience4j.circuitbreaker.instances.CircuitBreakerService.minimum-number-of-calls=5
resilience4j.circuitbreaker.instances.CircuitBreakerService.automatic-transition-from-open-to-half-open-enabled=true
resilience4j.circuitbreaker.instances.CircuitBreakerService.wait-duration-in-open-state=5s
resilience4j.circuitbreaker.instances.CircuitBreakerService.permitted-number-of-calls-in-half-open-state=3
resilience4j.circuitbreaker.instances.CircuitBreakerService.sliding-window-size=10
resilience4j.circuitbreaker.instances.CircuitBreakerService.sliding-window-type=count_based
该配置将允许在熔断器关闭状态(closed)下对服务进行 50% 的失败调用,之后它将打开熔断器(open),并开始使用 CallNotPermittedException
拒绝请求。因此,最好在 ApiExceptionHandler
类中为该异常添加一个 exception handler:
@ExceptionHandler({CallNotPermittedException.class})
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
public void handleCallNotPermittedException() {
}
最后,我们将使用 EXTERNAL_SERVICE
模拟下游服务宕机的情况,从而测试 /api/circuit-breaker
API 端点:
@Test
public void testCircuitBreaker() {
EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
.willReturn(serverError()));
IntStream.rangeClosed(1, 5)
.forEach(i -> {
ResponseEntity response = restTemplate.getForEntity("/api/circuit-breaker", String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);
});
IntStream.rangeClosed(1, 5)
.forEach(i -> {
ResponseEntity response = restTemplate.getForEntity("/api/circuit-breaker", String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);
});
EXTERNAL_SERVICE.verify(5, getRequestedFor(urlEqualTo("/api/external")));
}
我们可以看到,前五次调用都失败了,因为下游服务停机了。之后,熔断器切换到打开(open)状态,随后的五次尝试都以 503
HTTP 状态代码被拒绝,但并未实际调用底层 API。
4、重试 - Retry
重试模式通过从短暂的问题中恢复,为系统提供了弹性。让我们首先创建 /api/retry
API 端点,添加@Retry
注解。
@GetMapping("/retry")
@Retry(name = "retryApi", fallbackMethod = "fallbackAfterRetry")
public String retryApi() {
return externalAPICaller.callApi();
}
我们可以选择在所有重试尝试都失败时提供一个 fallback 机制。这里,我们提供 fallbackAfterRetry
作为 fallback 方法:
public String fallbackAfterRetry(Exception ex) {
return "all retries have exhausted";
}
接下来,我们将更新 application.properties
文件,添加控制重试行为的配置:
resilience4j.retry.instances.retryApi.max-attempts=3
resilience4j.retry.instances.retryApi.wait-duration=1s
resilience4j.retry.metrics.legacy.enabled=true
resilience4j.retry.metrics.enabled=true
如上所示,我们计划最多重试三次,每次延迟 1s
。
最后,我们来测试 /api/retry
API 端点的重试行为:
@Test
public void testRetry() {
EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
.willReturn(ok()));
ResponseEntity<String> response1 = restTemplate.getForEntity("/api/retry", String.class);
EXTERNAL_SERVICE.verify(1, getRequestedFor(urlEqualTo("/api/external")));
EXTERNAL_SERVICE.resetRequests();
EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
.willReturn(serverError()));
ResponseEntity<String> response2 = restTemplate.getForEntity("/api/retry", String.class);
Assert.assertEquals(response2.getBody(), "all retries have exhausted");
EXTERNAL_SERVICE.verify(3, getRequestedFor(urlEqualTo("/api/external")));
}
我们可以看到,在第一种情况下,没有出现任何问题,因此一次尝试就行。然而,当出现问题时,需要进行三次尝试,之后 API 会通过 fallback 机制做出响应。
5、超时 - Time Limiter
我们可以使用超时模式为对外部系统的异步调用设置超时阈值。
添加 /api/time-limiter
API 端点,添加 @TimeLimiter
注解。在内部调用“延迟” API:
@GetMapping("/time-limiter")
@TimeLimiter(name = "timeLimiterApi")
public CompletableFuture<String> timeLimiterApi() {
return CompletableFuture.supplyAsync(externalAPICaller::callApiWithDelay);
}
然后,我们将通过在 callApiWithDelay()
方法中添加 sleep 时间来模拟外部 API 调用的延迟:
public String callApiWithDelay() {
String result = restTemplate.getForObject("/api/external", String.class);
try {
Thread.sleep(5000);
} catch (InterruptedException ignore) {
}
return result;
}
接下来,我们需要在 application.properties
文件中提供 timeLimiterApi
的配置:
resilience4j.timelimiter.metrics.enabled=true
resilience4j.timelimiter.instances.timeLimiterApi.timeout-duration=2s
resilience4j.timelimiter.instances.timeLimiterApi.cancel-running-future=true
我们可以看到,阈值被设置为 2s
。之后,Resilience4j 库内部会以超时异常(TimeoutException
)取消异步操作。因此,我们要在 ApiExceptionHandler
类中为该异常添加一个 exception handler,以返回 408 HTTP 状态代码的 API 响应:
@ExceptionHandler({TimeoutException.class})
@ResponseStatus(HttpStatus.REQUEST_TIMEOUT)
public void handleTimeoutException() {
}
最后,我们测试 /api/time-limiter
API 端点配置的超时模式:
@Test
public void testTimeLimiter() {
EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external").willReturn(ok()));
ResponseEntity<String> response = restTemplate.getForEntity("/api/time-limiter", String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.REQUEST_TIMEOUT);
EXTERNAL_SERVICE.verify(1, getRequestedFor(urlEqualTo("/api/external")));
}
不出所料,由于下游 API 调用被设置为需要 5 秒以上才能完成,我们看到 API 调用出现了超时。
6、隔板 - Bulkhead
隔板(bulkhead)模式限制了外部服务的最大并发调用次数。
让我们先添加带有 @Bulkhead
注解的 /api/bulkhead
API 端点:
@GetMapping("/bulkhead")
@Bulkhead(name="bulkheadApi")
public String bulkheadApi() {
return externalAPICaller.callApi();
}
接下来,我们要在 application.properties
文件中定义控制隔板功能的配置:
resilience4j.bulkhead.metrics.enabled=true
resilience4j.bulkhead.instances.bulkheadApi.max-concurrent-calls=3
resilience4j.bulkhead.instances.bulkheadApi.max-wait-duration=1
如上,我们希望将并发调用的最大次数限制为 3 次,这样,如果隔板已满,每个线程只能等待 1 毫秒。之后,请求将被 BulkheadFullException
异常拒绝。我们还要向客户端返回一个有意义的 HTTP 状态代码,因此我们添加一个 exception handler:
@ExceptionHandler({ BulkheadFullException.class })
@ResponseStatus(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED)
public void handleBulkheadFullException() {
}
最后,我们将通过并行调用五个请求来测试隔板行为:
@Test
void testBulkheadEvents() throws Exception {
EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
.willReturn(ok()));
Map<Integer, Integer> responseStatusCount = new ConcurrentHashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch latch = new CountDownLatch(5);
IntStream.rangeClosed(1, 5)
.forEach(i -> executorService.execute(() -> {
ResponseEntity response = restTemplate.getForEntity("/api/bulkhead", String.class);
int statusCode = response.getStatusCodeValue();
responseStatusCount.merge(statusCode, 1, Integer::sum);
latch.countDown();
}));
latch.await();
executorService.shutdown();
assertEquals(2, responseStatusCount.keySet().size());
LOGGER.info("Response statuses: " + responseStatusCount.keySet());
assertTrue(responseStatusCount.containsKey(BANDWIDTH_LIMIT_EXCEEDED.value()));
assertTrue(responseStatusCount.containsKey(OK.value()));
EXTERNAL_SERVICE.verify(3, getRequestedFor(urlEqualTo("/api/external")));
}
我们可以看到,只有三个请求成功,而其他请求则被拒绝,并显示了 BANDWIDTH_LIMIT_EXCEEDED
HTTP 状态代码。
7、限速 - Rate Limiter
限速器模式限制对资源的请求速率。
首先,让我们添加带有 @RateLimiter
注解的 /api/rate-limiter
API 端点:
@GetMapping("/rate-limiter")
@RateLimiter(name = "rateLimiterApi")
public String rateLimitApi() {
return externalAPICaller.callApi();
}
接下来,在 application.properties
文件中定义限速器的配置:
resilience4j.ratelimiter.metrics.enabled=true
resilience4j.ratelimiter.instances.rateLimiterApi.register-health-indicator=true
resilience4j.ratelimiter.instances.rateLimiterApi.limit-for-period=5
resilience4j.ratelimiter.instances.rateLimiterApi.limit-refresh-period=60s
resilience4j.ratelimiter.instances.rateLimiterApi.timeout-duration=0s
resilience4j.ratelimiter.instances.rateLimiterApi.allow-health-indicator-to-fail=true
resilience4j.ratelimiter.instances.rateLimiterApi.subscribe-for-events=true
resilience4j.ratelimiter.instances.rateLimiterApi.event-consumer-buffer-size=50
通过此配置,我们希望将 API 调用速率限制为 5 个请求/分钟,且无需等待。达到允许速率的阈值后,请求将被 RequestNotPermitted
异常拒绝。因此,我们要在 ApiExceptionHandler
类中定义一个 exception handler,用于将其转换为有意义的 HTTP 状态响应代码:
@ExceptionHandler({ RequestNotPermitted.class })
@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
public void handleRequestNotPermitted() {
}
最后,我们将使用 50 个请求测试我们的速率受限 API 端点:
@Test
public void testRatelimiter() {
EXTERNAL_SERVICE.stubFor(WireMock.get("/api/external")
.willReturn(ok()));
Map<Integer, Integer> responseStatusCount = new ConcurrentHashMap<>();
IntStream.rangeClosed(1, 50)
.parallel()
.forEach(i -> {
ResponseEntity<String> response = restTemplate.getForEntity("/api/rate-limiter", String.class);
int statusCode = response.getStatusCodeValue();
responseStatusCount.put(statusCode, responseStatusCount.getOrDefault(statusCode, 0) + 1);
});
assertEquals(2, responseStatusCount.keySet().size());
assertTrue(responseStatusCount.containsKey(TOO_MANY_REQUESTS.value()));
assertTrue(responseStatusCount.containsKey(OK.value()));
EXTERNAL_SERVICE.verify(5, getRequestedFor(urlEqualTo("/api/external")));
}
不出所料,只有五个请求成功,而其他所有请求都以 TOO_MANY_REQUESTS
HTTP 状态码失败。
8、Actuator 端点
我们配置了 actuator 端点,用于监控应用。通过这些端点,我们可以确定应用程序在各种容错模式下随着时间推移的变化情况。
首先,可以通过 GET
请求 /actuator
端点,获取所有暴露的端点:
http://localhost:8080/actuator/
{
"_links" : {
"self" : {...},
"bulkheads" : {...},
"circuitbreakers" : {...},
"ratelimiters" : {...},
...
}
}
我们可以看到一个 JSON 响应,其中包含 bulkheads
、circuitbreakers
、ratelimiters
等字段。提供了与容错模式相关的信息。
接下来,我们来看看与重试(retry)相关的字段:
"retries": {
"href": "http://localhost:8080/actuator/retries",
"templated": false
},
"retryevents": {
"href": "http://localhost:8080/actuator/retryevents",
"templated": false
},
"retryevents-name": {
"href": "http://localhost:8080/actuator/retryevents/{name}",
"templated": true
},
"retryevents-name-eventType": {
"href": "http://localhost:8080/actuator/retryevents/{name}/{eventType}",
"templated": true
}
接下来,我们可以检查应用,查看配置了“重试”的实例列表:
http://localhost:8080/actuator/retries
{
"retries" : [ "retryApi" ]
}
如上,我们可以在 retries
列表中看到配置的 retryApi
实例。
最后,我们将通过浏览器向 /api/retry
API 端点发起 GET
请求,并通过 /actuator/retryevents
端点观察重试事件:
{
"retryEvents": [
{
"retryName": "retryApi",
"type": "RETRY",
"creationTime": "2022-10-16T10:46:31.950822+05:30[Asia/Kolkata]",
"errorMessage": "...",
"numberOfAttempts": 1
},
{
"retryName": "retryApi",
"type": "RETRY",
"creationTime": "2022-10-16T10:46:32.965661+05:30[Asia/Kolkata]",
"errorMessage": "...",
"numberOfAttempts": 2
},
{
"retryName": "retryApi",
"type": "ERROR",
"creationTime": "2022-10-16T10:46:33.978801+05:30[Asia/Kolkata]",
"errorMessage": "...",
"numberOfAttempts": 3
}
]
}
由于下游服务宕机,我们可以看到三次重试尝试,两次尝试之间的等待时间为 1 秒。正如我们配置的一样。
9、总结
在本文中,我们学习了如何在 Sprint Boot 应用程序中使用 Resilience4j
库。此外,我们还重点介绍了几种容错模式,如熔断、限速、超时、隔板和重试。
参考:https://www.baeldung.com/spring-boot-resilience4j