Spring Boot 定时推送 Websocket 消息

1、概览

本文将带你了解如何在 Spring Boot 中实现定时地往浏览器推送 WebSockets 消息。

另一种方法是使用服务器发送事件 (SSE),但本文不涉及这一点。

Spring 提供了多种调度方式。如 @Scheduled 注解,以及 Project Reactor 提供的 Flux::interval 方法,对于 Webflux 应用来说,该方法开箱即用,它还可以作为独立库用于任何 Java 项目。

此外,还有一些更专业的三方调度框架,如 Quartz Scheduler,但这不在本文范畴。

2、简单的聊天应用

上一篇文章 中,使用 WebSockets 构建了一个聊天应用。现在让我们用一项新功能来扩展它:聊天机器人。聊天机器人是向浏览器推送预定消息的服务器端组件。

2.1、Maven 依赖

先在 Maven 中添加必要依赖, pom.xml 如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
    <groupId>com.github.javafaker</groupId>
    <artifactId>javafaker</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

2.2、JavaFaker

使用 JavaFaker 库生成机器人信息。该库通常用于生成测试数据。在这里,用于为聊天室添加一位名为 “Chuck Norris” 的访客。

代码如下:

Faker faker = new Faker();
ChuckNorris chuckNorris = faker.chuckNorris();
String messageFromChuck = chuckNorris.fact();

Faker 为各种数据生成器(Data Generator)提供工厂方法。本文使用 ChuckNorris 生成器。调用 chuckNorris.fact() 从预定义信息列表中随机显示一句话。

2.3、Data Model

使用简单的 POJO 封装聊天应用的消息:

public class OutputMessage {

    private String from;
    private String text;
    private String time;

   // 构造函数、get、set 方法省略
}

下面举例说明如何创建聊天消息:

OutputMessage message = new OutputMessage(
  "Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));

2.4、客户端

聊天应用的客户端是一个简单的 HTML 页面。它使用 SockJS 客户端STOMP 消息协议。

客户端订阅 Topic:

<html>
<head>
    <script src="./js/sockjs-0.3.4.js"></script>
    <script src="./js/stomp.js"></script>
    <script type="text/javascript">
        // ...
        stompClient = Stomp.over(socket);

        stompClient.connect({}, function(frame) {
            // ...
            stompClient.subscribe('/topic/pushmessages', function(messageOutput) {
                showMessageOutput(JSON.parse(messageOutput.body));
            });
        });
        // ...
    </script>
</head>
<!-- ... -->
</html>

首先,通过 SockJS 协议创建一个 Stomp 客户端。然后,Topic 订阅作为服务器和连接的客户端之间的通信渠道。

在代码库中,这段代码位于 webapp/bots.html 文件中。当在本地运行时,可以通过http://localhost:8080/bots.html 访问它。当然,可以需要根据应用的部署方式来调整主机和端口。

2.5、服务端

上一篇文章 已经介绍了如何在 Spring 中配置 WebSockets。

现在来稍微修改一下配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // ...
        registry.addEndpoint("/chatwithbots");
        registry.addEndpoint("/chatwithbots").withSockJS();
    }
}

使用工具类 SimpMessagingTemplate 来推送消息。默认情况下,它作为一个 @Bean 在 Spring Context 中提供。可以通过查看 classpath 上 AbstractMessageBrokerConfiguration 抽象类的自动配置来查看它的声明。

可以将它注入到任何 Spring 组件中。

然后,用它向 Topic /topic/pushmessages 推送消息。假设已将该 Bean 注入到名为 simpMessagingTemplate 的变量中:

simpMessagingTemplate.convertAndSend("/topic/pushmessages", 
  new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));

如前所述,在客户端示例中,客户端会订阅该 Topic,以便在消息到达时对其进行处理。

3. 定时推送消息

在 Spring 生态系统中,可以选择多种调度方法。如果使用 Spring MVC,@Scheduled 注解因其简单性而自然成为首选。如果使用 Spring Webflux,则使用 Project Reactor 的 Flux::interval 方法。

3.1、配置

聊天机器人使用 JavaFaker 的 Chuck Norris 生成器。

把它配置为一个 Bean,这样就可以在需要的地方注入它。

@Configuration
class AppConfig {

    @Bean
    public ChuckNorris chuckNorris() {
        return (new Faker()).chuckNorris();
    }
}

3.2、使用 @Scheduled

“机器人” 是一个定时调度方法。运行时,它使用 SimpMessagingTemplate 通过 WebSocket 发送 OutputMessage POJO。

顾名思义,@Scheduled 注解允许重复执行方法。通过它,可以使用简单的基于速率的调度或更复杂的 cron 表达式。

编写第一个聊天机器人的代码:

@Service
public class ScheduledPushMessages {

    @Scheduled(fixedRate = 5000)
    public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
        String time = new SimpleDateFormat("HH:mm").format(new Date());
        simpMessagingTemplate.convertAndSend("/topic/pushmessages", 
          new OutputMessage("Chuck Norris (@Scheduled)", chuckNorris().fact(), time));
    }
    
}

@Scheduled(fixedRate = 5000)sendMessage 方法进行注解。这使得 sendMessage 每五秒运行一次。然后,使用 simpMessagingTemplate 向 Topic 发送 OutputMessagesimpMessagingTemplatechuckNorris 实例作为方法参数从 Spring Context 注入。

3.3、使用 Flux::interval()

如果使用 WebFlux,可以使用 Flux::interval Operator。它将发布一个指定时间间隔(Duration)分隔的无限长的 Long 流。

现在,将 Flux 与之前的示例结合起来使用。

目标是每五秒钟发送一次 “Chuck Norris”。首先,需要实现 InitializingBean 接口,以便在应用启动时订阅 Flux

@Service
public class ReactiveScheduledPushMessages implements InitializingBean {

    private SimpMessagingTemplate simpMessagingTemplate;

    private ChuckNorris chuckNorris;

    @Autowired
    public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
        this.simpMessagingTemplate = simpMessagingTemplate;
        this.chuckNorris = chuckNorris;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Flux.interval(Duration.ofSeconds(5L))
            // 丢弃传入的 Long 消息,代之以 OutputMessage
            .map((n) -> new OutputMessage("Chuck Norris (Flux::interval)", 
                              chuckNorris.fact(), 
                              new SimpleDateFormat("HH:mm").format(new Date()))) 
            .subscribe(message -> simpMessagingTemplate.convertAndSend("/topic/pushmessages", message));
    }
}

如上,使用构造函数注入来设置 simpMessagingTemplatechuckNorris 实例。这次,调度逻辑位于 afterPropertiesSet() 中,在实现 InitializingBean 时覆写了该方法。该方法将在服务启动后立即运行。

interval Operator 每 5 秒发出一个 Long 值。然后,map Operator 会丢弃该值,并用我们的消息取而代之。最后,订阅 Flux,为每条消息触发我们的逻辑。

4、总结

本文介绍了在 Spring Boot 中实现定时地往浏览器推送 WebSocket 消息的两种方式,分别是使用 @Scheduled 注解或者 Flux::interval() 方法。


Ref:https://www.baeldung.com/spring-boot-scheduled-websocket