Spring WebFlux 中的重试

1、概览

在云上构建分布式应用时,需要考虑到服务故障,这通常会涉及到重试。

Spring WebFlux 提供了一些失败后重试的工具。

本文将会带你了解如何在 Spring WebFlux 添加和配置重试功能。

2、用例

本文使用一个 MockWebServer 来模拟外部系统暂时不可用,然后又变为可用的情况。

连接到 REST 服务的组件:

@Test
void givenExternalServiceReturnsError_whenGettingData_thenRetryAndReturnResponse() {

    mockExternalService.enqueue(new MockResponse()
      .setResponseCode(SERVICE_UNAVAILABLE.code()));
    mockExternalService.enqueue(new MockResponse()
      .setResponseCode(SERVICE_UNAVAILABLE.code()));
    mockExternalService.enqueue(new MockResponse()
      .setResponseCode(SERVICE_UNAVAILABLE.code()));
    mockExternalService.enqueue(new MockResponse()
      .setBody("stock data"));

    StepVerifier.create(externalConnector.getData("ABC"))
      .expectNextMatches(response -> response.equals("stock data"))
      .verifyComplete();

    verifyNumberOfGetRequests(4);
}

3、添加重试

MonoFlux API 中内置了两个关键的 retry 操作。

3.1、使用 retry

retry 可以防止应用立即返回错误,并重新订阅指定次数:

public Mono<String> getData(String stockId) {
    return webClient.get()
        .uri(PATH_BY_ID, stockId)
        .retrieve()
        .bodyToMono(String.class)
        .retry(3);
}

无论 Web 客户端返回什么错误,都会重试最多三次。

3.2、使用 retryWhen

retryWhen 方法可用来创建一个可配置的重试策略:

public Mono<String> getData(String stockId) {
    return webClient.get()
        .uri(PATH_BY_ID, stockId)
        .retrieve()
        .bodyToMono(String.class)
        .retryWhen(Retry.max(3));
}

如上,可以通过设置一个 Retry 对象来描述所需的逻辑。

这里使用了 max 策略,指定最多重试次数。这与第一个示例相同,但这里可以有更多的配置选项。特别要注意的是,在这种情况下,每次重试都尽快发生。

4、添加延迟

无延迟重试的主要缺点是无法给故障服务恢复时间。这可能会使服务不堪重负,使问题更加严重,降低恢复的机会。

4.1、fixedDelay 重试

可以使用 fixedDelay 策略在每次重试之间添加延迟:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
}

该配置允许两次重试之间有两秒钟的延迟,这可能会增加成功的几率。不过,如果服务器正在经历较长时间的中断,那么应该等待更长的时间。但是,如果将所有延迟都配置为很长的时间,那么短时间的中断将使服务更加缓慢。

4.2、backoff 重试

可以使用 backoff 策略来代替固定间隔重试:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
}

如上,逐渐增加了两次重试之间的延迟 - 在本例中,大致是 2 秒、4 秒和 8 秒的间隔。这样,外部系统就有更好的机会从常见的连接问题中恢复,或处理堆积的任务。

4.3、jitter 重试

backoff 策略的另一个好处是,它为计算出的延迟时间间隔增加了随机性或抖动。抖动可以帮助减少多个客户端同时重试导致的重试风暴。

默认情况下,该值设置为 0.5,相当于抖动最多为计算延迟的 50%

使用 jitter 方法配置一个不同的值 0.75,以表示最多为计算延迟的 75% 的抖动:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .accept(MediaType.APPLICATION_JSON)
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));
}

注意,可能的取值范围介于 0(无抖动)和 1(抖动最多为计算延迟的 100%)之间。

5、错误过滤

此时,来自服务的任何错误都会导致重试,包括 400 Bad Request401 Unauthorized4xx 错误。

显然,不应该在客户端出现此类错误时重试,因为再怎么重试,服务器的响应不会有任何不同。

因此,需要仅在特定错误的情况下应用重试策略。

首先,创建一个异常来表示服务器错误:

public class ServiceException extends RuntimeException {
    
    public ServiceException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }
}

接下来,创建一个包含 5xx 异常的错误 Mono,并使用 filter 方法来配置过滤策略:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .onStatus(HttpStatus::is5xxServerError, 
          response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
          .filter(throwable -> throwable instanceof ServiceException));
}

现在,只有在 WebClient Pipeline(管道)抛出 ServiceException 时重试。

6、处理重试次数耗尽的情况

最后,需要考虑所有重试都不成功的情况。

在这种情况下,策略的默认行为是传播一个 RetryExhaustedException,封装了最后一个错误。

可以通过使用 onRetryExhaustedThrow 方法并提供一个 ServiceException 的Generator(生成器)来覆盖这种行为:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
          .filter(throwable -> throwable instanceof ServiceException)
          .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
              throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value());
          }));
}

现在,请求会在一系列重试失败后出现 ServiceException 异常。

7、总结

本文先介绍了如何使用 Webflux 中的重试方法,如 retryretryWhen,以及如何为重试添加各种策略的延迟。最后介绍了如何对要重试的错误进行过滤以及处理重试次数耗尽的情况。


Ref:https://www.baeldung.com/spring-webflux-retry