限制 Webclient 的并发请求数量
1、简介
本文介绍了一些限制 WebClient 并发请求数量的方式。
2、服务端
限制 WebClient 并发请求数量是为了避免服务器因大量并发请求而宕机。有些服务自身也提供了一些限制策略。
2.1、一个简单的 Controller
为了演示,先定义一个简单的 @RestController
,它返回固定范围的随机数字:
@RestController
@RequestMapping("/random")
public class RandomController {
@GetMapping
Integer getRandom() {
return new Random().nextInt(50);
}
}
接下来,我们将模拟一些耗时的操作,并限制并发请求的数量。
2.2、服务器限制并发请求数
修改服务,模拟一个更真实的场景。
首先,限制服务器可接受的并发请求数,并在达到限制时抛出异常。
其次,增加处理响应的延迟,模拟耗时的操作。
创建 Concurrency
用于限制并发数量:
public class Concurrency {
public static final int MAX_CONCURRENT = 5;
static final AtomicInteger CONCURRENT_REQUESTS = new AtomicInteger();
public static int protect(IntSupplier supplier) {
try {
if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
throw new UnsupportedOperationException("max concurrent requests reached");
}
TimeUnit.SECONDS.sleep(2);
return supplier.getAsInt();
} finally {
CONCURRENT_REQUESTS.decrementAndGet();
}
}
}
更改端点以使用它:
@GetMapping
Integer getRandom() {
return Concurrency.protect(() -> new Random().nextInt(50));
}
现在,当请求超过 MAX_CONCURRENT
时,端点会拒绝处理请求,并向客户端返回错误信息。
2.3、一个简单的客户端
以下的所有示例都将遵循这种模式,生成包含 n
个请求的 Flux
,并向服务发出 GET 请求:
Flux.range(1, n)
.flatMap(i -> {
// GET 请求
});
为了减少模板代码,用一个可以在所有示例中重复使用的方法来实现请求部分。
接收一个 WebClient
,调用 get()
,然后使用 ParameterizedTypeReference
泛型检索响应体:
public interface RandomConsumer {
static <T> Mono<T> get(WebClient client) {
return client.get()
.retrieve()
.bodyToMono(new ParameterizedTypeReference<T>() {});
}
}
3、zipWith(Flux.interval())
第一个示例,使用 zipWith()
方法来在来固定延迟后发起请求。
public class ZipWithInterval {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.zipWith(Flux.interval(Duration.ofMillis(delay)))
.flatMap(i -> RandomConsumer.get(client));
}
}
每次请求之前都会延迟 delay
毫秒。
4、Flux.delayElements()
Flux
有一种更直接的延迟消费方式:
public class DelayElements {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.delayElements(Duration.ofMillis(delay))
.flatMap(i -> RandomConsumer.get(client));
}
}
使用 delayElements()
,延迟会直接应用于 Subscriber.onNext()
信号。换句话说,它会延迟 Flux.range()
中的每个元素。因此,传入 flatMap()
的函数将受到影响,需要更长时间才能启动。例如,如果 delay
值为 1000
,我们的请求开始前将延迟一秒钟。
4.1、调整解决方案
如果我们没有提供足够长的延迟时间,就会出现错误:
@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
int delay = 100;
int requests = 10;
assertThrows(InternalServerError.class, () -> {
DelayElements.fetch(client, requests, delay)
.blockLast();
});
}
这是因为我们每个请求需要等待 100 毫秒,但每个请求在服务器端需要两秒钟才能完成。因此,很快就达到了并发请求限制,并收到 500 错误。
如果增加足够的延迟,就可以避免请求受到限制。但这样一来,等待的时间就会过长,可能会严重影响性能。
既然我们知道服务器的并发限制数,接下来让我们看看有什么更合适的方法来处理这个问题。
5、使用 flatMap()
进行并发控制
考虑到服务的并发限制,最佳选择是最多并行发送 Concurrency.MAX_CONCURRENT
个请求。为此,可以在 flatMap()
中添加一个额外的参数来指定最大并行处理的数量:
public class LimitConcurrency {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency) {
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client), concurrency);
}
}
该参数可保证最大并发请求数不超过 concurrency
,并保证我们的处理不会出现不必要的延迟:
@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
int limit = Concurrency.MAX_CONCURRENT;
int requests = 10;
assertDoesNotThrow(() -> {
LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
.blockLast();
});
}
6、使用 Resilience4j RateLimiter
Resilience4j 是一个多功能库,用于处理应用中的容错问题。可以使用它来限制一定时间间隔内的并发请求数,包括超时。
首先,添加 resilience4j-reactor
和 resilience4j-ratelimiter
依赖:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>1.7.1</version>
</dependency>
然后,使用 RateLimiter.of()
创建 Rate Limiter,提供名称、发送新请求的时间间隔、并发限制和超时:
public class Resilience4jRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency, int interval) {
RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(interval))
.limitForPeriod(concurrency)
.timeoutDuration(Duration.ofMillis(interval * concurrency))
.build());
// ...
}
}
现在,通过 transformDeferred()
将其纳入 Flux
,这样它就能控制 GET 请求速率:
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client)
.transformDeferred(RateLimiterOperator.of(limiter))
);
你会注意到,如果定义的时间间隔太短,仍然会出现问题。
这种方式适用于我们需要与其他操作共享 Rate Limiter 的场景。
7、用 Guava 进行精确地限制
Guava 有一个通用的 Rate Limiter,可以很好地满足我们的需求。由于它使用令牌桶算法,因此只会在必要时阻塞,而不会像 Flux.delayElements()
那样每次都阻塞。
首先,需要在 pom.xml
中添加 guava:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
调用 RateLimiter.create()
,并将每秒要发送的最大请求数传给它。然后,在发送请求前调用 Limiter 的 acquire()
方法,对请求执行进行限制:
public class GuavaRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int requestsPerSecond) {
RateLimiter limiter = RateLimiter.create(requestsPerSecond);
return Flux.range(1, requests)
.flatMap(i -> {
limiter.acquire();
return RandomConsumer.get(client);
});
}
}
这种解决方案简单且效果极佳,它不会让代码阻塞得比必要的时间更长。
8、总结
在本文中,我们介绍了几种可用来限制 WebClient 并发请求数的方法。
参考:https://www.baeldung.com/spring-webclient-limit-requests-per-second