理解 Spring Reactive 中的 switchIfEmpty()
1、概览
本文将带你了解 Spring Reactive 中的 switchIfEmpty() 操作符及其在使用和不使用 defer()
操作符时的行为,了解这些操作符在不同场景中的交互方式,并通过实际示例来说明它们对响应式流(Reactive Stream)的影响。
2、switchIfEmpty() 和 Defer() 的使用
switchIfEmpty()
是 Mono 和 Flux 中的一个操作符,用于在源生产者为空时执行备用生产者流。如果主源 Publisher 没有发布数据,该操作符就会切换到替代源的数据发布。
考虑一个从大型文件中通过 ID 检索用户详细信息的场景。当请求文件中的用户详细信息时,遍历文件会消耗大量时间。因此,对于经常访问的 ID,将其详细信息缓存起来更有意义。
当端点收到请求时,首先搜索缓存。如果用户详细信息可用,返回响应。如果没有,则从文件中获取数据并缓存起来,以备后续请求。
在这种情况下,主数据提供者(Primary Data Provider)是检查缓存中 KEY 是否存在的流,而备用数据提供者是检查文件中的 KEY 并更新缓存的流。switchIfEmpty()
操作符可以根据缓存中数据的可用性高效地切换数据源提供者。
了解 defer()
操作符的使用也很重要,它可以推迟函数的求值,直到发生订阅。如果我们不在 switchIfEmpty()
中使用 defer()
操作符,表达式就会立即(急切地)求值,从而可能导致意想不到的副作用。
3、示例设置
通过示例来了解 switchIfEmpty()
操作符在不同情况下的行为。
3.1、Data Model
首先,定义一个 User
模型类,其中包含一些详细信息,如 id
、email
、username
和 roles
:
public class User {
@JsonProperty("id")
private String id;
@JsonProperty("email")
private String email;
@JsonProperty("username")
private String username;
@JsonProperty("roles")
private String roles;
// Getter / Setter 省略
}
3.2、用户数据设置
随后,在 classpath 中维护一个文件(users.json),其中包含 JSON 格式的所有用户详细信息:
[
{
"id": "66b296723881ea345705baf1",
"email": "may_barber@twiggery.cg",
"username": "reid90",
"roles": "member"
},
{
"id": "66b29672e6f99a7156cc4ada",
"email": "gwen_dodson@beadzza.bmw",
"username": "boyle94",
"roles": "admin"
},
...
]
3.3、实现 Controller 和 Service
接着,添加一个 Controller,按 ID 检索用户详细信息。它接受一个可选的布尔参数 withDefer
,并根据该查询参数使用不同的实现:
@GetMapping("/user/{id}")
public Mono<ResponseEntity<User>> findUserDetails(@PathVariable("id") String id,
@RequestParam("withDefer") boolean withDefer) {
return (withDefer ? userService.findByUserIdWithDefer(id) :
userService.findByUserIdWithoutDefer(id)).map(ResponseEntity::ok);
}
然后,在 UserService
中定义这两种实现,包括使用和不使用 defer()
,以了解 switchIfEmpty()
的行为:
public Mono<User> findByUserIdWithDefer(String id) {
return fetchFromCache(id).switchIfEmpty(Mono.defer(() -> fetchFromFile(id)));
}
public Mono<User> findByUserIdWithoutDefer(String id) {
return fetchFromCache(id).switchIfEmpty(fetchFromFile(id));
}
为简单起见,实现一个内存缓存来保留用户信息,作为请求的主要数据提供者。还记录了每次访问的日志,以确定是否命中缓存:
private final Map<String, User> usersCache;
private Mono<User> fetchFromCache(String id) {
User user = usersCache.get(id);
if (user != null) {
LOG.info("Fetched user {} from cache", id);
return Mono.just(user);
}
return Mono.empty();
}
然后,当 ID 不在缓存中时,我们要从文件中获取用户详细信息,并在找到数据时更新缓存:
private Mono<User> fetchFromFile(String id) {
try {
File file = new ClassPathResource("users.json").getFile();
String usersData = new String(Files.readAllBytes(file.toPath()));
List<User> users = objectMapper.readValue(usersData, new TypeReference<List<User>>() {
});
User user = users.stream()
.filter(u -> u.getId()
.equalsIgnoreCase(id))
.findFirst()
.get();
usersCache.put(user.getId(), user);
LOG.info("Fetched user {} from file", id);
return Mono.just(user);
} catch (IOException e) {
return Mono.error(e);
}
}
注意日志的细节,以确定是否从文件中获取了用户数据。
4、测试
在 @BeforeEach
测试方法中添加一个 ListAppender
来跟踪日志。用于确定是缓存还是文件执行了不同请求的函数:
protected ListAppender<ILoggingEvent> listAppender;
@BeforeEach
void setLogger() {
Logger logger = (Logger) LoggerFactory.getLogger(UserService.class);
logger.setLevel(Level.DEBUG);
listAppender = new ListAppender<>();
logger.addAppender(listAppender);
listAppender.start();
}
4.1、switchIfEmpty() 在非空数据源上使用 defer()
只有当请求中的 withDefer
参数设置为 true
时,才会从缓存中检索用户数据:
@Test
void givenUserDataIsAvailableInCache_whenUserByIdIsRequestedWithDeferParameter_thenCachedResponseShouldBeRetrieved() {
usersCache = new HashMap<>();
User cachedUser = new User("66b29672e6f99a7156cc4ada", "gwen_dodson@beadzza.bmw", "boyle94", "admin");
usersCache.put("66b29672e6f99a7156cc4ada", cachedUser);
userService.getUsers()
.putAll(usersCache);
webTestClient.get()
.uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=true")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"," +
"\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");
assertTrue(listAppender.list.stream()
.anyMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));
assertTrue(listAppender.list.stream()
.noneMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from file")));
}
将 switchIfEmpty()
与 defer()
操作符一起使用时,替代源 Provider 不会被急于评估。
4.2、switchIfEmpty() 在非空数据源上不使用 defer()
添加另一个测试,检查在不使用 defer()
操作符的情况下使用 switchIfEmpty()
的行为:
@Test
void givenUserDataIsAvailableInCache_whenUserByIdIsRequestedWithoutDeferParameter_thenUserIsFetchedFromFileInAdditionToCache() {
usersCache = new HashMap<>();
User cachedUser1 = new User("66b29672e6f99a7156cc4ada", "gwen_dodson@beadzza.bmw", "boyle94", "admin");
usersCache.put("66b29672e6f99a7156cc4ada", cachedUser1);
userService.getUsers().putAll(usersCache);
webTestClient.get()
.uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=false")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"," +
"\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");
assertTrue(listAppender.list.stream()
.anyMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from file")));
assertTrue(listAppender.list.stream()
.anyMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));
}
如你所见,该实现从缓存和文件中获取了用户详细信息,但最终从缓存中提供了响应。尽管主要源(缓存)发出了数据,备用源中的代码块仍被不必要地触发。
4.3、switchIfEmpty() 在空源上使用 defer()
接下来,添加一个测试,以验证当缓存中没有数据时,特别是使用 defer()
操作符时,是否能从文件中检索到用户详细信息:
@Test
void givenUserDataIsNotAvailableInCache_whenUserByIdIsRequestedWithDeferParameter_thenFileResponseShouldBeRetrieved() {
webTestClient.get()
.uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=true")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"
,\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");
assertTrue(listAppender.list.stream()
.anyMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from file")));
assertTrue(listAppender.list.stream()
.noneMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));
}
API 会从文件中获取用户详细信息,而不是尝试从缓存中获取。
4.4、switchIfEmpty() 在空源上不使用 defer()
最后,添加一个测试,以验证在不使用 defer()
的情况下,实现是否能从文件中获取用户详细信息(即缓存中没有数据):
@Test
void givenUserDataIsNotAvailableInCache_whenUserByIdIsRequestedWithoutDeferParameter_thenFileResponseShouldBeRetrieved() {
webTestClient.get()
.uri("/api/v1/user/66b29672e6f99a7156cc4ada?withDefer=false")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"id\":\"66b29672e6f99a7156cc4ada\"," + "\"email\":\"gwen_dodson@beadzza.bmw\",\"username\":\"boyle94\",\"roles\":\"admin\"}");
assertTrue(listAppender.list.stream()
.anyMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from file")));
assertTrue(listAppender.list.stream()
.noneMatch(e -> e.toString()
.contains("Fetched user 66b29672e6f99a7156cc4ada from cache")));
}
由于缓存中没有数据,用户详细信息不会从缓存中获取(副作用就是仍会尝试获取),API 仍会按预期从文件中获取用户详细信息。
5、总结
本文通过测试重点了解了 Spring Reactive 中的 switchIfEmpty()
操作符及其各种行为。
将 switchIfEmpty()
与 defer()
结合使用,可确保只在必要时访问替代数据源。这样可以避免不必要的计算和潜在的副作用。
Ref:https://www.baeldung.com/spring-reactive-switchifempty