RocketMQ源码解读——消息发送之TOPIC路由信息获取

RocketMQ源码解读——消息发送之TOPIC路由信息获取

具体的发送demo我就不贴了,直接到里面去看实现了。

看到DefaultMQProducerImplsendDefaultImpl方法,这个方法非常的长,中间涉及到多个过程,比如拉取topic路由信息等,我们一部分一部分来看,先看开头部分:

1
2
3
4
5
6
7
8
9
this.makeSureStateOK(); //mq状态
Validators.checkMessage(msg, this.defaultMQProducer); //校验msg是否合法
final long invokeID = random.nextLong(); //调用编号,打印日志用
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;

//获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

一开始的部分比较简单,首先做一些基本校验,然后初始化了几个时间节点,rocketmq发送消息可以有重试的,如果发送总时间大于timeout,则会抛出超时异常。

然后是获取路由信息,我们下面具体看下获取路由信息的部分。

获取topic路由信息

先看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

这个topicPublishInfoTable是一个map,键是topic,值就是topic的路由信息,TopicPublishInfo中主要包含了包括:消息队列(MessageQueue)集合,一个index和最重要的topic路由信息(TopicRouteData

那我们可以看到,获取topic的路由信息,先从本地(或者可以认为是缓存)读,如果没有,或者有但是topic发布信息中的消息队列结合为空,则从NameServer获取。

我们看下updateTopicRouteInfoFromNameServer方法,还是比较长,内部先去尝试获取锁,获取成功则真正去从NameServer获取路由信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
TopicRouteData topicRouteData;
//如果是默认topic则获取默认topic的信息 否则获取topic的信息
if (isDefault && defaultMQProducer != null) {

//从nameserver获取default topic 的消息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//获取topic的消息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}

看一下真正获取的方法getTopicRouteInfoFromNameServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {

//初始化一个获取路由信息的请求
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

//真正的执行,这里是同步执行,remotingClient底层目前只有一种实现方式就是netty
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());

注意个小细节:this.remotingClient.invokeSync(null, request, timeoutMillis)注意这句第一个入参是null,应该是地址,这里底层实现如果addr为null,则会调用getAndCreateNameserverChannel

看回到获取到topicRouteData以后(继续看updateTopicRouteInfoFromNameServer方法):

如果获取到的数据为null,记录没有获取到信息的日志和topic,否则执行topicRouteDataIsChange(old, topicRouteData);观察是否需要更新topic路由信息,如果需要更新,则更新所有Producer的路由信息。然后会执行一个代码块,更新消费者的订阅信息(如果这个Client实例上有消费者的话)。

至此,topic的路由信息获取结束。