RocketMQ源码解读——Consumer从文件中拉取消息

RocketMQ源码解读——Consumer从文件中拉取消息

@(rocketmq源码解读)


消费者拉取消息,使用的是PullMessageProcessor,看一下processRequest方法,一开始初始化response的相关数据,然后是校验,我们具体看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//校验Broker是否可读
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}

// 校验 consumer分组是否存在
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}

// 校验 consumer分组配置 是否可消费
if (!subscriptionGroupConfig.isConsumeEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
return response;
}

第一和第三步校验的内容比较简单,我们主要看第二步,即校验consumer配置的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
if (null == subscriptionGroupConfig) {
if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(group);
SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
if (null == preConfig) {
log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
}
this.dataVersion.nextVersion();
this.persist();
}
}
return subscriptionGroupConfig;
}

这里有个小细节,如果broker可以自动创建订阅组的时候,会直接把订阅配置创建好,并且执行持久化步骤。

接着说,往下还是校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 校验 topic配置 存在
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}

// 校验 topic配置 权限可读
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
}

// 校验 读取队列 在 topic配置 队列范围内
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}

上面的三个校验都很好理解,然后是在过滤订阅表达式的时候(final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())),会校验订阅关系,这个部分我们就不细看了,会校验几个信息:

  1. 消费分组信息 是否存在。
  2. 消费分组信息 消息模型是否匹配。
  3. 订阅信息是否存在。
  4. 版本。

然后就是获取消息的部分了:final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

我们看一下DefaultMessageStoregetMessage。一开始还是校验各种各样的信息,我们略过看重点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);

//看下这个findConsumeQueue方法:
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;

这里获取的逻辑比较简单,没有就创建创建创建。。。

然后就是根据这个Consume Queue获取消息了,我们之前说过,Consume Queue叫做逻辑队列,相当于字典的索引表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 消费队列 最小编号
minOffset = consumeQueue.getMinOffsetInQueue();
// 消费队列 最大编号
maxOffset = consumeQueue.getMaxOffsetInQueue();

if (maxOffset == 0) { // 消费队列无消息
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) { // 查询offset 太小
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) { // 查询offset 超过 消费队列一个位置
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) { // 查询offset 超过 消费队列大于一个位置
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
}

这里主要校验,请求的队列的offset是否越界以及合法的问题,不合法的offset会通过nextOffsetCorrection方法修正,告诉消费者正确的、下次开始消费的位置。

通过校验,会去获取映射的buffer的结果:SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset)如果找不到返回一个OFFSET_FOUND_NULL的状态,我们主要看获取到的情况,处理很长,分段来看:

1
2
3
4
5
6
7
8
9
// commitLog下一个文件(MappedFile)对应的开始offset
long nextPhyFileStartOffset = Long.MIN_VALUE;
// 消息物理位置拉取到的最大offset
long maxPhyOffsetPulling = 0;

int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

获取一些信息,初始化过滤消息的条数等。然后开始循环获取消息位置信息:

1
2
3
4
5
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
}

这三个数值之前我们在介绍Consume Queue中的消息格式的时候已经介绍过了,分别是消息的物理offsetsizetag的hashcode

继续往下看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
maxPhyOffsetPulling = offsetPy;

if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}

//校验commitLog是否需要硬盘,无法全部放在内存,这里应该是获取消息的物理地址超过内存总大小了
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}

boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}

//过滤consume queue,主要是判断tag是否符合
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}

SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}

nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}

//判断消息是否符合条件
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}

//添加消息到结果中,更新offset和状态
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;

其中:SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);这句就是真正的获取消息的部分,里面内容没有什么需要特别注意的部分,我们就忽略不计了。

至此整个rocketmq真正获取消息的部分就结束了。