RocketMQ Source Code Walkthrough: The Consumer Rebalance Process

@(RocketMQ source code walkthrough)


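The entry point of the consumer rebalance process is RebalanceService, a thread that by default performs a rebalance every 20 s.

Following the call chain, we arrive at the doRebalance method of RebalanceImpl: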
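
The "wait, then rebalance" loop can be pictured with a minimal sketch. This is not the RocketMQ class itself: PeriodicRebalancer, its fields, and the stop() method are hypothetical names made up for illustration, and the real service has extra behaviour (for example an early wake-up) that is left out here.

```java
/** Hypothetical stand-in for a rebalance thread body: wait, run the task, repeat until stopped. */
class PeriodicRebalancer implements Runnable {
    private final long waitIntervalMillis;   // the article's default would be 20_000
    private final Runnable rebalanceTask;    // assumed callback that performs one rebalance
    private final Object monitor = new Object();
    private volatile boolean stopped = false;

    PeriodicRebalancer(long waitIntervalMillis, Runnable rebalanceTask) {
        if (waitIntervalMillis <= 0) {
            // wait(0) would block forever, so insist on a positive interval
            throw new IllegalArgumentException("interval must be positive");
        }
        this.waitIntervalMillis = waitIntervalMillis;
        this.rebalanceTask = rebalanceTask;
    }

    @Override
    public void run() {
        while (!stopped) {
            synchronized (monitor) {
                if (!stopped) {
                    try {
                        // Sleep for one interval; stop() can cut the wait short
                        monitor.wait(waitIntervalMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            if (!stopped) {
                rebalanceTask.run();
            }
        }
    }

    /** Ask the loop to exit and wake it up if it is currently waiting. */
    void stop() {
        stopped = true;
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }
}
```

In practice the loop would run on its own thread, e.g. `new Thread(new PeriodicRebalancer(20_000, task)).start()`, where `task` is whatever performs one rebalance round.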

    // isOrder: whether messages are consumed in order
    public void doRebalance(final boolean isOrder) {
        // Subscription data for each topic
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
        this.truncateMessageQueueNotMyTopic();
    }

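As you can see, a rebalance is performed for every topic the consumer maintains. A failure on one topic is caught and logged (retry topics are not logged), so it does not stop the remaining topics from being rebalanced.

Next, look at rebalanceByTopic. The method is fairly long and branches on the message model: broadcasting or clustering. We start with broadcasting: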
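
The per-topic isolation can be sketched on its own. This is a simplified illustration, not the RocketMQ code: PerTopicRebalance and rebalanceAll are hypothetical names, the callback stands in for rebalanceByTopic, a returned list replaces the log call, and only RuntimeException is caught where the original catches Throwable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch: rebalance each topic independently, collecting failures. */
class PerTopicRebalance {
    // Same literal as MixAll.RETRY_GROUP_TOPIC_PREFIX in the RocketMQ client
    static final String RETRY_PREFIX = "%RETRY%";

    /**
     * Runs the callback for every topic. A failure on one topic never prevents
     * the others from being processed. Returns the topics whose failure would
     * be logged, i.e. failed topics that are not retry topics.
     */
    static List<String> rebalanceAll(Set<String> topics, Consumer<String> rebalanceByTopic) {
        List<String> warnedTopics = new ArrayList<>();
        for (String topic : topics) {
            try {
                rebalanceByTopic.accept(topic);
            } catch (RuntimeException e) {
                if (!topic.startsWith(RETRY_PREFIX)) {
                    warnedTopics.add(topic);
                }
            }
        }
        return warnedTopics;
    }
}
```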

    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
            this.messageQueueChanged(topic, mqSet, mqSet);
            log.info("messageQueueChanged {} {} {} {}",
                consumerGroup,
                topic,
                mqSet,
                mqSet);
        }
    } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    }
    break;

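The topicSubscribeInfoTable, which holds the MessageQueue set for each topic, is updated in the updateTopicRouteInfoFromNameServer method. We met that method earlier when looking at message sending; the client also runs it on a schedule. Now look at updateProcessQueueTableInRebalance, whose first part removes queues that should no longer be consumed: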

    // Remove queues that are in processQueueTable but not in mqSet.
    // (`changed` is the boolean this method returns; its declaration is outside this excerpt.)
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();
        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) { // queue is no longer assigned
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) { // pulling from this queue has timed out, so clean it up as well
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        // In PUSH mode, remove queues whose pull has expired
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

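Continuing down, the next part adds the queues that are new on the remote side (present in mqSet but not yet in processQueueTable) to processQueueTable: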

    for (MessageQueue mq : mqSet) {
        // Only handle queues that processQueueTable does not contain yet
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
            // Clear any stale offset for this mq first, then add the queue
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    // Record that the queue table has changed
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

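After this, the collected pull requests are dispatched (this.dispatchPullRequest(pullRequestList)) to start pulling messages; we skip that part here.

Clustering mode updates its queues with the same updateProcessQueueTableInRebalance method. Note how consumption progress is stored in each mode:
Broadcasting mode: consumption progress is kept in a local file.
Clustering mode: consumption progress is kept on the Broker.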
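
Taken together, the two halves of updateProcessQueueTableInRebalance amount to reconciling a local table against the newly assigned queue set. The sketch below shows only that core idea under heavy simplification: QueueTableSync, Slot, and sync are hypothetical names, queues are plain strings, and the topic filter, pull-expiry check, ordered-message lock, offset computation, and pull requests are all left out.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, stripped-down model of reconciling a queue table with a new assignment. */
class QueueTableSync {
    /** Stand-in for ProcessQueue: only the dropped flag is modelled. */
    static final class Slot {
        volatile boolean dropped;
    }

    final ConcurrentMap<String, Slot> table = new ConcurrentHashMap<>();

    /** Returns true if any queue was removed from or added to the table. */
    boolean sync(Set<String> assigned) {
        boolean changed = false;

        // Step 1: drop and remove queues that are no longer assigned
        Iterator<Map.Entry<String, Slot>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Slot> entry = it.next();
            if (!assigned.contains(entry.getKey())) {
                entry.getValue().dropped = true;
                it.remove();
                changed = true;
            }
        }

        // Step 2: add queues that are assigned but not yet tracked
        for (String queue : assigned) {
            if (table.putIfAbsent(queue, new Slot()) == null) {
                changed = true;
            }
        }
        return changed;
    }
}
```

Marking the slot as dropped before removing it mirrors the order in the excerpt above, so that anything still holding a reference to the old entry can notice it is stale.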

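Broadcasting mode can rely on local progress because consumers are independent of one another. Clustering mode cannot: normally, once a message has been consumed successfully by one consumer, it is not delivered to the other consumers, so the Broker has to keep track of consumption progress.

We will look at message rebalancing in clustering mode later.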
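
The difference can be made concrete with a toy model. Everything below is an assumption for illustration: OffsetStores and storeFor are hypothetical names, and plain in-memory maps stand in for the local file and for the Broker. In the real client the choice is between LocalFileOffsetStore and RemoteBrokerOffsetStore.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: where consumption progress lives in each message model. */
class OffsetStores {
    enum MessageModel { BROADCASTING, CLUSTERING }

    /**
     * Picks the offset store for one consumer.
     * brokerStore is an in-memory stand-in for progress kept on the Broker.
     */
    static Map<String, Long> storeFor(MessageModel model, Map<String, Long> brokerStore) {
        switch (model) {
            case BROADCASTING:
                // Each consumer tracks its own progress; nothing is shared
                return new HashMap<>();
            case CLUSTERING:
                // All consumers in the group see and update the same progress
                return brokerStore;
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }
}
```

With this model, two clustering consumers that commit to the same queue overwrite each other's view of progress, which is what lets one of them continue where the other stopped. Two broadcasting consumers never see each other's offsets.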