使用 PostgreSQL 提供的 LISTEN/NOTIFY 机制实现事件驱动

1、简介

本文将带你了解 PostgreSQL 中的 LISTENNOTIFY 命令,包括其功能、使用方法以及在应用中的实际应用。

2、LISTEN 和 NOTIFY 是什么?

PostgreSQL 支持使用 LISTENNOTIFY 命令在服务器和连接的客户端之间进行异步通信。这些特定于 PostgreSQL 的扩展使我们能够将数据库用作一个简单的 MQ 系统,允许我们从数据库中生成客户端可以做出反应的事件。这在很多方面都很有用,如实时仪表盘、缓存失效、数据审计等。

2.1、监听通知

使用 LISTEN 命令注册接收事件的监听,需指定目标频道名称:

postgres=# LISTEN my_channel;
LISTEN

完成注册后,该连接即可接收该频道的异步事件通知。

所有注册监听的连接都会收到通知,因此该系统实际采用广播机制而非单播。这意味着可通过此方式轻松向所有客户端同步数据库内发生的事件。

注意:若使用 psql 工具,不会自动接收通知。需重新执行 LISTEN 命令,此时会显示自上次监听后触发的所有通知:

postgres=# LISTEN my_channel;
LISTEN
.....
postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, World!" received from server process with PID 66.

此处可见某连接触发了 Payload 为 “Hello, world!” 的事件,监听连接已收到通知。

虽然监听器数量无硬性上限,但每个监听器需保持数据库连接开启以接收通知,因此实际受限于最大连接数限制。此外,每个监听器都会占用资源,过多监听器可能导致性能问题。

2.2、发布通知

了解如何监听事件后,还需掌握如何触发事件。使用 NOTIFY 命令可触发事件,需指定频道名称和发送的消息:

postgres=# NOTIFY my_channel, 'Hello, World!';
NOTIFY

执行此命令后,所有预先执行过对应 LISTEN 命令的连接均可接收该事件,如前文所示。

Payload 为可选参数,若未提供或设为 NULL,系统将视同空字符串处理。Payload 最大容量为 8,000 字节,若超限将报错且不会通知任何监听器。

通知是事务的一部分。这意味着若在活动事务中触发通知,系统将在事务提交后才会发送;若事务回滚,则通知根本不会发出。

2.3、动态消息

NOTIFY 命令要求消息必须明确指定。无法动态生成消息内容,包括简单的字符串拼接:

postgres=# NOTIFY my_channel, 'Hello, ' || 'World';
ERROR:  syntax error at or near "||"
LINE 1: NOTIFY my_channel, 'Hello, ' || 'World!';

但可使用 pg_notify 函数生成通知,该函数支持任意形式的消息:

postgres=# SELECT pg_notify('my_channel', 'Hello, ' || 'World!');
 pg_notify
-----------

(1 row)

此时频道名称必须以字符串形式提供,Payload 则作为独立字符串传入。可通过任意方式构建这些字符串,包括使用 SQL 语句的结果。

2.4、通过触发器触发事件

除了手动执行语句触发事件外,还可让数据库自动触发。例如,注册在特定时机执行的 触发器函数,这些函数也能生成通知:

CREATE OR REPLACE FUNCTION notify_table_change() RETURNS TRIGGER AS $
    BEGIN
        PERFORM pg_notify('table_change', TG_TABLE_NAME);
        RETURN NEW;
    END;
$ LANGUAGE plpgsql;

CREATE TRIGGER table_change 
    AFTER INSERT OR UPDATE OR DELETE ON table_name
    FOR EACH ROW EXECUTE PROCEDURE notify_table_change();

完成后,若 table_name 表发生增删改操作,该触发器将自动通过 table_change 频道发送包含变更表名的通知。

3、通过 JDBC 发布通知

通过 JDBC 触发通知的方式与前文所示完全相同。

首先需连接数据库,目前可使用官方驱动。将其添加至构建文件:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.6</version>
</dependency>

随后可正常创建连接:

Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres", "postgres", "mysecretpassword");

