Spring WebClient 中的 exchange() 和 retrieve() 方法

1、概览

WebClient 是一个简化 HTTP 请求执行过程的接口。与 RestTemplate 不同,它是一个响应式非阻塞客户端,可以消费和操作 HTTP 响应。虽然它被设计为非阻塞型,但也可用于阻塞型场景。

本文将带你了解 WebClient 接口中的关键方法,包括 retrieve()exchangeToMono()exchangeToFlux(),以及它们之间的差异。

2、示例项目设置

首先,创建一个 Spring Boot 应用,在 pom.xml 中添加 spring-boot-starter-webflux 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.2.4</version>
</dependency>

该依赖提供了 WebClient 接口,用于执行 HTTP 请求。

另外,来看看 https://jsonplaceholder.typicode.com/users/1 请求的 GET 响应示例:

{
  "id": 1,
  "name": "Leanne Graham",
// ...
}

创建一个名为 User 的 POJO 类:

class User {

    private int id;
    private String name;

   // 构造函数、Getter、Setter 方法省略

}

来自 JSONPlaceholder API 的 JSON 响应将被反序列化并映射到 User 类的实例。

最后,用 base URL 创建一个 WebClient 实例:

WebClient client = WebClient.create("https://jsonplaceholder.typicode.com/users");

如上,定义了 HTTP 请求的基 base URL。

3、exchange() 方法

exchange() 方法直接返回 ClientResponse,从而可以直接访问 HTTP 状态码、Header 和响应体。简单地说,ClientResponse 表示 WebClient 返回的 HTTP 响应。

不过,自 Spring 5.3 版起,该方法已被弃用,取而代之的是 exchangeToMono()exchangeToFlux() 方法,具体取决于我们发出的响应。这两个方法允许我们根据响应状态码对响应进行解码。

3.1、发送 Mono

来看一个使用 exchangeToMono() 发送 Mono 代码的示例:

@GetMapping("/user/exchange-mono/{id}")
Mono<User> retrieveUsersWithExchangeAndError(@PathVariable int id) {
    return client.get()
      .uri("/{id}", id)
      .exchangeToMono(res -> {
          if (res.statusCode().is2xxSuccessful()) {
              return res.bodyToMono(User.class);
          } else if (res.statusCode().is4xxClientError()) {
              return Mono.error(new RuntimeException("Client Error: can't fetch user"));
          } else if (res.statusCode().is5xxServerError()) {
              return Mono.error(new RuntimeException("Server Error: can't fetch user"));
          } else {
              return res.createError();
           }
     });
}

如上,根据 HTTP 状态码检索用户并解码响应。

3.2、发送 Flux

使用 exchangeToFlux() 来获取用户集合:

@GetMapping("/user-exchange-flux")
Flux<User> retrieveUsersWithExchange() {
   return client.get()
     .exchangeToFlux(res -> {
         if (res.statusCode().is2xxSuccessful()) {
             return res.bodyToFlux(User.class);
         } else {
             return Flux.error(new RuntimeException("Error while fetching users"));
         }
    });
}

如上,使用 exchangeToFlux() 方法将响应体映射到 Flux<User> 对象,并在请求失败时返回自定义错误信息。

3.3、直接检索响应体

值得注意的是,使用 exchangeToMono()exchangeToFlux() 时无需指定响应状态码:

@GetMapping("/user-exchange")
Flux<User> retrieveAllUserWithExchange(@PathVariable int id) {
    return client.get().exchangeToFlux(res -> res.bodyToFlux(User.class))
      .onErrorResume(Flux::error);
}

如上,检索用户时不指定状态码。

3.4、更改响应体

来看一个修改响应体的示例:

@GetMapping("/user/exchange-alter/{id}")
Mono<User> retrieveOneUserWithExchange(@PathVariable int id) {
    return client.get()
      .uri("/{id}", id)
      .exchangeToMono(res -> res.bodyToMono(User.class))
      .map(user -> {
          user.setName(user.getName().toUpperCase());
          user.setId(user.getId() + 100);
          return user;
      });
}

如上,将响应体映射到 POJO 类后,通过把 id + 100 并将名称大写来更改响应体。

注意,还可以使用 retrieve() 方法更改响应体。

3.5、获取响应头

此外,还可以提取响应头:

@GetMapping("/user/exchange-header/{id}")
Mono<User> retrieveUsersWithExchangeAndHeader(@PathVariable int id) {
  return client.get()
    .uri("/{id}", id)
    .exchangeToMono(res -> {
        if (res.statusCode().is2xxSuccessful()) {
            logger.info("Status code: " + res.headers().asHttpHeaders());
            logger.info("Content-type" + res.headers().contentType());
            return res.bodyToMono(User.class);
        } else if (res.statusCode().is4xxClientError()) {
            return Mono.error(new RuntimeException("Client Error: can't fetch user"));
        } else if (res.statusCode().is5xxServerError()) {
            return Mono.error(new RuntimeException("Server Error: can't fetch user"));
        } else {
            return res.createError();
        }
    });
}

如上,将 HTTP 头信息和内容类型(Content Type)记录到控制台。与需要返回 ResponseEntity 才能访问头和响应码的 retrieve() 方法不同,exchangeToMono() 可以直接访问,因为它返回 ClientResponse

4、retrieve() 方法

retrieve() 方法简化了从 HTTP 请求中提取响应体的过程。它返回 ResponseSpec,允许指定如何处理响应体,而无需访问完整的 ClientResponse

4.1、发送 Mono

检索 HTTP 响应体的示例代码如下:

