RocketMQ 单队列能否并行消费?消费空洞与滑动窗口

在讲 RocketMQ 消息模型时,有一个经典约束:每个队列只能串行消费

原因很直接——要保证严格顺序和消息不丢,Consumer 必须一条一条处理,确认完再拉下一条。一旦并发,就会出现”消费空洞”。

那如果我们不要求严格顺序呢?能不能在单个队列内做并行消费?

答案是可以的。而且 RocketMQ 本身已经实现了这个能力。


先搞清楚:消费空洞是什么

假设队列里有三条消息,offset 分别是 1、2、3,我们并发消费:

1
2
3
offset=1 → 处理中
offset=2 → 处理完成 ✓
offset=3 → 处理完成 ✓

此时 offset=2 和 3 都完成了,如果直接把 commitOffset 推进到 3,那 offset=1 一旦失败需要重试,就找不回来了——它”掉”进了一个洞里。

这就是消费空洞:commit 点超过了还未完成的消息,导致重启或重平衡后消息永久丢失。


放宽约束后的核心问题

串行消费能避免空洞,因为 commitOffset 只有在当前消息确认后才往前走,天然保证”已提交 = 已处理”。

并行消费的挑战在于:多条消息同时在飞,commit 点该停在哪?

答案是:停在”最低未完成 offset”的前一位


解决方案:滑动窗口

核心思路是维护一个”飞行中”的消息窗口,用类似 TCP 滑动窗口的方式管理 commit 点。

1
2
3
4
5
6
7
8
9
队列: [1][2][3][4][5][6][7]...

窗口起点 (commitOffset)

当前飞行中: {3✓, 4✓, 5✗(处理中), 6✓, 7✓}

最低未完成

commitOffset 只能推进到 4(5 还没完成)

具体实现:

  1. 用一个有序结构(TreeMap<Long offset, Status>)记录所有飞行中的消息及其状态
  2. 每条消息处理完成(无论成功或失败),更新对应 offset 的状态
  3. 每次有消息完成时,从最小 offset 开始扫描,找到连续已完成的最大 offset
  4. 将 commitOffset 推进到该位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 伪代码示意
TreeMap<Long, Boolean> inflightWindow = new TreeMap<>();

// 拉取一批消息,全部扔进线程池
for (Message msg : pullBatch) {
inflightWindow.put(msg.getOffset(), false); // false = 未完成
threadPool.submit(() -> {
process(msg);
inflightWindow.put(msg.getOffset(), true); // 标记完成
tryAdvanceCommitOffset();
});
}

void tryAdvanceCommitOffset() {
long newCommit = currentCommitOffset;
for (Map.Entry<Long, Boolean> e : inflightWindow.entrySet()) {
if (!e.getValue()) break; // 遇到未完成的,停住
newCommit = e.getKey();
}
commitOffset(newCommit);
inflightWindow.headMap(newCommit, true).clear(); // 清掉已提交部分
}

这样,即使 offset=6、7 先完成,commitOffset 也不会越过 5——直到 5 完成,窗口才整体滑动。


RocketMQ 的实现:ProcessQueue

RocketMQ 的并发消费模式(MessageListenerConcurrently)就是这套思路的工业实现。

它用 ProcessQueue 作为核心数据结构:

  • 内部是一个 TreeMap<Long offset, MessageExt>,存放所有已拉取但未提交的消息
  • 消息消费完成后,从 TreeMap 中移除对应 offset
  • 提交 offset 时,取 TreeMap.firstKey()(最小 offset)作为 commitOffset
1
2
3
4
5
6
7
8
9
10
11
// RocketMQ 源码简化版(ProcessQueue.java)
public long removeMessage(List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
msgTreeMap.remove(msg.getQueueOffset());
}
// 返回当前最小 offset,即 commitOffset 的位置
if (!msgTreeMap.isEmpty()) {
return msgTreeMap.firstKey();
}
return ...;
}

换言之:TreeMap 最小 key 就是”窗口的左边界”,只有它之前的消息才是安全可提交的


失败消息怎么处理

并行消费模式下,失败不能像串行那样”原地重试再继续”,有几种策略:

策略 做法 代价
原地重试 失败后重试,完成前窗口不滑动 一条慢消息阻塞整个窗口
发回重试队列 把失败消息发到 %RETRY% topic,标记当前 offset 完成 不阻塞,但消息乱序(重试时机延后)
丢弃进死信队列 超过重试次数后写入 DLQ 最终放弃,保证主流程不被阻塞

RocketMQ 的并发消费默认走第二种:失败消息发回 %RETRY%TopicName,broker 延迟后重新投递,不影响当前队列的 offset 推进。


数据结构选型:TreeMap、ConcurrentSkipListMap、Ack Table

滑动窗口是一个思路框架,具体用什么数据结构来实现它,影响并发性能和实现复杂度。这里比较三种常见选择。


TreeMap(红黑树)

TreeMap 的底层是红黑树——一棵自平衡二叉搜索树,所有节点按 key 排序,左小右大。

1
2
3
4
5
    5
/ \
3 7
/ \ / \
2 4 6 8

红黑树通过旋转和变色来维持近似平衡(任意路径上黑节点数相同),保证树高 O(log n),从而让插入、删除、查找都是 O(log n)。

用在滑动窗口里:

  • key = offset,value = 是否已完成
  • firstKey() → 窗口左边界(最小 offset),O(log n)
  • 消息完成后 put(offset, true),找 commit 点时从头遍历

