RocketMQ源码解读——消息发送之选择队列并发送
接着上一篇消息发送之Topic路由信息获取来,这一篇我们看一下选择和发送的过程:
1 | MessageQueue mq = null; |
我们可以看到,如果是同步发送,默认重试2+1次,如果是异步则不重试。
选择队列
接着看下面的代码
1 | for (; times < timesTotal; times++) { |
进入循环体,最多重试timesTotal
次。先选择队列,看一下selectOneMessageQueue
方法,如果开启了容错策略,则根据容错策略选择消息队列,否则调用TopicPublishInfo.selectOneMessageQueue
,我们先看没有开启容错策略的部分,第一次调用该方法,lastBrokerName
就是null:
1 | int index = this.sendWhichQueue.getAndIncrement(); |
其实就是很简单的轮训。
如果lastBrokerName
不为null的时候:1
2
3
4
5
6
7
8
9
10
11int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
还是轮询所有的消息队列,获取一个和lastBrokerName
不一样的队列。以上就是不开启容错策略的选择队列的过程。下面我们再看一下开启了容错策略的过程:
1 | // 获取 brokerName=lastBrokerName && 可用的一个消息队列 |
先说一下总的逻辑,根据容错策略选择消息队列:优先获取可用队列,其次选择一个broker
获取队列,最差返回任意broker
的一个队列。
latencyFaultTolerance.isAvailable(mq.getBrokerName())
这行代码主要就是在判断broker
的可用性,判断依据是broker
的响应时间,如果这个broker
太久没有响应,则认为该broker
不可用。
latencyFaultTolerance.pickOneAtLeast()
把FaultItem
重新排序(这个FaultItem
就是封装了这个broker
的开始时间,等待时间等数据),然后还是用近似遍历的方式找一个broker
。这里面有个小细节,这个遍历的依据并不是所有的FaultItem
的集合,而是这个集合长度的一半,可能考虑到这个集合会经常变化吧?
后面的逻辑和选择一个队列不考虑可用性的逻辑基本一致。至此选择队列的逻辑就全部结束了,下一部分我们看一下真正的发送。
发送消息
真正的发送是这一句:sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
这个mq就是我们上面选择了的mq。
看一下这个sendKernelImpl
的方法,还是很长我们还是需要分段来看:1
2
3
4
5
6
7//获取broker的地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
//如果这时候获取不到,则再进行一次获取topic的路由信息流程
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
上面两步走完,brokerAddr
就默认存在了,如果继续不存在则抛出异常。我们看下不为null时的正常流程(代码部分有点冗长,有兴趣的可以自己研究一下):
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
是否使用broker vip
通道。broker
会开启两个端口对外服务。this.tryToCompressMessage(msg)
尝试压缩消息。- 根据是否存在
CheckForbiddenHook
拼装CheckForbiddenContext
。 - 根据是否存在发送消息钩子构建发送消息上下文,并且执行发送前要执行的钩子方法。
- 拼装发送消息的
request
,这里如果是批量消息,则batch
标记会置为true
。 - 根据发送模式发送消息,同步和只发送一次的消息走的是相同的方法。
我们主要看发送消息,先看异步,有回调的:this.mQClientFactory.getMQClientAPIImpl().sendMessage
方法最终会走到sendMessageAsync
中:1
2
3
4
5
6this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
//... do send message complete
}
}
我们继续看remotingClient.invokeAsync
方法:
1 | //创建channel,这里是netty的channel |
封装的真多,继续看invokeAsyncImpl
方法,在这个方法中我们看到了channel.writeAndFlush(request).addListener(new ChannelFutureListener() { ... }
,熟悉netty的同学应该知道,这边会把msg
直接冲刷到socket
中。我们注意几个细节:
- 发送之前需要获取一个信号量,如果获取不到则抛出一个超时异常。
- 如果发送的
ChannelFuture
成功,则不执行invokeCallback
,我们可以看到判断如果成功就返回了,注意这里这个InvokeCallback
并不是我们自己可以添加的SendCallback
,SendCallback
可以处理消息发送成功或者失败,而InvokeCallback
处理的是ResponseFuture
。我们通过DefaultMQProducer
不能设置这个InvokeCallback
。 - 真正执行重试的是在
processSendResponse
之后,如果SendResult
处理后需要重试,会执行onExceptionImpl
,其中有这么一句:if (needRetry && tmp <= timesTotal)
,如果需要则重试。
invokeOnewayImpl
和invokeAsyncImpl
有类似的地方,但是不需要处理回调。而invokeSyncImpl
和异步处理的区别在于,同步发送会在发送之后等待结果的返回:RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
,并且返回responseCommand
,之后执行processSendResponse
,返回发送结果。至此,整个发送流程结束。
减少队列数量的细节
有个小细节,之前在rocketmq的issue里看到的。关于QueueData中的writeQueueNums
和readQueueNums
,这两个不是同一个值,官方解释是比较容易扩展,增加队列数量还好,rocketmq会隔段时间自动同步队列信息。问题就是减少队列数量,会不会导致丢消息,这里如果本来有十个队列(1~10号),现在变为五个(只剩1~5号),那么可能导致丢消息。针对减少队列的情况,真正推荐的做法如下:
- 减少写入队列的数量。
- 等待所有待消耗队列中的消息被所有消耗组消耗。这一步就是为了保证不丢消息。
- 减少读取队列号码。