在 Spring 中使用 DeferredResult 实现长轮询
1、概览
长轮询(Long polling)通常用于在 B/S 架构的应用中保持客户端和服务器的连接,直到信息可用。通常在服务器必须调用下游服务以获取信息并等待结果时使用。
本文将带你了解如何在 Spring MVC 应用中使用 DeferredResult
实现长轮询,以及如何处理错误和超时。
2、使用 DeferredResult 进行长轮询
可以在 Spring MVC 中使用 DeferredResult
来异步处理入站 HTTP 请求。它允许释放 HTTP 工作线程来处理其他入站请求,并将工作转移到另一个工作线程。因此,它可以帮助处理需要较长计算或任意等待时间的请求,提高服务的可用性。
2.1、Publisher
使用 DeferredResult
创建一个 Publisher(发布者)应用。
首先,定义一个 Spring @RestController
,它可以使用 DeferredResult
,但不会将工作转移到另一个工作线程:
@RestController
@RequestMapping("/api")
public class BakeryController {
@GetMapping("/bake/{bakedGood}")
public DeferredResult<String> publisher(@PathVariable String bakedGood, @RequestParam Integer bakeTime) {
DeferredResult<String> output = new DeferredResult<>();
try {
Thread.sleep(bakeTime);
output.setResult(format("Bake for %s complete and order dispatched. Enjoy!", bakedGood));
} catch (Exception e) {
// ...
}
return output;
}
}
该 Controller 的同步工作方式与普通阻塞式 Controller 的工作方式相同。因此,HTTP 线程会被完全阻塞,直到 bakeTime
结束。如果服务有大量入站流量,这种情况并不理想。
现在,通过将工作转移到工作线程来异步设置输出:
private ExecutorService bakers = Executors.newFixedThreadPool(5);
@GetMapping("/bake/{bakedGood}")
public DeferredResult<String> publisher(@PathVariable String bakedGood, @RequestParam Integer bakeTime) {
DeferredResult<String> output = new DeferredResult<>();
bakers.execute(() -> {
try {
Thread.sleep(bakeTime);
output.setResult(format("Bake for %s complete and order dispatched. Enjoy!", bakedGood));
} catch (Exception e) {
// ...
}
});
return output;
}
在这个示例中,现在可以释放 HTTP 工作线程来处理其他请求。bakers
线程池中的一个工作线程将会执行工作,并在完成后设置结果。当工作线程调用 setResult
时,它将允许容器线程响应调用服务的客户端。
这种方式,相对于传统的阻塞式 Controller 来说,可以提供更好的服务可用性。然而,还需要处理一些边缘情况,例如错误和超时。
使用 DeferredResult
提供的 setErrorResult
方法处理 Worker(工作)线程抛出的受检异常:
bakers.execute(() -> {
try {
Thread.sleep(bakeTime);
output.setResult(format("Bake for %s complete and order dispatched. Enjoy!", bakedGood));
} catch (Exception e) {
output.setErrorResult("Something went wrong with your order!");
}
});
现在,工作线程可以优雅地处理抛出的任何异常。
由于执行长轮询通常是为了以异步或同步方式处理下游系统的响应,因此应该添加一种机制,在未收到下游系统响应的情况下强制超时。
为此,DeferredResult
API 提供了一种机制。首先,在 DeferredResult
对象的构造函数中传递一个超时参数:
DeferredResult<String> output = new DeferredResult<>(5000L);
接着,使用 onTimeout
来处理超时:
output.onTimeout(() -> output.setErrorResult("the bakery is not responding in allowed time"));
它接受一个 Runnable
作为输入参数 - 当达到超时阈值时,容器线程会调用它。
如上,如果达到超时时间,则将其视为错误处理,并相应地调用 setErrorResult
。
2.2、Subscriber
Publisher 应用创建完毕后,现在创建一个 Subscriber(订阅者)应用。
开发调用这种长轮询 API 服务的客户端相当简单,因为它与编写标准阻塞式 REST 调用的客户端基本相同。唯一真正的区别是,由于长轮询的等待时间,需要确保有一个超时机制。
在 Spring MVC 中,可以使用 RestTemplate
或 WebClient
来实现这一点,因为它们都内置了超时处理功能。
首先,从 RestTemplate
示例开始。
使用 RestTemplateBuilder
创建一个 RestTemplate
实例,设置超时时间:
public String callBakeWithRestTemplate(RestTemplateBuilder restTemplateBuilder) {
RestTemplate restTemplate = restTemplateBuilder
.setConnectTimeout(Duration.ofSeconds(10))
.setReadTimeout(Duration.ofSeconds(10))
.build();
try {
return restTemplate.getForObject("/api/bake/cookie?bakeTime=1000", String.class);
} catch (ResourceAccessException e) {
// 处理超时
}
}
如上,捕获长轮询调用中的 ResourceAccessException
异常,处理超时错误。
接下来,使用 WebClient
创建一个示例来实现同样的结果:
public String callBakeWithWebClient() {
WebClient webClient = WebClient.create();
try {
return webClient.get()
.uri("/api/bake/cookie?bakeTime=1000")
.retrieve()
.bodyToFlux(String.class)
.timeout(Duration.ofSeconds(10))
.blockFirst();
} catch (ReadTimeoutException e) {
// 处理超时
}
}
3、测试长轮询
现在,启动应用。
首先,使用 MockMvc
测试对 Controller 类的调用:
MvcResult asyncListener = mockMvc
.perform(MockMvcRequestBuilders.get("/api/bake/cookie?bakeTime=1000"))
.andExpect(request().asyncStarted())
.andReturn();
如上,调用 DeferredResult
端点,并断言请求已开始异步调用。从这里开始,测试将等待异步结果的完成,这意味着不需要在测试中添加任何等待逻辑。
接下来,断言异步调用何时返回,并且与期望的值相匹配:
String response = mockMvc
.perform(asyncDispatch(asyncListener))
.andReturn()
.getResponse()
.getContentAsString();
assertThat(response)
.isEqualTo("Bake for cookie complete and order dispatched. Enjoy!");
通过 asyncDispatch()
方法,可以获取异步调用的响应并断言其值。
为了测试 DeferredResult
的超时机制,需要稍微修改测试代码,在 asyncListener
和 response
调用之间添加一个 Timeout Enabler(超时启动器):
((MockAsyncContext) asyncListener
.getRequest()
.getAsyncContext())
.getListeners()
.get(0)
.onTimeout(null);
这段代码可能看起来很奇怪,但这样调用 onTimeout
是有特殊原因的。这样做是为了让 AsyncListener
知道某个操作超时了。这将确保在 Controller 中为 onTimeout
方法设置的 Runnable
类被正确调用。
4、总结
本文介绍了如何在 Spring MVC 应用中使用 DeferredResult
实现长轮询,以及如何处理超时和异常。
Ref:https://www.baeldung.com/spring-mvc-long-polling