在 Spring 中实现异步重试机制

1、概览

有时,我们会通过异步的方式来提高应用的性能和响应能力。但是也需要考虑到偶尔故障的情况,如网络问题。此时,我们可以通过重试机制来重新调用。

本文将带你了解 Spring 对异步(async)和重试(retry)操作的支持以及如何在 Spring 应用中实现带有自动重试功能的异步执行。

2、Spring Boot 示例应用

构建一个简单的微服务,调用下游服务来处理一些数据。

2.1、Maven 依赖

添加 spring-boot-starter-web 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.2、实现 Spring Service

实现 EventService 类,在 processEvents 方法中调用另一个服务的方法:

public String processEvents(List<String> events) {
    downstreamService.publishEvents(events);
    return "Completed";
}

定义 DownstreamService 接口:

public interface DownstreamService {
    boolean publishEvents(List<String> events);
}

3、实现带重试功能的异步执行

使用 spring-retry 来实现带有重试功能的异步执行。

3.1、添加 Retry 依赖

pom.xm 中添加 spring-retry 依赖:

<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>2.0.4</version>
</dependency>

3.2、@EnableAsync@EnableRetry 配置

添加 @EnableAsync@EnableRetry 注解:

@Configuration
@ComponentScan("com.baeldung.asyncwithretry")
@EnableRetry
@EnableAsync
public class AsyncConfig {
}

3.3、包含 @Async@Retryable 注解

使用 @Async 注解来异步执行一个方法。同样,使用 @Retryable 来注解方法,以启用重试执行。

在上述 EventService 方法中配置上述注解:

@Async
@Retryable(retryFor = RuntimeException.class, maxAttempts = 4, backoff = @Backoff(delay = 100))
public Future<String> processEvents(List<String> events) {
    LOGGER.info("Processing asynchronously with Thread {}", Thread.currentThread().getName());
    downstreamService.publishEvents(events);
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("Completed");
    LOGGER.info("Completed async method with Thread {}", Thread.currentThread().getName());
    return future;
}

在上述代码中,如果出现 RuntimeException,就会重试该方法,并将结果作为 Future 对象返回。

注意,需要使用 Future 来封装任何异步方法的响应。

@Async 注解仅适用于 public 方法,不应在同一类中直接调用。自调用方法会绕过 Spring 代理调用,并在同一线程中运行(Spring 的代理机制)。

4、测试 @Async@Retryable

用几个测试用例来测试 EventService 方法并验证其异步和重试行为。

首先,测试在下游服务调用没有出错的情况:

@Test
void givenAsyncMethodHasNoRuntimeException_whenAsyncMethodIscalled_thenReturnSuccess_WithoutAnyRetry() throws Exception {
    LOGGER.info("Testing for async with retry execution with thread " + Thread.currentThread().getName()); 
    when(downstreamService.publishEvents(anyList())).thenReturn(true);
    Future<String> resultFuture = eventService.processEvents(List.of("test1"));
    while (!resultFuture.isDone() && !resultFuture.isCancelled()) {
        TimeUnit.MILLISECONDS.sleep(5);
    }
    assertTrue(resultFuture.isDone());
    assertEquals("Completed", resultFuture.get());
    verify(downstreamService, times(1)).publishEvents(anyList());
}

如上,等待 Future 完成(Completion),然后断言结果。

运行上述测试,输出日志如下:

18:59:24.064 [main] INFO com.baeldung.asyncwithretry.EventServiceIntegrationTest - Testing for async with retry execution with thread main
18:59:24.078 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Processing asynchronously with Thread SimpleAsyncTaskExecutor-1
18:59:24.080 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Completed async method with Thread SimpleAsyncTaskExecutor-1

从上述日志中,可以确认服务方法是在单独的线程中运行的。

接下来,测试在 DownstreamService 方法抛出 RuntimeException 的情况:

@Test
void givenAsyncMethodHasRuntimeException_whenAsyncMethodIsCalled_thenReturnFailure_With_MultipleRetries() throws InterruptedException {
    LOGGER.info("Testing for async with retry execution with thread " + Thread.currentThread().getName()); 
    when(downstreamService.publishEvents(anyList())).thenThrow(RuntimeException.class);
    Future<String> resultFuture = eventService.processEvents(List.of("test1"));
    while (!resultFuture.isDone() && !resultFuture.isCancelled()) {
        TimeUnit.MILLISECONDS.sleep(5);
    }
    assertTrue(resultFuture.isDone());
    assertThrows(ExecutionException.class, resultFuture::get);
    verify(downstreamService, times(4)).publishEvents(anyList());
}

输出日志如下:

19:01:32.307 [main] INFO com.baeldung.asyncwithretry.EventServiceIntegrationTest - Testing for async with retry execution with thread main
19:01:32.318 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Processing asynchronously with Thread SimpleAsyncTaskExecutor-1
19:01:32.425 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Processing asynchronously with Thread SimpleAsyncTaskExecutor-1
.....

从上述日志中,可以确认服务方法被异步重新执行了四次。

5、总结

本文介绍了如何在 Spring 应用中通过 @Async@Retryable 注解实现带有重试机制的异步方法。


Ref:https://www.baeldung.com/spring-async-retry