RocketMQ 单队列能否并行消费?消费空洞与滑动窗口
在讲 RocketMQ 消息模型时,有一个经典约束:每个队列只能串行消费。
原因很直接——要保证严格顺序和消息不丢,Consumer 必须一条一条处理,确认完再拉下一条。一旦并发,就会出现”消费空洞”。
那如果我们不要求严格顺序呢?能不能在单个队列内做并行消费?
答案是可以的。而且 RocketMQ 本身已经实现了这个能力。
先搞清楚:消费空洞是什么
假设队列里有三条消息,offset 分别是 1、2、3,我们并发消费:
1 | offset=1 → 处理中 |
此时 offset=2 和 3 都完成了,如果直接把 commitOffset 推进到 3,那 offset=1 一旦失败需要重试,就找不回来了——它”掉”进了一个洞里。
这就是消费空洞:commit 点超过了还未完成的消息,导致重启或重平衡后消息永久丢失。
放宽约束后的核心问题
串行消费能避免空洞,因为 commitOffset 只有在当前消息确认后才往前走,天然保证”已提交 = 已处理”。
并行消费的挑战在于:多条消息同时在飞,commit 点该停在哪?
答案是:停在”最低未完成 offset”的前一位。
解决方案:滑动窗口
核心思路是维护一个”飞行中”的消息窗口,用类似 TCP 滑动窗口的方式管理 commit 点。
1 | 队列: [1][2][3][4][5][6][7]... |
具体实现:
- 用一个有序结构(
TreeMap<Long offset, Status>)记录所有飞行中的消息及其状态 - 每条消息处理完成(无论成功或失败),更新对应 offset 的状态
- 每次有消息完成时,从最小 offset 开始扫描,找到连续已完成的最大 offset
- 将 commitOffset 推进到该位置
1 | // 伪代码示意 |
这样,即使 offset=6、7 先完成,commitOffset 也不会越过 5——直到 5 完成,窗口才整体滑动。
RocketMQ 的实现:ProcessQueue
RocketMQ 的并发消费模式(MessageListenerConcurrently)就是这套思路的工业实现。
它用 ProcessQueue 作为核心数据结构:
- 内部是一个
TreeMap<Long offset, MessageExt>,存放所有已拉取但未提交的消息 - 消息消费完成后,从 TreeMap 中移除对应 offset
- 提交 offset 时,取
TreeMap.firstKey()(最小 offset)作为 commitOffset
1 | // RocketMQ 源码简化版(ProcessQueue.java) |
换言之:TreeMap 最小 key 就是”窗口的左边界”,只有它之前的消息才是安全可提交的。
失败消息怎么处理
并行消费模式下,失败不能像串行那样”原地重试再继续”,有几种策略:
| 策略 | 做法 | 代价 |
|---|---|---|
| 原地重试 | 失败后重试,完成前窗口不滑动 | 一条慢消息阻塞整个窗口 |
| 发回重试队列 | 把失败消息发到 %RETRY% topic,标记当前 offset 完成 |
不阻塞,但消息乱序(重试时机延后) |
| 丢弃进死信队列 | 超过重试次数后写入 DLQ | 最终放弃,保证主流程不被阻塞 |
RocketMQ 的并发消费默认走第二种:失败消息发回 %RETRY%TopicName,broker 延迟后重新投递,不影响当前队列的 offset 推进。
数据结构选型:TreeMap、ConcurrentSkipListMap、Ack Table
滑动窗口是一个思路框架,具体用什么数据结构来实现它,影响并发性能和实现复杂度。这里比较三种常见选择。
TreeMap(红黑树)
TreeMap 的底层是红黑树——一棵自平衡二叉搜索树,所有节点按 key 排序,左小右大。
1 | 5 |
红黑树通过旋转和变色来维持近似平衡(任意路径上黑节点数相同),保证树高 O(log n),从而让插入、删除、查找都是 O(log n)。
用在滑动窗口里:
- key = offset,value = 是否已完成
firstKey()→ 窗口左边界(最小 offset),O(log n)- 消息完成后
put(offset, true),找 commit 点时从头遍历
问题:TreeMap 不是线程安全的,多线程并发 mark 时必须加锁(RocketMQ 用的是 ReadWriteLock),高并发下锁竞争是瓶颈。
ConcurrentSkipListMap(跳表)
跳表(Skip List)是 TreeMap 的并发替代方案,结构上是多层有序链表:
1 | 第3层:HEAD ──────────────────────────────> [7] ──> NULL |
查找时从最高层开始,大步前进,碰到比目标大的节点就下沉一层,逐步逼近目标。平均 O(log n),和红黑树相同。
跳表的优势在于并发:插入和删除只需要修改局部节点的 next 指针,不涉及全局旋转,天然适合用 CAS(Compare-And-Swap)实现无锁操作。java.util.concurrent.ConcurrentSkipListMap 就是这么做的——多线程并发 mark offset 时无需全局锁,只有在节点级别做 CAS。
用在滑动窗口里:
- 直接替换
TreeMap,去掉synchronized块 firstKey()返回最小未完成 offset,线程安全- 高并发场景下比
TreeMap + lock吞吐量更高
代价:内存占用比 TreeMap 高(多层链表指针),且每条消息都需要独立节点,窗口大时内存不友好。
Ack Table(位图)
位图(Bitmap)是另一个维度的思路:不用树形结构按序维护节点,而是用一个定长数组的每一个比特位代表一个 offset 的完成状态。
映射关系:第 i 个 bit 对应消息 baseOffset + i,bit=1 表示已完成,bit=0 表示处理中。
1 | baseOffset = 100 |
从 bit 0 往右,找第一个 0——这里是 bit 2(offset=102),所以 commitOffset 停在 102,只有 100 和 101 可以安全提交。即使 103、104 已完成,也不能越过这个空洞。
- mark 操作:
bitmap |= (1L << i),O(1),无内存分配 - 找 commit 点:找从最低位起连续 1 的个数
找 commit 点用位运算加速,核心是 numberOfTrailingZeros(~word):
1 | word = 0b...00101011 (bit 0,1,3,4 为 1,bit 2 为 0) |
~word 把 0 变成 1,再找最低位的 1,合起来就是”原 word 从低位起连续 1 的个数”——即可提交的消息数。
1 | int committed = 0; |
位图的优势:
- **mark 是真正的 O(1)**,无内存分配,无指针跳转
- 极度缓存友好:一个 1024 消息的窗口只占 128 字节,一个 cache line 就装下了
- 无锁:mark 时用
AtomicLong的 CAS 即可,不需要全局锁
位图的限制:
- 窗口大小固定,超出窗口的消息无法追踪
- 窗口不能滑动直到左边界的消息完成(和滑动窗口一样的语义约束)
- 适合窗口大小可预估、消息 offset 连续的场景
三者对比
| TreeMap | ConcurrentSkipListMap | Ack Table (Bitmap) | |
|---|---|---|---|
| 底层结构 | 红黑树 | 跳表 | 位数组 |
| mark 复杂度 | O(log n) | O(log n) | O(1) |
| 找左边界 | O(log n) firstKey | O(1) firstKey | O(n/64) 位扫描 |
| 线程安全 | 需加锁 | 内置无锁 CAS | AtomicLong CAS |
| 内存 | 每条消息一个节点 | 每条消息多个指针 | 每条消息 1 bit |
| 适用场景 | 简单实现、消息量不大 | 高并发、消息量大 | 超高吞吐、窗口固定 |
RocketMQ 选择了 TreeMap + ReadWriteLock,因为消费侧的 mark 频率通常不是瓶颈(拉批处理的粒度较粗),简单可靠优先。如果是自研的高吞吐系统,Bitmap 方案在内存和 CPU 效率上都更占优。
与严格顺序消费的对比
| 串行消费 | 并行消费(滑动窗口) | |
|---|---|---|
| 吞吐量 | 低(单线程) | 高(线程池并发) |
| 消息顺序 | 严格保证 | 不保证(并发执行) |
| 消息不丢 | 保证 | 保证(commitOffset 安全) |
| 实现复杂度 | 简单 | 需要 offset 跟踪 |
| 适用场景 | 订单状态流转、账务 | 日志处理、通知推送 |
小结
单队列并行消费是可行的,关键是不能直接提交最新 offset,而要提交”最低未完成 offset 的前一位”。
滑动窗口(ProcessQueue)解决了这个问题:把飞行中的 offset 全部追踪起来,只有左边界连续完成时,commit 点才向前推进。
放弃严格顺序,换来的是线程池级别的并发能力——在大多数业务场景下,这笔交易是合算的。