理解 Spring Reactive 中的 switchIfEmpty()

1、概览

本文将带你了解 Spring Reactive 中的 switchIfEmpty() 操作符及其在使用和不使用 defer() 操作符时的行为,了解这些操作符在不同场景中的交互方式,并通过实际示例来说明它们对响应式流(Reactive Stream)的影响。

2、switchIfEmpty() 和 Defer() 的使用

switchIfEmpty()MonoFlux 中的一个操作符,用于在源生产者为空时执行备用生产者流。如果主源 Publisher 没有发布数据,该操作符就会切换到替代源的数据发布。

考虑一个从大型文件中通过 ID 检索用户详细信息的场景。当请求文件中的用户详细信息时,遍历文件会消耗大量时间。因此,对于经常访问的 ID,将其详细信息缓存起来更有意义。

当端点收到请求时,首先搜索缓存。如果用户详细信息可用,返回响应。如果没有,则从文件中获取数据并缓存起来,以备后续请求。

在这种情况下,主数据提供者(Primary Data Provider)是检查缓存中 KEY 是否存在的流,而备用数据提供者是检查文件中的 KEY 并更新缓存的流。switchIfEmpty() 操作符可以根据缓存中数据的可用性高效地切换数据源提供者。

了解 defer() 操作符的使用也很重要,它可以推迟函数的求值,直到发生订阅。如果我们不在 switchIfEmpty() 中使用 defer() 操作符,表达式就会立即(急切地)求值,从而可能导致意想不到的副作用。

3、示例设置

通过示例来了解 switchIfEmpty() 操作符在不同情况下的行为。

3.1、Data Model

首先,定义一个 User 模型类,其中包含一些详细信息,如 idemailusernameroles

public class User {

    @JsonProperty("id")
    private String id;

    @JsonProperty("email")
    private String email;

    @JsonProperty("username")
    private String username;

    @JsonProperty("roles")
    private String roles;

    // Getter / Setter 省略
}

3.2、用户数据设置

随后,在 classpath 中维护一个文件(users.json),其中包含 JSON 格式的所有用户详细信息:

[  
  {
    "id": "66b296723881ea345705baf1",
    "email": "may_barber@twiggery.cg",
    "username": "reid90",
    "roles": "member"
  },
  {
    "id": "66b29672e6f99a7156cc4ada",
    "email": "gwen_dodson@beadzza.bmw",
    "username": "boyle94",
    "roles": "admin"
  },
...
]

3.3、实现 Controller 和 Service

接着,添加一个 Controller,按 ID 检索用户详细信息。它接受一个可选的布尔参数 withDefer,并根据该查询参数使用不同的实现:

@GetMapping("/user/{id}")
public Mono<ResponseEntity<User>> findUserDetails(@PathVariable("id") String id, 
  @RequestParam("withDefer") boolean withDefer) {
    return (withDefer ? userService.findByUserIdWithDefer(id) : 
      userService.findByUserIdWithoutDefer(id)).map(ResponseEntity::ok);
}

然后,在 UserService 中定义这两种实现,包括使用和不使用 defer(),以了解 switchIfEmpty() 的行为:

public Mono<User> findByUserIdWithDefer(String id) {
    return fetchFromCache(id).switchIfEmpty(Mono.defer(() -> fetchFromFile(id)));
}
public Mono<User> findByUserIdWithoutDefer(String id) {
    return fetchFromCache(id).switchIfEmpty(fetchFromFile(id));
}

为简单起见,实现一个内存缓存来保留用户信息,作为请求的主要数据提供者。还记录了每次访问的日志,以确定是否命中缓存:

private final Map<String, User> usersCache;

private Mono<User> fetchFromCache(String id) {
    User user = usersCache.get(id);
    if (user != null) {
        LOG.info("Fetched user {} from cache", id);
        return Mono.just(user);
    }
    return Mono.empty();
}

然后,当 ID 不在缓存中时,我们要从文件中获取用户详细信息,并在找到数据时更新缓存:

private Mono<User> fetchFromFile(String id) {
    try {
        File file = new ClassPathResource("users.json").getFile();
        String usersData = new String(Files.readAllBytes(file.toPath()));
        List<User> users = objectMapper.readValue(usersData, new TypeReference<List<User>>() {
        });
        User user = users.stream()
          .filter(u -> u.getId()
            .equalsIgnoreCase(id))
          .findFirst()
          .get();
        usersCache.put(user.getId(), user);
        LOG.info("Fetched user {} from file", id);
        return Mono.just(user);
    } catch (IOException e) {
        return Mono.error(e);
    }
}

注意日志的细节,以确定是否从文件中获取了用户数据。

4、测试

