在 Spring 中实现异步重试机制
1、概览
有时,我们会通过异步的方式来提高应用的性能和响应能力。但是也需要考虑到偶尔故障的情况,如网络问题。此时,我们可以通过重试机制来重新调用。
本文将带你了解 Spring 对异步(async)和重试(retry)操作的支持以及如何在 Spring 应用中实现带有自动重试功能的异步执行。
2、Spring Boot 示例应用
构建一个简单的微服务,调用下游服务来处理一些数据。
2.1、Maven 依赖
添加 spring-boot-starter-web
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.2、实现 Spring Service
实现 EventService
类,在 processEvents
方法中调用另一个服务的方法:
public String processEvents(List<String> events) {
downstreamService.publishEvents(events);
return "Completed";
}
定义 DownstreamService
接口:
public interface DownstreamService {
boolean publishEvents(List<String> events);
}
3、实现带重试功能的异步执行
使用 spring-retry
来实现带有重试功能的异步执行。
3.1、添加 Retry 依赖
在 pom.xm
中添加 spring-retry
依赖:
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>2.0.4</version>
</dependency>
3.2、@EnableAsync
和 @EnableRetry
配置
添加 @EnableAsync
和 @EnableRetry
注解:
@Configuration
@ComponentScan("com.baeldung.asyncwithretry")
@EnableRetry
@EnableAsync
public class AsyncConfig {
}
3.3、包含 @Async
和 @Retryable
注解
使用 @Async
注解来异步执行一个方法。同样,使用 @Retryable
来注解方法,以启用重试执行。
在上述 EventService
方法中配置上述注解:
@Async
@Retryable(retryFor = RuntimeException.class, maxAttempts = 4, backoff = @Backoff(delay = 100))
public Future<String> processEvents(List<String> events) {
LOGGER.info("Processing asynchronously with Thread {}", Thread.currentThread().getName());
downstreamService.publishEvents(events);
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("Completed");
LOGGER.info("Completed async method with Thread {}", Thread.currentThread().getName());
return future;
}
在上述代码中,如果出现 RuntimeException
,就会重试该方法,并将结果作为 Future
对象返回。
注意,需要使用 Future
来封装任何异步方法的响应。
@Async
注解仅适用于 public
方法,不应在同一类中直接调用。自调用方法会绕过 Spring 代理调用,并在同一线程中运行(Spring 的代理机制)。
4、测试 @Async
和 @Retryable
用几个测试用例来测试 EventService
方法并验证其异步和重试行为。
首先,测试在下游服务调用没有出错的情况:
@Test
void givenAsyncMethodHasNoRuntimeException_whenAsyncMethodIscalled_thenReturnSuccess_WithoutAnyRetry() throws Exception {
LOGGER.info("Testing for async with retry execution with thread " + Thread.currentThread().getName());
when(downstreamService.publishEvents(anyList())).thenReturn(true);
Future<String> resultFuture = eventService.processEvents(List.of("test1"));
while (!resultFuture.isDone() && !resultFuture.isCancelled()) {
TimeUnit.MILLISECONDS.sleep(5);
}
assertTrue(resultFuture.isDone());
assertEquals("Completed", resultFuture.get());
verify(downstreamService, times(1)).publishEvents(anyList());
}
如上,等待 Future
完成(Completion),然后断言结果。
运行上述测试,输出日志如下:
18:59:24.064 [main] INFO com.baeldung.asyncwithretry.EventServiceIntegrationTest - Testing for async with retry execution with thread main
18:59:24.078 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Processing asynchronously with Thread SimpleAsyncTaskExecutor-1
18:59:24.080 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Completed async method with Thread SimpleAsyncTaskExecutor-1
从上述日志中,可以确认服务方法是在单独的线程中运行的。
接下来,测试在 DownstreamService
方法抛出 RuntimeException
的情况:
@Test
void givenAsyncMethodHasRuntimeException_whenAsyncMethodIsCalled_thenReturnFailure_With_MultipleRetries() throws InterruptedException {
LOGGER.info("Testing for async with retry execution with thread " + Thread.currentThread().getName());
when(downstreamService.publishEvents(anyList())).thenThrow(RuntimeException.class);
Future<String> resultFuture = eventService.processEvents(List.of("test1"));
while (!resultFuture.isDone() && !resultFuture.isCancelled()) {
TimeUnit.MILLISECONDS.sleep(5);
}
assertTrue(resultFuture.isDone());
assertThrows(ExecutionException.class, resultFuture::get);
verify(downstreamService, times(4)).publishEvents(anyList());
}
输出日志如下:
19:01:32.307 [main] INFO com.baeldung.asyncwithretry.EventServiceIntegrationTest - Testing for async with retry execution with thread main
19:01:32.318 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Processing asynchronously with Thread SimpleAsyncTaskExecutor-1
19:01:32.425 [SimpleAsyncTaskExecutor-1] INFO com.baeldung.asyncwithretry.EventService - Processing asynchronously with Thread SimpleAsyncTaskExecutor-1
.....
从上述日志中,可以确认服务方法被异步重新执行了四次。
5、总结
本文介绍了如何在 Spring 应用中通过 @Async
和 @Retryable
注解实现带有重试机制的异步方法。
Ref:https://www.baeldung.com/spring-async-retry