在 Spring Boot 应用中使用 Postgresql 作为 Message Broker
1、简介
在本教程中,我们将介绍如何使用 PostgreSQL 的 LISTEN
/ NOTIFY
命令来在 Spring Boot 应用中实现简单的 MQ。
2、PostgreSQL 的 LISTEN/NOTIFY 机制简介
简单地说,这些命令允许连接的客户端通过普通的 PostgreSQL 连接交换信息。客户端使用 NOTIFY
命令向 channel 发送通知以及可选的 string payload。
channel 可以是任何有效的 SQL 标识符,其工作原理与传统 MQ 系统中的 topic 类似。这意味着 payload 将发送给该特定 channel 的所有活动的监听器(listener)。如果没有 payload,监听者只会收到一个空通知。
要开始接收通知,客户端需要使用 LISTEN
命令,该命令将 channel 名称作为唯一参数。该命令会立即返回,因此客户端可以使用同一连接继续执行其他任务。
通知机制具有一些重要的特性:
- channel 名称在数据库中是唯一的。
- 客户端使用 LISTEN/NOTIFY 时无需特殊授权。
- 在事务中使用 NOTIFY 时,客户端只有在事务成功完成时才会收到通知。
此外,如果在一个事务中使用相同的 payload 向同一 channel 发送多个 NOTIFY 命令,客户端将只收到一个通知。
3、使用 PostgreSQL 作为 Message Broker 的场景
鉴于 PostgreSQL 通知的特性,我们不禁要问,什么时候使用它而不是 RabbitMQ 或类似的成熟 message broker。这需要权衡利弊。一般来说,选择后者意味着:
- 更复杂 - message broker 是另一个必须监控、升级的组件。
- 处理分布式事务中的失败模式(failure mode)。
通知机制不存在这些问题:
- 假设我们使用 PostgreSQL 作为主数据库,这些功能已经预制。
- 无分布式事务。
当然,这也有局限性:
- 这是一种专有机制,只能用于 PostgreSQL。
- 不直接支持持久化订阅者。在客户端开始监听消息之前发送的通知将丢失。
即使有这些限制,这一机制仍有一些潜在的应用:
- “模块化单体”式应用中的通知总线。
- 分布式缓存失效。
- 轻量级消息代理,使用普通数据库表作为队列。
- 事件源架构。
4、在 Spring Boot 应用中使用 LISTEN/NOTIFY
既然我们已经对 LISTEN/NOTIFY 机制有了基本的了解,下面就让我们继续使用它构建一个简单的 Spring Boot 测试应用。我们将创建一个简单的 API,允许我们提交买入/卖出订单。payload 包括我们愿意买入或卖出的仪器id、价格和数量。我们还将添加一个 API,允许我们根据订单的 id 查询订单。
到目前为止,没有什么特别之处。但问题是:我们希望在将订单查询插入数据库后立即从缓存中为其提供检索服务。当然,我们可以进行缓存写入,但在需要扩展服务的分布式场景中,我们还需要一个分布式缓存。
这时,通知机制就派上用场了:我们将在每次插入时发送 NOTIFY,客户端将使用 LISTEN 将订单预加载到各自的本地缓存中。
4.1、项目依赖
我们的示例应用需要 Spring Boot 应用的常规依赖项以及 PostgreSQL 驱动:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
spring-boot-starter-web 、spring-boot-starter-data-jdbc 和 postgresql 的最新版本可从 Maven Central 获取。
4.2、Notification Service
由于通知机制是 PostgreSQL 特有的,我们将把它的一般行为封装在一个类中: NotifierService
。这样做可以避免这些细节泄漏到应用的其他部分。这也简化了单元测试,因为我们可以用模拟的版本替换该服务,以实现不同的场景。
NotifierService
有两个职责。首先,它提供了发送订单相关通知的 facade(外观设计模式):
public class NotifierService {
private static final String ORDERS_CHANNEL = "orders";
private final JdbcTemplate tpl;
@Transactional
public void notifyOrderCreated(Order order) {
tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
}
// ... other methods omitted
}
其次,它有一个用于 Runnable
实例的工厂方法,应用可使用该方法接收通知。该工厂需要一个 PGNotification
对象的消费者,其中包含检索与通知相关的 channel 和 payload 的方法:
public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
return () -> {
tpl.execute((Connection c) -> {
c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);
PGConnection pgconn = c.unwrap(PGConnection.class);
while(!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgconn.getNotifications(10000);
if ( nts == null || nts.length == 0 ) {
continue;
}
for( PGNotification nt : nts) {
consumer.accept(nt);
}
}
return 0;
});
};
}
在这里,为了简单起见,我们选择交付原始的 PGNotification
。在现实世界中,我们通常要处理多个 domain entity,我们可以使用泛型或类似技术来继承该类,以避免代码重复。
关于创建的 Runnable
的几点说明:
- 与数据库相关的逻辑使用
JdbcTemplate
所提供的execute()
方法。这可确保正确的连接处理/清理,并简化错误处理。 - 回调一直运行,直到当前线程被中断或运行时出错导致它返回。
请注意使用的是 PGConnection
,而不是标准的 JDBC Connection
。我们需要它来直接访问 getNotifications()
方法,该方法会返回一个或多个队列通知。
getNotifications()
有两个重载方法。在不带参数的情况下调用时,它会轮询并返回任何待处理的通知。如果没有,则返回 null
。第二个变量接受一个 integer,表示等待通知的最长时间,超时返回 null
。如果我们传递 0
(零)作为超时值则不超时,getNotifications()
就会阻塞,直到有新的通知到达。
在应用初始化过程中,我们在 @Configuration
类中使用了 CommandLineRunner
Bean,该 Bean 实际上将生成一个新的线程以开始接收通知:
@Configuration
public class ListenerConfiguration {
@Bean
CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
return (args) -> {
Runnable listener = notifier.createNotificationHandler(handler);
Thread t = new Thread(listener, "order-listener");
t.start();
};
}
}
4.3、处理连接
使用同一连接处理通知和常规查询虽然技术上可行,但并不方便。这必须在控制流中分散调用 getNotification()
,将导致代码难以阅读和维护。
标准做法是运行一个或多个专用线程来处理通知。每个线程都有自己的连接,并一直保持打开状态。如果这些连接是由 Hikari 或 DBCP 等连接池创建的,这可能会造成问题。
为了避免这些问题,我们的示例创建了一个专用的 DriverDataSource
,反过来,我们用它来创建 NotifierService
所需的 JdbcTemplate
:
@Configuration
public class NotifierConfiguration {
@Bean
NotifierService notifier(DataSourceProperties props) {
DriverDataSource ds = new DriverDataSource(
props.determineUrl(),
props.determineDriverClassName(),
new Properties(),
props.determineUsername(),
props.determinePassword());
JdbcTemplate tpl = new JdbcTemplate(ds);
return new NotifierService(tpl);
}
}
请注意,我们共享了用于创建 Spring 管理的 main DataSource
的相同连接属性。不过,我们没有将此专用 DataSource
作为 bean 公开,因为这样会禁用 Spring Boot 的自动配置功能。
4.4、处理通知
缓存逻辑的最后一部分是 NotificationHandler
类,它实现了 Consumer<Notification>
接口。该类的作用是处理单个通知,并用 Order
实例填充到已配置的 Cache
中。
@Component
public class NotificationHandler implements Consumer<PGNotification> {
private final OrdersService orders;
@Override
public void accept(PGNotification t) {
Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
// ... log messages omitted
}
}
该实现使用 getName()
和 getParameter()
从通知中获取 channel 名称和 order id。在此,我们可以假设通知始终是预期的通知。
实际逻辑非常简单:我们使用 OrderRepository
从数据库中获取 Order
并将其添加到缓存中:
@Service
public class OrdersService {
private final OrdersRepository repo;
// ... other private fields omitted
@Transactional(readOnly = true)
public Optional<Order> findById(Long id) {
Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
if (!o.isEmpty()) {
log.info("findById: cache hit, id={}",id);
return o;
}
log.info("findById: cache miss, id={}",id);
o = repo.findById(id);
if ( o.isEmpty()) {
return o;
}
ordersCache.put(id, o.get());
return o;
}
}
5、测试
要查看通知机制的运行情况,最好的办法是启动两个或多个测试应用实例,每个实例都配置为监听不同的端口。我们还需要两个实例都能连接的 PostgreSQL
实例。请参考 application.properties 文件,并根据 PostgreSQL 实例的连接详情对其进行修改。
接下来,为了启动测试环境,我们将打开两个 shell 并使用 Maven 运行应用。项目的 pom.xml
包含一个额外的配置文件 instance1
,它将在不同的端口上启动应用:
# 第一个 shell
$ mvn spring-boot:run
... many messages (omitted)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 2.615 seconds (JVM running for 2.944)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
## 第二个 shell
... many messages (omitted)
[ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
[ restartedMain] c.b.messaging.postgresql.Application : Started Application in 1.984 seconds (JVM running for 2.274)
[ restartedMain] c.b.m.p.config.ListenerConfiguration : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService : notificationHandler: sending LISTEN command...
过一段时间后,我们应该在每个日志中看到一条消息,通知我们应用已准备好接收请求。现在,让我们在另一个 shell 上使用 curl
创建第一个 Order:
$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500}
在 8080 端口运行的应用实例将打印一些信息。我们还将看到 8081
实例日志显示它收到了一条通知:
[ order-listener] c.b.m.p.service.NotificationHandler : Notification received: pid=5141, name=orders, param=30
[ order-listener] c.b.m.postgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] c.b.m.p.service.NotificationHandler : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)
这证明该机制按预期运行。最后,我们可以再次使用 curl
来查询在 instance1
上创建的订单:
curl http://localhost:8081/orders/30
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500.00}
不出所料,我们得到了订单详细信息。此外,应用日志还显示该信息来自缓存:
[nio-8081-exec-1] c.b.m.postgresql.service.OrdersService : findById: cache hit, id=30
6、总结
在本文中,我们介绍了 PostgreSQL 的 NOTIFY/LISTEN 机制,以及如何使用它来实现一个无需额外组件的轻量级 message broker。
参考:https://www.baeldung.com/spring-postgresql-message-broker