问题:TreeMap 不是线程安全的,多线程并发 mark 时必须加锁(RocketMQ 用的是 ReadWriteLock),高并发下锁竞争是瓶颈。


ConcurrentSkipListMap(跳表)

跳表(Skip List)是 TreeMap 的并发替代方案,结构上是多层有序链表

1
2
3
4
第3层:HEAD ──────────────────────────────> [7] ──> NULL
第2层:HEAD ──────────> [3] ──────────────> [7] ──> NULL
第1层:HEAD ──> [1] ──> [3] ──> [5] ──────> [7] ──> NULL
第0层:HEAD ──> [1] ──> [2] ──> [3] ──> [5] ──> [6] ──> [7] ──> NULL

查找时从最高层开始,大步前进,碰到比目标大的节点就下沉一层,逐步逼近目标。平均 O(log n),和红黑树相同。

跳表的优势在于并发:插入和删除只需要修改局部节点的 next 指针,不涉及全局旋转,天然适合用 CAS(Compare-And-Swap)实现无锁操作。java.util.concurrent.ConcurrentSkipListMap 就是这么做的——多线程并发 mark offset 时无需全局锁,只有在节点级别做 CAS。

用在滑动窗口里:

  • 直接替换 TreeMap,去掉 synchronized
  • firstKey() 返回最小未完成 offset,线程安全
  • 高并发场景下比 TreeMap + lock 吞吐量更高

代价:内存占用比 TreeMap 高(多层链表指针),且每条消息都需要独立节点,窗口大时内存不友好。


Ack Table(位图)

位图(Bitmap)是另一个维度的思路:不用树形结构按序维护节点,而是用一个定长数组的每一个比特位代表一个 offset 的完成状态。

映射关系:第 i 个 bit 对应消息 baseOffset + i,bit=1 表示已完成,bit=0 表示处理中。

1
2
3
4
5
6
baseOffset = 100

消息 offset: 100 101 102 103 104 105
对应 bit 位: 0 1 2 3 4 5
当前状态: 1 1 0 1 1 0
已完 已完 处理中 已完 已完 处理中

从 bit 0 往右,找第一个 0——这里是 bit 2(offset=102),所以 commitOffset 停在 102,只有 100 和 101 可以安全提交。即使 103、104 已完成,也不能越过这个空洞。

  • mark 操作:bitmap |= (1L << i),O(1),无内存分配
  • 找 commit 点:找从最低位起连续 1 的个数

找 commit 点用位运算加速,核心是 numberOfTrailingZeros(~word)

1
2
3
word  = 0b...00101011   (bit 0,1,3,4 为 1,bit 2 为 0)
~word = 0b...11010100 (取反,原来的 0 变成 1)
numberOfTrailingZeros(~word) = 2

~word 把 0 变成 1,再找最低位的 1,合起来就是”原 word 从低位起连续 1 的个数”——即可提交的消息数。

1
2
3
4
5
6
7
8
9
10
int committed = 0;
for (long word : bitmap) {
if (word == -1L) { // 64 位全 1,这 64 个 offset 全部完成
committed += 64;
} else {
committed += Long.numberOfTrailingZeros(~word);
break;
}
}
commitOffset = baseOffset + committed;

位图的优势:

  • **mark 是真正的 O(1)**,无内存分配,无指针跳转
  • 极度缓存友好:一个 1024 消息的窗口只占 128 字节,一个 cache line 就装下了
  • 无锁:mark 时用 AtomicLong 的 CAS 即可,不需要全局锁

位图的限制:

  • 窗口大小固定,超出窗口的消息无法追踪
  • 窗口不能滑动直到左边界的消息完成(和滑动窗口一样的语义约束)
  • 适合窗口大小可预估、消息 offset 连续的场景

三者对比

TreeMap ConcurrentSkipListMap Ack Table (Bitmap)
底层结构 红黑树 跳表 位数组
mark 复杂度 O(log n) O(log n) O(1)
找左边界 O(log n) firstKey O(1) firstKey O(n/64) 位扫描
线程安全 需加锁 内置无锁 CAS AtomicLong CAS
内存 每条消息一个节点 每条消息多个指针 每条消息 1 bit
适用场景 简单实现、消息量不大 高并发、消息量大 超高吞吐、窗口固定

RocketMQ 选择了 TreeMap + ReadWriteLock,因为消费侧的 mark 频率通常不是瓶颈(拉批处理的粒度较粗),简单可靠优先。如果是自研的高吞吐系统,Bitmap 方案在内存和 CPU 效率上都更占优。


与严格顺序消费的对比

串行消费 并行消费(滑动窗口)
吞吐量 低(单线程) 高(线程池并发)
消息顺序 严格保证 不保证(并发执行)
消息不丢 保证 保证(commitOffset 安全)
实现复杂度 简单 需要 offset 跟踪
适用场景 订单状态流转、账务 日志处理、通知推送

小结

单队列并行消费是可行的,关键是不能直接提交最新 offset,而要提交”最低未完成 offset 的前一位”。

滑动窗口(ProcessQueue)解决了这个问题:把飞行中的 offset 全部追踪起来,只有左边界连续完成时,commit 点才向前推进。

放弃严格顺序,换来的是线程池级别的并发能力——在大多数业务场景下,这笔交易是合算的。