建立连接后,可通过 NOTIFY 命令或 pg_notify() 函数触发通知:

try (Statement statement = connection.createStatement()) {
    statement.execute("NOTIFY my_channel, 'Hello, NOTIFY!'");
}

如前所述,若需使用绑定参数等非纯字符串操作,必须改用 pg_notify 函数:

try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
    statement.setString(1, "my_channel");
    statement.setString(2, "Hello, pg_notify!");
    statement.execute();
}

两种方式效果相同,系统均能按预期发布通知:

postgres=# postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, NOTIFY!" received from server process with PID 390.
Asynchronous notification "my_channel" with payload "Hello, pg_notify!" received from server process with PID 390.

如上,监听会话成功接收了 Java 代码触发的两条通知。

4、通过 JDBC 监听通知

通过 JDBC 触发通知虽然简单,但监听通知更为复杂。 接收数据库的异步消息并非 JDBC 规范的标准功能,因此需依赖驱动特定实现。

首先需执行 LISTEN 语句:

try (Statement statement = connection.createStatement()) {
    statement.execute("LISTEN my_channel");
}

但要接收通知本身,需在原生 PGConnection 对象上调用 getNotifications() 方法。 这意味着首先需确保获取正确类型的连接(Connection):

PGConnection pgConnection = connection.unwrap(org.postgresql.PGConnection.class);

随后调用 getNotifications() 获取已接收的通知。需在循环中以适当频率轮询数据库:

while (!Thread.currentThread().isInterrupted()) {
    PGNotification[] notifications = pgConnection.getNotifications(1000);
    if (notifications != null) {
        // TODO 处理通知
    }
}

收到通知后可按需处理。但需注意:只有再次调用 getNotifications() 才能接收新通知,因此若需高效响应后续事件,必须保持该循环运行。

getNotifications() 有三种调用方式。最简单的是无参数版本,会立即返回所有待处理通知(但这不是推荐方案)。更推荐使用带超时参数(毫秒)的版本,线程将在此期间阻塞:

PGNotification[] notifications = pgConnection.getNotifications(100);

如上,调用将在超时时间到达或收到通知时立即返回(以先发生者为准)。

如果把超时值设置为 0,将永久阻塞。这意味着仅在收到通知时才会返回。如果在专用的消费线程上运行此方法,可以简化管理逻辑,因为不再需要空闲等待。

5、通过 PGJDBC-NG 监听通知

若需实现无需轮询的通知接收,可使用替代驱动。

PGJDBC-NG 驱动 在兼容 PostgreSQL 的同时提供更高级功能,包括为通知注册回调的能力。

使用前需将其添加至构建配置:

<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.9</version>
</dependency>

随后可正常创建连接,仅需将 URL 类型从 jdbc:postgresql 替换为 jdbc:pgsql。例如:

Connection connection = DriverManager.getConnection("jdbc:pgsql://localhost:5432/postgres", "postgres", "mysecretpassword");

仍需在连接上执行 LISTEN 命令,方式与前文相同。但此次可注册监听器,在通知发生时接收回调。

为此需实现 PGNotificationListener 接口:

class Listener implements PGNotificationListener {
    @Override
    public void notification(int processId, String channelName, String payload) {
        LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
                channelName, payload, processId);
    }
}

随后可将该实例注册到连接:

PGConnection pgConnection = connection.unwrap(com.impossibl.postgres.api.jdbc.PGConnection.class);
pgConnection.addNotificationListener(new Listener());

至此,只要连接保持活跃,无需轮询数据库即可自动接收实时通知:

10:34:03.104 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, NOTIFY!', PID=844
10:34:03.106 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, pg_notify!', PID=844

这种方式不仅更易于管理,还能提升应用效率,因为不再需要轮询数据库等待事件触发。

6、总结

本文简要介绍了 PostgreSQL 中的 LISTENNOTIFY 命令及其在 JDBC 连接中的使用方法。下次需要从数据库触发事件时,不妨尝试此方案。


Ref:https://www.baeldung.com/java-postgresql-listen-notify-events