限制 Webclient 的并发请求数量

1、简介

本文介绍了一些限制 WebClient 并发请求数量的方式。

2、服务端

限制 WebClient 并发请求数量是为了避免服务器因大量并发请求而宕机。有些服务自身也提供了一些限制策略。

2.1、一个简单的 Controller

为了演示,先定义一个简单的 @RestController,它返回固定范围的随机数字:

@RestController
@RequestMapping("/random")
public class RandomController {

    @GetMapping
    Integer getRandom() {
        return new Random().nextInt(50);
    }
}

接下来,我们将模拟一些耗时的操作,并限制并发请求的数量。

2.2、服务器限制并发请求数

修改服务,模拟一个更真实的场景。

首先,限制服务器可接受的并发请求数,并在达到限制时抛出异常。

其次,增加处理响应的延迟,模拟耗时的操作。

创建 Concurrency 用于限制并发数量:

public class Concurrency {

    public static final int MAX_CONCURRENT = 5;
    static final AtomicInteger CONCURRENT_REQUESTS = new AtomicInteger();

    public static int protect(IntSupplier supplier) {
        try {
            if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
                throw new UnsupportedOperationException("max concurrent requests reached");
            }

            TimeUnit.SECONDS.sleep(2);
            return supplier.getAsInt();
        } finally {
            CONCURRENT_REQUESTS.decrementAndGet();
        }
    }
}

更改端点以使用它:

@GetMapping
Integer getRandom() {
    return Concurrency.protect(() -> new Random().nextInt(50));
}

现在,当请求超过 MAX_CONCURRENT 时,端点会拒绝处理请求,并向客户端返回错误信息。

2.3、一个简单的客户端

以下的所有示例都将遵循这种模式,生成包含 n 个请求的 Flux,并向服务发出 GET 请求:

Flux.range(1, n)
  .flatMap(i -> {
    // GET 请求
  });

为了减少模板代码,用一个可以在所有示例中重复使用的方法来实现请求部分。

接收一个 WebClient,调用 get(),然后使用 ParameterizedTypeReference 泛型检索响应体:

public interface RandomConsumer {

    static <T> Mono<T> get(WebClient client) {
        return client.get()
          .retrieve()
          .bodyToMono(new ParameterizedTypeReference<T>() {});
    }
}

3、zipWith(Flux.interval())

第一个示例,使用 zipWith() 方法来在来固定延迟后发起请求。

public class ZipWithInterval {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .zipWith(Flux.interval(Duration.ofMillis(delay)))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

每次请求之前都会延迟 delay 毫秒。

4、Flux.delayElements()

Flux 有一种更直接的延迟消费方式:

public class DelayElements {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .delayElements(Duration.ofMillis(delay))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

使用 delayElements(),延迟会直接应用于 Subscriber.onNext() 信号。换句话说,它会延迟 Flux.range() 中的每个元素。因此,传入 flatMap() 的函数将受到影响,需要更长时间才能启动。例如,如果 delay 值为 1000,我们的请求开始前将延迟一秒钟。

4.1、调整解决方案

如果我们没有提供足够长的延迟时间,就会出现错误:

@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
    int delay = 100;

    int requests = 10;
    assertThrows(InternalServerError.class, () -> {
      DelayElements.fetch(client, requests, delay)
        .blockLast();
    });
}

这是因为我们每个请求需要等待 100 毫秒,但每个请求在服务器端需要两秒钟才能完成。因此,很快就达到了并发请求限制,并收到 500 错误。

如果增加足够的延迟,就可以避免请求受到限制。但这样一来,等待的时间就会过长,可能会严重影响性能。

既然我们知道服务器的并发限制数,接下来让我们看看有什么更合适的方法来处理这个问题。

5、使用 flatMap() 进行并发控制

考虑到服务的并发限制,最佳选择是最多并行发送 Concurrency.MAX_CONCURRENT 个请求。为此,可以在 flatMap() 中添加一个额外的参数来指定最大并行处理的数量:

public class LimitConcurrency {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency) {
        return Flux.range(1, requests)
          .flatMap(i -> RandomConsumer.get(client), concurrency);
    }
}

该参数可保证最大并发请求数不超过 concurrency,并保证我们的处理不会出现不必要的延迟:

@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
    int limit = Concurrency.MAX_CONCURRENT;

    int requests = 10;
    assertDoesNotThrow(() -> {
      LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
        .blockLast();
    });
}

6、使用 Resilience4j RateLimiter

Resilience4j 是一个多功能库,用于处理应用中的容错问题。可以使用它来限制一定时间间隔内的并发请求数,包括超时。

首先,添加 resilience4j-reactorresilience4j-ratelimiter 依赖:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

然后,使用 RateLimiter.of() 创建 Rate Limiter,提供名称、发送新请求的时间间隔、并发限制和超时:

public class Resilience4jRateLimit {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency, int interval) {
        RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
          .limitRefreshPeriod(Duration.ofMillis(interval))
          .limitForPeriod(concurrency)
          .timeoutDuration(Duration.ofMillis(interval * concurrency))
          .build());

        // ...
    }
}

现在,通过 transformDeferred() 将其纳入 Flux,这样它就能控制 GET 请求速率:

return Flux.range(1, requests)
  .flatMap(i -> RandomConsumer.get(client)
    .transformDeferred(RateLimiterOperator.of(limiter))
  );

你会注意到,如果定义的时间间隔太短,仍然会出现问题。

这种方式适用于我们需要与其他操作共享 Rate Limiter 的场景。

7、用 Guava 进行精确地限制

Guava 有一个通用的 Rate Limiter,可以很好地满足我们的需求。由于它使用令牌桶算法,因此只会在必要时阻塞,而不会像 Flux.delayElements() 那样每次都阻塞。

首先,需要在 pom.xml 中添加 guava

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

调用 RateLimiter.create(),并将每秒要发送的最大请求数传给它。然后,在发送请求前调用 Limiter 的 acquire() 方法,对请求执行进行限制:

public class GuavaRateLimit {

    public static Flux<Integer> fetch(
      WebClient client, int requests, int requestsPerSecond) {
        RateLimiter limiter = RateLimiter.create(requestsPerSecond);

        return Flux.range(1, requests)
          .flatMap(i -> {
            limiter.acquire();

            return RandomConsumer.get(client);
          });
    }
}

这种解决方案简单且效果极佳,它不会让代码阻塞得比必要的时间更长。

8、总结

在本文中,我们介绍了几种可用来限制 WebClient 并发请求数的方法。


参考:https://www.baeldung.com/spring-webclient-limit-requests-per-second