在 Spring 中使用 DeferredResult 实现长轮询

1、概览

长轮询(Long polling)通常用于在 B/S 架构的应用中保持客户端和服务器的连接,直到信息可用。通常在服务器必须调用下游服务以获取信息并等待结果时使用。

本文将带你了解如何在 Spring MVC 应用中使用 DeferredResult 实现长轮询,以及如何处理错误和超时。

2、使用 DeferredResult 进行长轮询

可以在 Spring MVC 中使用 DeferredResult 来异步处理入站 HTTP 请求。它允许释放 HTTP 工作线程来处理其他入站请求,并将工作转移到另一个工作线程。因此,它可以帮助处理需要较长计算或任意等待时间的请求,提高服务的可用性。

2.1、Publisher

使用 DeferredResult 创建一个 Publisher(发布者)应用。

首先,定义一个 Spring @RestController,它可以使用 DeferredResult,但不会将工作转移到另一个工作线程:

@RestController
@RequestMapping("/api")
public class BakeryController { 
    @GetMapping("/bake/{bakedGood}")
    public DeferredResult<String> publisher(@PathVariable String bakedGood, @RequestParam Integer bakeTime) {
        DeferredResult<String> output = new DeferredResult<>();
        try {
            Thread.sleep(bakeTime);
            output.setResult(format("Bake for %s complete and order dispatched. Enjoy!", bakedGood));
        } catch (Exception e) {
            // ...
        }
        return output;
    }
}

该 Controller 的同步工作方式与普通阻塞式 Controller 的工作方式相同。因此,HTTP 线程会被完全阻塞,直到 bakeTime 结束。如果服务有大量入站流量,这种情况并不理想。

现在,通过将工作转移到工作线程来异步设置输出:

private ExecutorService bakers = Executors.newFixedThreadPool(5);

@GetMapping("/bake/{bakedGood}")
public DeferredResult<String> publisher(@PathVariable String bakedGood, @RequestParam Integer bakeTime) {
    DeferredResult<String> output = new DeferredResult<>();
    bakers.execute(() -> {
        try {
            Thread.sleep(bakeTime);
            output.setResult(format("Bake for %s complete and order dispatched. Enjoy!", bakedGood));
        } catch (Exception e) {
            // ...
        }
    });
    return output;
}

在这个示例中,现在可以释放 HTTP 工作线程来处理其他请求。bakers 线程池中的一个工作线程将会执行工作,并在完成后设置结果。当工作线程调用 setResult 时,它将允许容器线程响应调用服务的客户端。

这种方式,相对于传统的阻塞式 Controller 来说,可以提供更好的服务可用性。然而,还需要处理一些边缘情况,例如错误和超时。

使用 DeferredResult 提供的 setErrorResult 方法处理 Worker(工作)线程抛出的受检异常:

bakers.execute(() -> {
    try {
        Thread.sleep(bakeTime);
        output.setResult(format("Bake for %s complete and order dispatched. Enjoy!", bakedGood));
     } catch (Exception e) {
        output.setErrorResult("Something went wrong with your order!");
     }
});

现在,工作线程可以优雅地处理抛出的任何异常。

由于执行长轮询通常是为了以异步或同步方式处理下游系统的响应,因此应该添加一种机制,在未收到下游系统响应的情况下强制超时。

为此,DeferredResult API 提供了一种机制。首先,在 DeferredResult 对象的构造函数中传递一个超时参数:

DeferredResult<String> output = new DeferredResult<>(5000L);

接着,使用 onTimeout 来处理超时:

output.onTimeout(() -> output.setErrorResult("the bakery is not responding in allowed time"));

它接受一个 Runnable 作为输入参数 - 当达到超时阈值时,容器线程会调用它。

如上,如果达到超时时间,则将其视为错误处理,并相应地调用 setErrorResult

2.2、Subscriber

Publisher 应用创建完毕后,现在创建一个 Subscriber(订阅者)应用。

开发调用这种长轮询 API 服务的客户端相当简单,因为它与编写标准阻塞式 REST 调用的客户端基本相同。唯一真正的区别是,由于长轮询的等待时间,需要确保有一个超时机制。

在 Spring MVC 中,可以使用 RestTemplateWebClient 来实现这一点,因为它们都内置了超时处理功能。

首先,从 RestTemplate 示例开始。

使用 RestTemplateBuilder 创建一个 RestTemplate 实例,设置超时时间:

public String callBakeWithRestTemplate(RestTemplateBuilder restTemplateBuilder) {
    RestTemplate restTemplate = restTemplateBuilder
      .setConnectTimeout(Duration.ofSeconds(10))
      .setReadTimeout(Duration.ofSeconds(10))
      .build();

    try {
        return restTemplate.getForObject("/api/bake/cookie?bakeTime=1000", String.class);
    } catch (ResourceAccessException e) {
        // 处理超时
    }
}

如上,捕获长轮询调用中的 ResourceAccessException 异常,处理超时错误。

接下来,使用 WebClient 创建一个示例来实现同样的结果:

public String callBakeWithWebClient() {
    WebClient webClient = WebClient.create();
    try {
        return webClient.get()
          .uri("/api/bake/cookie?bakeTime=1000")
          .retrieve()
          .bodyToFlux(String.class)
          .timeout(Duration.ofSeconds(10))
          .blockFirst();
    } catch (ReadTimeoutException e) {
        // 处理超时
    }
}

3、测试长轮询

现在,启动应用。

首先,使用 MockMvc 测试对 Controller 类的调用:

MvcResult asyncListener = mockMvc
  .perform(MockMvcRequestBuilders.get("/api/bake/cookie?bakeTime=1000"))
  .andExpect(request().asyncStarted())
  .andReturn();

如上,调用 DeferredResult 端点,并断言请求已开始异步调用。从这里开始,测试将等待异步结果的完成,这意味着不需要在测试中添加任何等待逻辑。

接下来,断言异步调用何时返回,并且与期望的值相匹配:

String response = mockMvc
  .perform(asyncDispatch(asyncListener))
  .andReturn()
  .getResponse()
  .getContentAsString();

assertThat(response)
  .isEqualTo("Bake for cookie complete and order dispatched. Enjoy!");

通过 asyncDispatch() 方法,可以获取异步调用的响应并断言其值。

为了测试 DeferredResult 的超时机制,需要稍微修改测试代码,在 asyncListenerresponse 调用之间添加一个 Timeout Enabler(超时启动器):

((MockAsyncContext) asyncListener
  .getRequest()
  .getAsyncContext())
  .getListeners()
  .get(0)
  .onTimeout(null);

这段代码可能看起来很奇怪,但这样调用 onTimeout 是有特殊原因的。这样做是为了让 AsyncListener 知道某个操作超时了。这将确保在 Controller 中为 onTimeout 方法设置的 Runnable 类被正确调用。

4、总结

本文介绍了如何在 Spring MVC 应用中使用 DeferredResult 实现长轮询,以及如何处理超时和异常。


Ref:https://www.baeldung.com/spring-mvc-long-polling