@BeforeEach 测试方法中添加一个 ListAppender 来跟踪日志。用于确定是缓存还是文件执行了不同请求的函数:

protected ListAppender<ILoggingEvent> listAppender;

@BeforeEach
void setLogger() {
    Logger logger = (Logger) LoggerFactory.getLogger(UserService.class);
    logger.setLevel(Level.DEBUG);
    listAppender = new ListAppender<>();
    logger.addAppender(listAppender);
    listAppender.start();
}

4.1、switchIfEmpty() 在非空数据源上使用 defer()

只有当请求中的 withDefer 参数设置为 true 时,才会从缓存中检索用户数据:

@Test
void givenUserDataIsAvailableInCache_whenUserByIdIsRequestedWithDeferParameter_thenCachedResponseShouldBeRetrieved() {
    usersCache = new HashMap<>();
    User cachedUser = new User("66b29672e6f99a7156cc4ada", "gwen_dodson@beadzza.bmw", "boyle94", "admin");
    usersCache.put("66b29672e6f99a7156cc4ada", cachedUser);
    userService.getUsers()
      .putAll(usersCache);

    webTestClient.get()
      .uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=true")
      .exchange()
      .expectStatus()
      .isOk()
      .expectBody(String.class)
      .isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"," +
        "\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");

    assertTrue(listAppender.list.stream()
      .anyMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));

    assertTrue(listAppender.list.stream()
      .noneMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from file")));
}

switchIfEmpty()defer() 操作符一起使用时,替代源 Provider 不会被急于评估。

4.2、switchIfEmpty() 在非空数据源上不使用 defer()

添加另一个测试,检查在不使用 defer() 操作符的情况下使用 switchIfEmpty() 的行为:

@Test
void givenUserDataIsAvailableInCache_whenUserByIdIsRequestedWithoutDeferParameter_thenUserIsFetchedFromFileInAdditionToCache() {
    usersCache = new HashMap<>();
    User cachedUser1 = new User("66b29672e6f99a7156cc4ada", "gwen_dodson@beadzza.bmw", "boyle94", "admin");
    usersCache.put("66b29672e6f99a7156cc4ada", cachedUser1);
    userService.getUsers().putAll(usersCache);

    webTestClient.get()
      .uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=false")
      .exchange()
      .expectStatus()
      .isOk()
      .expectBody(String.class)
      .isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"," +
        "\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");

    assertTrue(listAppender.list.stream()
      .anyMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from file")));

    assertTrue(listAppender.list.stream()
      .anyMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));
}

如你所见,该实现从缓存和文件中获取了用户详细信息,但最终从缓存中提供了响应。尽管主要源(缓存)发出了数据,备用源中的代码块仍被不必要地触发。

4.3、switchIfEmpty() 在空源上使用 defer()

接下来,添加一个测试,以验证当缓存中没有数据时,特别是使用 defer() 操作符时,是否能从文件中检索到用户详细信息:

@Test
void givenUserDataIsNotAvailableInCache_whenUserByIdIsRequestedWithDeferParameter_thenFileResponseShouldBeRetrieved() {
    webTestClient.get()
      .uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=true")
      .exchange()
      .expectStatus()
      .isOk()
      .expectBody(String.class)
      .isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"
        ,\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");

    assertTrue(listAppender.list.stream()
      .anyMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from file")));

    assertTrue(listAppender.list.stream()
      .noneMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));
}

API 会从文件中获取用户详细信息,而不是尝试从缓存中获取。

4.4、switchIfEmpty() 在空源上不使用 defer()

最后,添加一个测试,以验证在不使用 defer() 的情况下,实现是否能从文件中获取用户详细信息(即缓存中没有数据):

@Test
void givenUserDataIsNotAvailableInCache_whenUserByIdIsRequestedWithoutDeferParameter_thenFileResponseShouldBeRetrieved() {
    webTestClient.get()
      .uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=false")
      .exchange()
      .expectStatus()
      .isOk()
      .expectBody(String.class)
      .isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"," + "\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");

    assertTrue(listAppender.list.stream()
      .anyMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from file")));

    assertTrue(listAppender.list.stream()
      .noneMatch(e -> e.toString()
        .contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));
}

由于缓存中没有数据,用户详细信息不会从缓存中获取(副作用就是仍会尝试获取),API 仍会按预期从文件中获取用户详细信息。

5、总结

本文通过测试重点了解了 Spring Reactive 中的 switchIfEmpty() 操作符及其各种行为。

switchIfEmpty()defer() 结合使用,可确保只在必要时访问替代数据源。这样可以避免不必要的计算和潜在的副作用。


Ref:https://www.baeldung.com/spring-reactive-switchifempty