Spring Boot 使用 Redis 实现可靠的消息队列

在应用中把 Redis 当成消息队列来使用已经屡见不鲜了。我想主要原因是当代应用十有八九都会用到 Redis,因此不用再引入其他消息队列系统。而且 Redis 提供了好几种实现消息队列的方法,用起来也简单。

使用 Redis 实现消息队列的几种方式

Redis 提供了多种方式来实现消息队列。

Pub/Sub

订阅发布模式,发布者把消息发布到某个 Channel,该 Channel 的所有订阅者都会收到消息。但是这种方式最大的问题是 发布出去的消息,如果没有被监听消费,或者消费过程中宕机,那么消息就会永远丢失。适合用于临时通知之类的场景,对于需要保证数据不丢失的场景不能使用这种方式。

List

List 是 Redis 提供的一种数据类型,底层是链表,可以用来实现队列、栈。

Stream

Stream 是一个由 Redis 5 引入的,功能完善的消息队列。想必也是 Redis 官方团队看到太多人拿 Redis 当消息队列使,于是干脆就在 Redis 上设计出一个类似于 Kafka 的消息队列。

Steam 支持消费组消费,一条消息只能被消费组中的其中一个消费者消费。支持 消息确认、支持 回溯消费 还支持把未 ACK(确认)的消息转移给其他消费者进行重新消费,在进行转移的时候还会累计消息的转移次数,当次数达到一定阈值还没消费成功,就可以放入死信队列。

这也是 Redis 种最复杂的一种数据类型。如果你真的到了需要使用 Redis Steam 作为消息队列的地步,那不如直接使用 RabbitMQ 等更加成熟且稳定的消息队列系统。

使用 List 实现可靠的消息队列

目前来说,这是用得最多的一种方式,适用于大多数简单的消息队列应用场景。List 类型有很多指令,但是作为消息队列来说用到的只有几个个:

LPUSH key element [element ...]

把元素插入到 List 的首部,如果 List 不存在,会自动创建。

BRPOPLPUSH source destination timeout

移除并且返回 List (source)尾部的最后一个元素,并且同时会把这个元素插入到另一个 List (destination)的首部。

source List 中没有元素时,Redis 会阻塞连接,直到有其他客户端向其推送元素或超时。超时时间(秒)为 0 表示永远不超时。

注意,这个命令是 原子性 的,也就是说只要客户端获取到了返回的元素,那么这个元素一定就会在 destination List 有备份。这是实现可靠消息队列的关键!

RPOPLPUSH source destination

同上,它是 BRPOPLPUSH 命令的 非阻塞 版,如果 List 中没有元素就会立即返回 null

LREM key count element

从 List 中删除元素,count 的值不同,删除的方式也不同:

  • count > 0:从头到尾开始搜索,删除与 element 相等的元素,最多删除 count 个。
  • count < 0:从尾到头开始搜索,删除与 element 相等的元素,最多删除 count (绝对值)个。
  • count = 0:删除所有与元素相等的元素。

BLMOVELMOVE 命令

  • LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>
  • BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout

Redis 6.2.0 开始,BRPOPLPUSHRPOPLPUSH 命令就被声明为废弃了,取而代之的是语义更加明确的 BLMOVELMOVE 命令。

BLMOVELMOVE 可以通过参数指定元素出队列(source)的方向,和入队列(destination)的方向,除此以外并无其他区别。

实现思路

了解了上述几个命令后,一个简单易用且可靠的消息队列就呼之欲出了。

  1. 生产者使用 LPUSH 命令往消息队列生产消息
  2. 消费者使用 BRPOPLPUSH 命令从队列消费消息,并且还会在获取并返回消息的时候把该消息推送到另一个消息队列,也就是 Pending 队列,这个队列中存储的就是未被消费者 ACK 的消息
  3. 消费者成功消费完毕后,使用 LREM 命令从 Pending 队列中删除这条消息,整个消费过程结束
  4. 如果消费者在消费过程中出现异常、宕机,那么需要在恢复后从 Pending 队列中获取到这条消息,再进行重新消费,从而保证了消息队列的可靠性,不会丢失消息(可能存在重复消费,需要做好幂等处理)

在 Spring Boot 中实现

首先,创建 Spring Boot 项目,并整合 Redis。关于如何在 Spring Boot 中整合使用 Redis,请参阅 这篇文章

创建一个 OrderConsumer Bean 模拟从队列中消费订单 ID。

