使用 PostgreSQL 提供的 LISTEN/NOTIFY 机制实现事件驱动
1、简介
本文将带你了解 PostgreSQL 中的 LISTEN 和 NOTIFY 命令,包括其功能、使用方法以及在应用中的实际应用。
2、LISTEN 和 NOTIFY 是什么?
PostgreSQL 支持使用 LISTEN
和 NOTIFY
命令在服务器和连接的客户端之间进行异步通信。这些特定于 PostgreSQL 的扩展使我们能够将数据库用作一个简单的 MQ 系统,允许我们从数据库中生成客户端可以做出反应的事件。这在很多方面都很有用,如实时仪表盘、缓存失效、数据审计等。
2.1、监听通知
使用 LISTEN
命令注册接收事件的监听,需指定目标频道名称:
postgres=# LISTEN my_channel;
LISTEN
完成注册后,该连接即可接收该频道的异步事件通知。
所有注册监听的连接都会收到通知,因此该系统实际采用广播机制而非单播。这意味着可通过此方式轻松向所有客户端同步数据库内发生的事件。
注意:若使用 psql 工具,不会自动接收通知。需重新执行 LISTEN
命令,此时会显示自上次监听后触发的所有通知:
postgres=# LISTEN my_channel;
LISTEN
.....
postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, World!" received from server process with PID 66.
此处可见某连接触发了 Payload 为 “Hello, world!” 的事件,监听连接已收到通知。
虽然监听器数量无硬性上限,但每个监听器需保持数据库连接开启以接收通知,因此实际受限于最大连接数限制。此外,每个监听器都会占用资源,过多监听器可能导致性能问题。
2.2、发布通知
了解如何监听事件后,还需掌握如何触发事件。使用 NOTIFY
命令可触发事件,需指定频道名称和发送的消息:
postgres=# NOTIFY my_channel, 'Hello, World!';
NOTIFY
执行此命令后,所有预先执行过对应 LISTEN
命令的连接均可接收该事件,如前文所示。
Payload 为可选参数,若未提供或设为 NULL
,系统将视同空字符串处理。Payload 最大容量为 8,000 字节,若超限将报错且不会通知任何监听器。
通知是事务的一部分。这意味着若在活动事务中触发通知,系统将在事务提交后才会发送;若事务回滚,则通知根本不会发出。
2.3、动态消息
NOTIFY
命令要求消息必须明确指定。无法动态生成消息内容,包括简单的字符串拼接:
postgres=# NOTIFY my_channel, 'Hello, ' || 'World';
ERROR: syntax error at or near "||"
LINE 1: NOTIFY my_channel, 'Hello, ' || 'World!';
但可使用 pg_notify
函数生成通知,该函数支持任意形式的消息:
postgres=# SELECT pg_notify('my_channel', 'Hello, ' || 'World!');
pg_notify
-----------
(1 row)
此时频道名称必须以字符串形式提供,Payload 则作为独立字符串传入。可通过任意方式构建这些字符串,包括使用 SQL 语句的结果。
2.4、通过触发器触发事件
除了手动执行语句触发事件外,还可让数据库自动触发。例如,注册在特定时机执行的 触发器函数,这些函数也能生成通知:
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS TRIGGER AS $
BEGIN
PERFORM pg_notify('table_change', TG_TABLE_NAME);
RETURN NEW;
END;
$ LANGUAGE plpgsql;
CREATE TRIGGER table_change
AFTER INSERT OR UPDATE OR DELETE ON table_name
FOR EACH ROW EXECUTE PROCEDURE notify_table_change();
完成后,若 table_name
表发生增删改操作,该触发器将自动通过 table_change
频道发送包含变更表名的通知。
3、通过 JDBC 发布通知
通过 JDBC 触发通知的方式与前文所示完全相同。
首先需连接数据库,目前可使用官方驱动。将其添加至构建文件:
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.6</version>
</dependency>
随后可正常创建连接:
Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres", "postgres", "mysecretpassword");
建立连接后,可通过 NOTIFY
命令或 pg_notify()
函数触发通知:
try (Statement statement = connection.createStatement()) {
statement.execute("NOTIFY my_channel, 'Hello, NOTIFY!'");
}
如前所述,若需使用绑定参数等非纯字符串操作,必须改用 pg_notify
函数:
try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
statement.setString(1, "my_channel");
statement.setString(2, "Hello, pg_notify!");
statement.execute();
}
两种方式效果相同,系统均能按预期发布通知:
postgres=# postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, NOTIFY!" received from server process with PID 390.
Asynchronous notification "my_channel" with payload "Hello, pg_notify!" received from server process with PID 390.
如上,监听会话成功接收了 Java 代码触发的两条通知。
4、通过 JDBC 监听通知
通过 JDBC 触发通知虽然简单,但监听通知更为复杂。 接收数据库的异步消息并非 JDBC 规范的标准功能,因此需依赖驱动特定实现。
首先需执行 LISTEN
语句:
try (Statement statement = connection.createStatement()) {
statement.execute("LISTEN my_channel");
}
但要接收通知本身,需在原生 PGConnection
对象上调用 getNotifications()
方法。 这意味着首先需确保获取正确类型的连接(Connection
):
PGConnection pgConnection = connection.unwrap(org.postgresql.PGConnection.class);
随后调用 getNotifications()
获取已接收的通知。需在循环中以适当频率轮询数据库:
while (!Thread.currentThread().isInterrupted()) {
PGNotification[] notifications = pgConnection.getNotifications(1000);
if (notifications != null) {
// TODO 处理通知
}
}
收到通知后可按需处理。但需注意:只有再次调用 getNotifications()
才能接收新通知,因此若需高效响应后续事件,必须保持该循环运行。
getNotifications()
有三种调用方式。最简单的是无参数版本,会立即返回所有待处理通知(但这不是推荐方案)。更推荐使用带超时参数(毫秒)的版本,线程将在此期间阻塞:
PGNotification[] notifications = pgConnection.getNotifications(100);
如上,调用将在超时时间到达或收到通知时立即返回(以先发生者为准)。
如果把超时值设置为 0
,将永久阻塞。这意味着仅在收到通知时才会返回。如果在专用的消费线程上运行此方法,可以简化管理逻辑,因为不再需要空闲等待。
5、通过 PGJDBC-NG 监听通知
若需实现无需轮询的通知接收,可使用替代驱动。
PGJDBC-NG 驱动 在兼容 PostgreSQL 的同时提供更高级功能,包括为通知注册回调的能力。
使用前需将其添加至构建配置:
<dependency>
<groupId>com.impossibl.pgjdbc-ng</groupId>
<artifactId>pgjdbc-ng</artifactId>
<version>0.8.9</version>
</dependency>
随后可正常创建连接,仅需将 URL 类型从 jdbc:postgresql
替换为 jdbc:pgsql
。例如:
Connection connection = DriverManager.getConnection("jdbc:pgsql://localhost:5432/postgres", "postgres", "mysecretpassword");
仍需在连接上执行 LISTEN
命令,方式与前文相同。但此次可注册监听器,在通知发生时接收回调。
为此需实现 PGNotificationListener
接口:
class Listener implements PGNotificationListener {
@Override
public void notification(int processId, String channelName, String payload) {
LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
channelName, payload, processId);
}
}
随后可将该实例注册到连接:
PGConnection pgConnection = connection.unwrap(com.impossibl.postgres.api.jdbc.PGConnection.class);
pgConnection.addNotificationListener(new Listener());
至此,只要连接保持活跃,无需轮询数据库即可自动接收实时通知:
10:34:03.104 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, NOTIFY!', PID=844
10:34:03.106 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, pg_notify!', PID=844
这种方式不仅更易于管理,还能提升应用效率,因为不再需要轮询数据库等待事件触发。
6、总结
本文简要介绍了 PostgreSQL 中的 LISTEN
和 NOTIFY
命令及其在 JDBC 连接中的使用方法。下次需要从数据库触发事件时,不妨尝试此方案。
Ref:https://www.baeldung.com/java-postgresql-listen-notify-events