Spring Boot 定时推送 Websocket 消息
1、概览
本文将带你了解如何在 Spring Boot 中实现定时地往浏览器推送 WebSockets 消息。
另一种方法是使用服务器发送事件 (SSE),但本文不涉及这一点。
Spring 提供了多种调度方式。如 @Scheduled
注解,以及 Project Reactor 提供的 Flux::interval
方法,对于 Webflux 应用来说,该方法开箱即用,它还可以作为独立库用于任何 Java 项目。
此外,还有一些更专业的三方调度框架,如 Quartz Scheduler,但这不在本文范畴。
2、简单的聊天应用
在 上一篇文章 中,使用 WebSockets 构建了一个聊天应用。现在让我们用一项新功能来扩展它:聊天机器人。聊天机器人是向浏览器推送预定消息的服务器端组件。
2.1、Maven 依赖
先在 Maven 中添加必要依赖, pom.xml
如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
2.2、JavaFaker
使用 JavaFaker
库生成机器人信息。该库通常用于生成测试数据。在这里,用于为聊天室添加一位名为 “Chuck Norris” 的访客。
代码如下:
Faker faker = new Faker();
ChuckNorris chuckNorris = faker.chuckNorris();
String messageFromChuck = chuckNorris.fact();
Faker 为各种数据生成器(Data Generator)提供工厂方法。本文使用 ChuckNorris
生成器。调用 chuckNorris.fact()
从预定义信息列表中随机显示一句话。
2.3、Data Model
使用简单的 POJO 封装聊天应用的消息:
public class OutputMessage {
private String from;
private String text;
private String time;
// 构造函数、get、set 方法省略
}
下面举例说明如何创建聊天消息:
OutputMessage message = new OutputMessage(
"Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));
2.4、客户端
聊天应用的客户端是一个简单的 HTML 页面。它使用 SockJS 客户端 和 STOMP 消息协议。
客户端订阅 Topic:
<html>
<head>
<script src="./js/sockjs-0.3.4.js"></script>
<script src="./js/stomp.js"></script>
<script type="text/javascript">
// ...
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
// ...
stompClient.subscribe('/topic/pushmessages', function(messageOutput) {
showMessageOutput(JSON.parse(messageOutput.body));
});
});
// ...
</script>
</head>
<!-- ... -->
</html>
首先,通过 SockJS 协议创建一个 Stomp 客户端。然后,Topic 订阅作为服务器和连接的客户端之间的通信渠道。
在代码库中,这段代码位于 webapp/bots.html
文件中。当在本地运行时,可以通过http://localhost:8080/bots.html
访问它。当然,可以需要根据应用的部署方式来调整主机和端口。
2.5、服务端
上一篇文章 已经介绍了如何在 Spring 中配置 WebSockets。
现在来稍微修改一下配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// ...
registry.addEndpoint("/chatwithbots");
registry.addEndpoint("/chatwithbots").withSockJS();
}
}
使用工具类 SimpMessagingTemplate
来推送消息。默认情况下,它作为一个 @Bean
在 Spring Context 中提供。可以通过查看 classpath 上 AbstractMessageBrokerConfiguration
抽象类的自动配置来查看它的声明。
可以将它注入到任何 Spring 组件中。
然后,用它向 Topic /topic/pushmessages
推送消息。假设已将该 Bean 注入到名为 simpMessagingTemplate
的变量中:
simpMessagingTemplate.convertAndSend("/topic/pushmessages",
new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));
如前所述,在客户端示例中,客户端会订阅该 Topic,以便在消息到达时对其进行处理。
3. 定时推送消息
在 Spring 生态系统中,可以选择多种调度方法。如果使用 Spring MVC,@Scheduled
注解因其简单性而自然成为首选。如果使用 Spring Webflux,则使用 Project Reactor 的 Flux::interval
方法。
3.1、配置
聊天机器人使用 JavaFaker
的 Chuck Norris 生成器。
把它配置为一个 Bean,这样就可以在需要的地方注入它。
@Configuration
class AppConfig {
@Bean
public ChuckNorris chuckNorris() {
return (new Faker()).chuckNorris();
}
}
3.2、使用 @Scheduled
“机器人” 是一个定时调度方法。运行时,它使用 SimpMessagingTemplate
通过 WebSocket
发送 OutputMessage
POJO。
顾名思义,@Scheduled
注解允许重复执行方法。通过它,可以使用简单的基于速率的调度或更复杂的 cron
表达式。
编写第一个聊天机器人的代码:
@Service
public class ScheduledPushMessages {
@Scheduled(fixedRate = 5000)
public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
String time = new SimpleDateFormat("HH:mm").format(new Date());
simpMessagingTemplate.convertAndSend("/topic/pushmessages",
new OutputMessage("Chuck Norris (@Scheduled)", chuckNorris().fact(), time));
}
}
用 @Scheduled(fixedRate = 5000)
对 sendMessage
方法进行注解。这使得 sendMessage
每五秒运行一次。然后,使用 simpMessagingTemplate
向 Topic 发送 OutputMessage
。simpMessagingTemplate
和 chuckNorris
实例作为方法参数从 Spring Context 注入。
3.3、使用 Flux::interval()
如果使用 WebFlux,可以使用 Flux::interval
Operator。它将发布一个指定时间间隔(Duration
)分隔的无限长的 Long
流。
现在,将 Flux 与之前的示例结合起来使用。
目标是每五秒钟发送一次 “Chuck Norris”。首先,需要实现 InitializingBean
接口,以便在应用启动时订阅 Flux
:
@Service
public class ReactiveScheduledPushMessages implements InitializingBean {
private SimpMessagingTemplate simpMessagingTemplate;
private ChuckNorris chuckNorris;
@Autowired
public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
this.simpMessagingTemplate = simpMessagingTemplate;
this.chuckNorris = chuckNorris;
}
@Override
public void afterPropertiesSet() throws Exception {
Flux.interval(Duration.ofSeconds(5L))
// 丢弃传入的 Long 消息,代之以 OutputMessage
.map((n) -> new OutputMessage("Chuck Norris (Flux::interval)",
chuckNorris.fact(),
new SimpleDateFormat("HH:mm").format(new Date())))
.subscribe(message -> simpMessagingTemplate.convertAndSend("/topic/pushmessages", message));
}
}
如上,使用构造函数注入来设置 simpMessagingTemplate
和 chuckNorris
实例。这次,调度逻辑位于 afterPropertiesSet()
中,在实现 InitializingBean
时覆写了该方法。该方法将在服务启动后立即运行。
interval
Operator 每 5 秒发出一个 Long
值。然后,map
Operator 会丢弃该值,并用我们的消息取而代之。最后,订阅 Flux
,为每条消息触发我们的逻辑。
4、总结
本文介绍了在 Spring Boot 中实现定时地往浏览器推送 WebSocket 消息的两种方式,分别是使用 @Scheduled
注解或者 Flux::interval()
方法。
Ref:https://www.baeldung.com/spring-boot-scheduled-websocket