RocketMQ源码解读——消息发送之选择队列并发送

RocketMQ源码解读——消息发送之选择队列并发送

接着上一篇消息发送之Topic路由信息获取来,这一篇我们看一下选择和发送的过程:

1
2
3
4
5
6
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//获取重试次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;

我们可以看到,如果是同步发送,默认重试2+1次,如果是异步则不重试。

选择队列

接着看下面的代码

1
2
3
4
5
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
// do other thing ...
}

进入循环体,最多重试timesTotal次。先选择队列,看一下selectOneMessageQueue方法,如果开启了容错策略,则根据容错策略选择消息队列,否则调用TopicPublishInfo.selectOneMessageQueue,我们先看没有开启容错策略的部分,第一次调用该方法,lastBrokerName就是null:

1
2
3
4
5
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);

其实就是很简单的轮训。

如果lastBrokerName不为null的时候:

1
2
3
4
5
6
7
8
9
10
11
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();

还是轮询所有的消息队列,获取一个和lastBrokerName不一样的队列。以上就是不开启容错策略的选择队列的过程。下面我们再看一下开启了容错策略的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 获取 brokerName=lastBrokerName && 可用的一个消息队列
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}

//下面的部分是,如果上面部分没有选到一个符合条件的队列的情况下,会选择一个相对较好的队列,不考虑该队列的可用性
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}

先说一下总的逻辑,根据容错策略选择消息队列:优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。

latencyFaultTolerance.isAvailable(mq.getBrokerName())这行代码主要就是在判断broker的可用性,判断依据是broker的响应时间,如果这个broker太久没有响应,则认为该broker不可用。

latencyFaultTolerance.pickOneAtLeast()FaultItem重新排序(这个FaultItem就是封装了这个broker的开始时间,等待时间等数据),然后还是用近似遍历的方式找一个broker。这里面有个小细节,这个遍历的依据并不是所有的FaultItem的集合,而是这个集合长度的一半,可能考虑到这个集合会经常变化吧?

后面的逻辑和选择一个队列不考虑可用性的逻辑基本一致。至此选择队列的逻辑就全部结束了,下一部分我们看一下真正的发送。

发送消息

真正的发送是这一句:sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);这个mq就是我们上面选择了的mq。

看一下这个sendKernelImpl的方法,还是很长我们还是需要分段来看:

1
2
3
4
5
6
7
//获取broker的地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
//如果这时候获取不到,则再进行一次获取topic的路由信息流程
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

上面两步走完,brokerAddr就默认存在了,如果继续不存在则抛出异常。我们看下不为null时的正常流程(代码部分有点冗长,有兴趣的可以自己研究一下):

  1. brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);是否使用broker vip通道。broker会开启两个端口对外服务。
  2. this.tryToCompressMessage(msg)尝试压缩消息。
  3. 根据是否存在CheckForbiddenHook拼装CheckForbiddenContext
  4. 根据是否存在发送消息钩子构建发送消息上下文,并且执行发送前要执行的钩子方法。
  5. 拼装发送消息的request,这里如果是批量消息,则batch标记会置为true
  6. 根据发送模式发送消息,同步和只发送一次的消息走的是相同的方法。

我们主要看发送消息,先看异步,有回调的:this.mQClientFactory.getMQClientAPIImpl().sendMessage方法最终会走到sendMessageAsync中:

1
2
3
4
5
6
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
//... do send message complete
}
}

我们继续看remotingClient.invokeAsync方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//创建channel,这里是netty的channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
//rpchook的用法:在真正进行远程请求时执行
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
//真正异步发送
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
//创建不成功抛异常,远程连接异常
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}

封装的真多,继续看invokeAsyncImpl方法,在这个方法中我们看到了channel.writeAndFlush(request).addListener(new ChannelFutureListener() { ... },熟悉netty的同学应该知道,这边会把msg直接冲刷到socket中。我们注意几个细节:

  1. 发送之前需要获取一个信号量,如果获取不到则抛出一个超时异常。
  2. 如果发送的ChannelFuture成功,则不执行invokeCallback,我们可以看到判断如果成功就返回了,注意这里这个InvokeCallback并不是我们自己可以添加的SendCallbackSendCallback可以处理消息发送成功或者失败,而InvokeCallback处理的是ResponseFuture。我们通过DefaultMQProducer不能设置这个InvokeCallback
  3. 真正执行重试的是在processSendResponse之后,如果SendResult处理后需要重试,会执行onExceptionImpl,其中有这么一句:if (needRetry && tmp <= timesTotal),如果需要则重试。

invokeOnewayImplinvokeAsyncImpl有类似的地方,但是不需要处理回调。而invokeSyncImpl和异步处理的区别在于,同步发送会在发送之后等待结果的返回:RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);,并且返回responseCommand,之后执行processSendResponse,返回发送结果。至此,整个发送流程结束。

减少队列数量的细节

有个小细节,之前在rocketmq的issue里看到的。关于QueueData中的writeQueueNumsreadQueueNums,这两个不是同一个值,官方解释是比较容易扩展,增加队列数量还好,rocketmq会隔段时间自动同步队列信息。问题就是减少队列数量,会不会导致丢消息,这里如果本来有十个队列(1~10号),现在变为五个(只剩1~5号),那么可能导致丢消息。针对减少队列的情况,真正推荐的做法如下:

  1. 减少写入队列的数量。
  2. 等待所有待消耗队列中的消息被所有消耗组消耗。这一步就是为了保证不丢消息。
  3. 减少读取队列号码。