前言

Redis作为最流行的开源数据库之一,其在消息队列领域的应用经历了多个重要阶段。从早期的简单队列实现,到中间件方案的兴起,再到Redis自身功能的完善,每一步都体现了技术发展的轨迹。本文将从技术发展的角度,梳理Redis消息队列的演进历程,并探讨其在不同阶段的应用场景。


一、List结构:双向链表的队列哲学

Redis最初并不是为消息队列专门设计的,但其内置的list数据结构为队列的实现提供了基础。早期的开发者们利用lpush和rpop命令模拟队列行为,构建了最简单的消息队列系统。

1.1 底层数据结构解析

实现核心:Redis List基于双向链表(3.2版本前)或快速列表(QuickList,3.2+)

// Redis源码中的List结构定义
typedef struct list {
    listNode *head;  // 头节点
    listNode *tail;  // 尾节点
    void *(*dup)(void *ptr);  // 节点复制函数
    void (*free)(void *ptr);  // 节点释放函数
    unsigned long len;  // 链表长度
} list;

1.2 消息队列实现原理

生产者: LPUSH/RPUSH 插入链表头/尾。 消费者: RPOP/LPOP 从链表尾/头取出。 阻塞操作: BRPOP/BLPOP 基于epoll实现事件循环等待。

# 生产者(终端1)
> LPUSH order_queue "{"order_id":1001}"

# 消费者(终端2)
> BRPOP order_queue 0  # 0表示无限等待
1) "order_queue"
2) "{"order_id":1001}"

这种用简单链表实现的消息队列首先有消息丢失的问题,比如消费者崩溃时消息未ACK,针对这个问题我们可以通过备份队列来优化。 可靠队列实现(ACK+备份队列) 主队列(main_queue):新消息入口,使用LPUSH写入。 备份队列(backup_queue):正在处理的消息,通过RPOPLPUSH原子操作转移。

1.3 缺陷

List队列缺陷 因为这种方式实现的消息队列时点对点的,所以List实现的消息队列只能消费一次,缺乏广播机制。

二、Pub/Sub:发布订阅的广播模型

2.1 底层实现机制

核心结构:Redis使用pubsub_channels字典维护频道与订阅者的映射

// 源码中的频道结构
struct redisServer {
    dict *pubsub_channels;  // 频道字典
    list *pubsub_patterns;  // 模式匹配列表
};

// 订阅关系存储格式
"news.sports" -> [client1, client2, client3]  # 频道直连订阅
"news.*"       -> [client4, client5]          # 模式匹配订阅

消息传递流程 消息传递流程

2.2 缺陷

发布订阅模式解决了List实现的痛点:单个消息可被多个消费者接收,基于事件驱动的即时通知(实时),并且能根据通配符订阅。 但是这种模式也是具有缺陷的,发布订阅模式的消息仅存储在订阅者连接中,不具有持有化的能力,所以网络闪断即丢失,并且离线客户端无法获取历史消息。 网络抖动即丢失

2.3 适用场景

  1. 实时事件通知:聊天室消息广播,实时数据看板更新。
  2. 配置动态生效。
  3. 设备状态同步:
IoT设备群组:
PUBLISH device/group1/temperature "25.6℃"

实时性高,但是存在消息丢失的风险,在一些金融交易领域是不适用的。

三、Stream:现代消息队列的完全体

3.1 Stream 核心数据结构

消息存储结构:

typedef struct stream {
    rax *rax;               // 基数树存储消息
    uint64_t length;        // 消息总数
    streamID last_id;       // 最后一条消息ID
    rax *cgroups;           // 消费者组字典
} stream;

消息ID: <毫秒时间戳>-<序列号>(如 1651234567890-5)。 内容存储: 采用 Radix Tree 基数树,支持高效范围查询。

消费者组结构:

typedef struct streamCG {
    streamID last_id;       // 最后分发ID
    rax *consumers;         // 消费者列表
    rax *pel;              // 待确认消息列表(Pending Entries List)
} streamCG;

3.2 关键操作原理解析

消息生命周期: 消息生命周期

  1. 消息生产:
# *表示自动生成ID,也可自定义ID
XADD mystream * sensor_id 123 temp 36.5

底层操作: 生成全局递增ID,将消息存入Radix Tree,更新stream的last_id。

  1. 消费者组消费:
XGROUP CREATE mystream mygroup $ MKSTREAM
XREADGROUP GROUP mygroup consumer1 COUNT 1 BLOCK 2000 STREAMS mystream >

核心机制:> 表示只接收新消息,消息会同时加入PEL(Pending Entries List),每个消费者维护自己的未确认消息列表。

  1. 消息确认:
XACK mystream mygroup 1651234567890-0

从消费者组的PEL中移除,更新消费者游标。

3.3 典型应用场景

  1. 电商订单流程:
  1. 物联网数据处理:
# 设备上报
XADD sensor_data * device_id 123 temp 25.6

# 消费者组处理
XREADGROUP GROUP analytics consumer1 STREAMS sensor_data >

Redis Stream 通过精巧的数据结构设计,在保持Redis高性能特性的同时,提供了完整的消息队列能力。其核心优势在于采用了Radix Tree存储结构大大提高内存效率,单线程模型保证操作原子性还无需额外中间件。

适合需要消息持久化但又希望保持架构简洁的场景,是传统MQ与Redis List之间的完美平衡点。


总结

工具的进化史就是需求的映射史 从List到Stream的演进,本质上是对不同时代业务需求的回应。理解每个阶段的设计取舍,才能在选择时做出最合适的架构决策。Redis的消息队列发展史,正是分布式系统演进的一个精彩切片。

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]