@GetMapping("/user/{id}")
Mono<User> retrieveOneUser(@PathVariable int id) {
    return client.get()
      .uri("/{id}", id)
      .retrieve()
      .bodyToMono(User.class)
      .onErrorResume(Mono::error);
}

如上,通过 HTTP 调用 /users 端点的特定 id 从 base URL 获取 JSON。然后,将响应体映射到 User 对象。

4.2、发送 Flux

/users 端点发起 GET 请求的示例如下:

@GetMapping("/users")
Flux<User> retrieveAllUsers() {
    return client.get()
      .retrieve()
      .bodyToFlux(User.class)
      .onResumeError(Flux::error);
}

如上,该方法在将 HTTP 响应映射到 POJO 类时,会发送一个 Flux<User> 对象。

4.3、返回 ResponseEntity

如果打算使用 retrieve() 方法访问响应状态码和 Header,可以返回 ResponseEntity

@GetMapping("/user-id/{id}")
Mono<ResponseEntity<User>> retrieveOneUserWithResponseEntity(@PathVariable int id) {
    return client.get()
      .uri("/{id}", id)
      .accept(MediaType.APPLICATION_JSON)
      .retrieve()
      .toEntity(User.class)
      .onErrorResume(Mono::error);
}

使用 toEntity() 方法获得的响应包含 HTTP 头信息、状态码和响应体。

4.4、使用 onStatus() Handler 自定义 Error

此外,出现 400500 HTTP 错误时,默认情况下会返回 WebClientResponseException Error。不过,我们可以使用 onStatus() Handler 自定义异常,以给出自定义错误响应:

@GetMapping("/user-status/{id}")
Mono<User> retrieveOneUserAndHandleErrorBasedOnStatus(@PathVariable int id) {
    return client.get()
      .uri("/{id}", id)
      .retrieve()
      .onStatus(HttpStatusCode::is4xxClientError, 
        response -> Mono.error(new RuntimeException("Client Error: can't fetch user")))
      .onStatus(HttpStatusCode::is5xxServerError, 
        response -> Mono.error(new RuntimeException("Server Error: can't fetch user")))
      .bodyToMono(User.class);
}

如上,检查 HTTP 状态码,并使用 onStatus() Handler 定义自定义错误响应。

5、性能比较

接下来,使用 Java Microbench Harness (JMH) 编写一个性能测试,比较 retrieve()exchangeToFlux() 的执行时间。

首先,创建一个名为 RetrieveAndExchangeBenchmarkTest 的类:

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 3, time = 10, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.MICROSECONDS)
public class RetrieveAndExchangeBenchmarkTest {
  
    private WebClient client;

    @Setup
    public void setup() {
        this.client = WebClient.create("https://jsonplaceholder.typicode.com/users");
    }
}

如上,将基准模式设置为平均时间(AverageTime),即测量测试执行的平均时间。此外,还定义了迭代次数和每次迭代的运行时间。

接下来,创建一个 WebClient 实例,并使用 @Setup 注解使其在每次基准测试前运行。

编写一个基准方法,使用 retrieve() 方法检索用户集合:

@Benchmark
Flux<User> retrieveManyUserUsingRetrieveMethod() {
    return client.get()
      .retrieve()
      .bodyToFlux(User.class)
      .onErrorResume(Flux::error);;
}

最后,使用 exchangeToFlux() 方法定义一个发送 Flux<User> 对象方法:

@Benchmark
Flux<User> retrieveManyUserUsingExchangeToFlux() {
    return client.get()
      .exchangeToFlux(res -> res.bodyToFlux(User.class))
      .onErrorResume(Flux::error);
}

基准结果如下:

Benchmark                             Mode  Cnt   Score    Error  Units
retrieveManyUserUsingExchangeToFlux   avgt   15  ≈ 10⁻⁴            s/op
retrieveManyUserUsingRetrieveMethod   avgt   15  ≈ 10⁻³            s/op

两种方法的性能都很高效,不过,在检索用户集合时,exchangeToFlux()retrieve() 方法稍快。

6、主要差异和相似之处

retrieve()exchangeToMono()exchangeToFlux() 都可用于发出 HTTP 请求并提取 HTTP 响应。

retrieve() 方法只允许我们消费 HTTP Body 并发出 MonoFlux,因为它会返回 ResponseSpec。但是,如果我们想访问状态码和 Header,可以使用带有 ResponseEntityretrieve() 方法。此外,它还允许我们使用 onStatus() Handler 根据 HTTP 状态码报告错误。

retrieve() 方法不同,exchangeToMono()exchnageToFlux() 允许我们消费 HTTP 响应,并直接访问 Header 和响应状态码,因为它们会返回 ClientResponse。此外,它们还提供了更多的错误处理控制,因为我们可以根据 HTTP 状态码对响应进行解码。

如果只想使用响应体,建议使用 retrieve() 方法。如果需要对响应进行更多控制,那么 exchangeToMono()exchangeToFlux() 可能是更好的选择。

7、总结

本文介绍了如何使用 retrieve()exchangeToMono()exchangeToFlux() 方法处理 HTTP 响应,以及如何将响应映射到 POJO 类。最后还比较了 retrieve()exchangeToFlux() 方法的性能。

retrieve() 方法适用于只需要消费响应体,而不需要访问状态码或 Header 的场景。它通过返回 ResponseSpec 简化了过程,而 ResponseSpec 提供了一种直接处理响应体的方法。


Ref:https://www.baeldung.com/spsring-webclient-exchange-vs-retrieve