package cn.springdoc.demo.consumer;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer implements ApplicationRunner, Runnable {

    static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    // 消息队列
    final String queue = "queue_orders";

    // pending 队列,即待确认消息的队列
    final String pendingQueue = "pending_queue_orders";

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 应用启动后,创建新的线程来执行消费任务
        Thread thread = new Thread(this);
        thread.setName("order-consumer-thread");
        thread.start();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1:消费者,从队列未弹出消息,并推送到 pending 队列,整个过程是原子性的
                // 最多阻塞 5 秒,超过 5 秒后还没有消息,则返回 null
                String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS);
                
                if (item == null) {
                    log.info("等待消息 ...");
                    continue ;
                }
                
                try {
                    
                    // 2:解析为 Long
                    Long orderId = Long.parseLong(item);
                    
                    // 模拟消息消费
                    log.info("消费消息: {}", orderId);
                    
                } catch (Exception e) {
                    log.error("消费异常:{}", e.getMessage());
                    continue;
                }
                
                // 3:消费成功,从 pending 队列删除记录,相当于确认消费
                stringRedisTemplate.opsForList().remove(pendingQueue, 0, item);
            } catch (Exception e) {
                log.error("队列监听异常:{}", e.getMessage());
                break;
            }
        }
        log.info("退出消费");
    }
}

OrderConsumer 实现了 ApplicationRunner 接口,在应用就绪后创建新的消费线程进行消费。

stringRedisTemplate.opsForList().rightPopAndLeftPush 方法从 queue 队列消费一条消息,同时把消息添加到 pendingQueue 队列。该方法底层调用的正是 brpoplpush 命令,最多阻塞 5 秒,超时后返回 null

得到消息后解析为 Long 类型,模拟消费,即输出到日志。如果消费成功,则调用 stringRedisTemplate.opsForList().remove 方法(底层正是 LREM 命令)从 pendingQueue 队列中删除消息。如果消费失败,失败的消息会在 pendingQueue 队列中继续存在,不会丢失,可以重新投递消费或者是人工处理。

测试

启动应用后,通过 Redis 客户端往 queue_orders 队列推送消息:

> lpush queue_orders 10000
"1"
> lpush queue_orders 10010
"1"
> lpush queue_orders 10011
"1"
> lpush queue_orders Nan
"1"

queue_orders 队列推送了四条订单的 ID。注意最后一条消息值是 Nan,这会导致 Long.parseLong 异常从而导致消费失败。

服务端输出日志如下:

[           main] cn.springdoc.demo.DemoApplication        : Started DemoApplication in 3.769 seconds (process running for 4.18)
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费消息: 10000
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费消息: 10010
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费消息: 10011
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消费异常:For input string: "Nan"
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...

符合预期,前面三条消息都成功消费,最后一条消息消费失败。按照设计,这条消费失败的消息应该在 Pending 队列 pending_queue_orders 中存在。且应该只有这一条消息,因为其他三条消息都消费成功。

查看 pending_queue_orders 队列中的所有元素:

> lrange pending_queue_orders 0 -1
1) "Nan"

一切 OK,该队列中只有 Nan 这条消息,正是消费失败的那条消息。

此时,你如果想查看一下 Redis 中的所有 key,你会发现只有 pending_queue_orders 队列存在:

> keys *
1) "pending_queue_orders"

queue_orders 队列呢?这是 Redis List 的一个特性,当从 List 中弹出最后一个元素后,Redis 就会删除这个 Listqueue_orders 中的元素都被弹出了,所以它被删除了。当再次尝试往 queue_orders 中压入消息时,它会自动创建。也就是说 我们不需要手动预先创建队列, Redis 会自己创建,也会在合适的时间删除,而这一切都是线程安全的

由于这是线程安全的,所以队列中的 一条消息只能被一个消费者(客户端)进行消费,这非常适合在分布式或者是集群模式下使用,不必担心同一条消息被多个消费者消费到。

注意,Pending 队列中的消息可能存在重复消费的可能。例如,消费者成功消费消息后,在调用 remove 方法从 Pending 队列中删除消息时失败,那么 Pending 队列中的这条删除失败的消息其实已经是被成功消费了的,需要在业务中考虑到!

使用 BLMOVE 和 LMOVE 命令

上文说过,从 Redis 6.2.0 开始 BRPOPLPUSHRPOPLPUSH 命令就被声明为废弃了,后续版本中推荐使用 BLMOVELMOVE 命令。

目前 StringRedisTemplateSpring Boot 3.2.2)并未直接提供与 BLMOVELMOVE 命令对应的 API 方法,但是可以获取到底层连接对象来调用 BLMOVELMOVE 命令。

String item = this.stringRedisTemplate.execute(new RedisCallback<String>() {
    @Override
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        // 调用 bLMove 命令
        byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5);
        return ret == null ? null : new String(ret);
    }
});

Redis 的持久化方式

Redis 是一个内存数据库,为了保证数据的安全不丢失,它提供了两种数据备份(持久化)方式,即 RDBAOF

  • RDB:生成某一时刻的数据快照,通过子进程进行备份,数据可能不完整(取决于备份周期)。
  • AOF:通过记录执行的指令到文件来实现数据备份,相对完整性较高,但是会记录每一条执行命令,性能会有一定影响。

这就需要根据你的业务场景来选择合适的持久化方式,也可以同时配合使用 RDBAOF 两种方式,兼顾性能和数据安全。

总结

本文介绍了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH/BLMOVE 命令来实现一个线程安全且可靠的消息队列。