RocketMQ源码解读——Broker消息存储

RocketMQ源码解读——Broker消息存储


我们这一篇来看broker的消息存储过程。

broker启动时会启动一个NettyRemotingServer,我们之前说过,rocketmq通过netty做信息传递,这个NettyRemotingServer初始化时,会把NettyServerHandler加到Pipeline里,这个方法最终会执行processMessageReceived方法,根据消息的种类来分类处理消息。

我们关注的是请求,关注一下processRequestCommand这个方法:

  1. 先获取一个pair(rocketmq封装的类似键值对的对象),第一个值是processor,第二个是一个线程池,用来处理请求。
  2. 初始化任务。
  3. 提交任务。

初始化任务这一步包含比较多的逻辑,我们看一下要初始化的这个任务是做什么的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {

//如果有rpc钩子,先执行 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}

//这里用pair的第一个processor来处理请求
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {

//后执行 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}

if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
//响应信息回写
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
//如果response为null就忽略了
}
}

总结一下这个任务,就是处理请求,并且把请求写会远端,如果是一个一次性请求,则不回写response

我们看一下这个processor做什么,这里这个processorSendMessageProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}

mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

RemotingCommand response;
//真正的处理请求的方法
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}

this.executeSendMessageHookAfter(response, mqtraceContext);
return response;

这里又又又出现了钩子,可以看到rocketmq里各种钩子遍地飞,这里注意这个钩子和上个方法中的不一样,这个钩子是发送消息的钩子,上一个是RPC钩子,及远程调用,不太一样,发送消息一定是RPC调用,但是反过来不成立,注意区分。

我们看一下sendMessage的部分:

  1. 拼装响应头。(之前跟同事聊起rocketmq,同事说rocketmq不完善的其中之一就在于没有消息追踪,但是我们看的是4.2.0版本已经有开关可以控制开启TRACE,作者没有具体验证这个功能,读完debug完再说)
  2. 校验消息体合法。
  3. 选取queueId,决定要写到哪个队列里去。
  4. 处理重试和死信队列。
  5. 创建MessageExtBrokerInner
  6. 获取MessageStore并且存储消息。
  7. 处理存储信息的结果。

我们先看一下第四步,如果是RETRY类型的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}

//计算最大可消费次数
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
//获取已消费次数
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
//死信队列每个消费组只有一条队列
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
//把topic设置为新的死信队列的topic(%RETRY% + consumerGroup)
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}

我们可以看到,如果一条消息一直消费失败,会添加到死信队列里。

关于真正存储消息的部分,下一篇再说,因为涉及到rocketmq的文件格式的问题,这是非常重要的。

这里有个注意的地方是,什么样的消息是RETRY类型的消息,这个部分的代码要到我们的消费者里去看了,回忆一下上一篇:我们启动消费者的时候,实际上会启动一个定时服务不停的向一个队列中插入PullRequest,然后有一个死循环一直在获取请求然后去拉取消息,我们看一下ConsumeMessageConcurrentlyService.submitConsumeRequest方法,里面其实提交的任务是一个叫ConsumeRequest的类,这个类实际上真正处理消息的类,我们在这里面找到了真正用监听器去处理消息的语句:listener.consumeMessage(Collections.unmodifiableList(msgs), context);。大家还有印象么?我们追踪一下这个方法返回的status,可以看到有这么一句:ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);及处理消费结果的部分,追进去看一下,里面发现如果消费者是集群模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//如果消息消费成功,则msgBackFailed为空
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
//消费失败会进入循环
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//把消息发回去
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

//发回Broker失败的消息,直接提交延迟重新消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;

集群模式下,如果消费失败会把消息发回去让其他机器消费。我们最终追到sendMessageBack方法中,如果发送回去时报错,我们看下如何处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);

我们看到,重新创建了一条新的消息,但是TOPIC修改了一下,修改成%RETRY% + consumerGroup。然后又走了一遍发送操作。

说回去,最后看一下handlePutMessageResult这个方法,处理最终存储的结果和相应信息:

  1. 根据冷存的结果设置相应的code
  2. 如果冷存成功,统计相应的信息。然后把response写回远端,如果有钩子则设置发送失败到context
  3. 如果执行失败,如果有钩子则设置发送失败到context

至此,消息存储部分结束。