RocketMQ源码解读——消息消费之消费者启动

RocketMQ源码解读——消息消费之消费者启动


我们下面两篇文章看消费者。消息队列主要就是生产和消费两端,但是其实rocketmq还有很重要的两个部分是brokernameserverbroker中有真正的消息,nameserver是一个类似注册中心的角色。broker中存储的消息结构复杂,我们后面再看,先看拉取。

消费者分为两种,本质上都是通过的方式实现的,我们先看消费者启动部分。

消费者启动设置要注意的有几个点:

  1. 设置nameserver(这个producerbroker也需要)。
  2. 设置从哪里消费。
  3. 订阅topictag

push类型的消费者,依靠server端主动的推送消息过来(表面上),需要注册监听器,来监听发送过来的消息。

pull类型的消费者,需要调用consumer.fetchSubscribeMessageQueues(topic)方法,从server端主动拉取消息。

我们挑选push类型的看一下。

cusumer.start

代码太多了,我们挑主要的看一下,启动中一共要启动两个组件:consumeMessageServicemQClientFactory

consumeMessageService开始时开启一个定时任务,主要是定时清理过期消息,而主要的功能是处理消息,里面会使用我们注册的监听器对消息进行消费,mQClientFactory功能比较重要,重点看一下它的启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;

这里要启动好多定时任务,具体的看startScheduledTask,这里先不细说,先看一下PullMessageServicerun方法:

1
2
3
4
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}

当这个service不结束就会一直不停的从队列里获取PullRequest,如果能获取到,就指定拉取消息的部分:

1
2
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);

这个pullMessage方法最终会执行this.pullAPIWrapper.pullKernelImpl方法,我们可以看到,最终依赖的还是pull方式实现。

到这里可能大家有个问题,这个PullRequest哪里来的呢?我们看到现在并没有出现哪里向阻塞队列里放PullRequest。我们看一下PullMessageServiceexecutePullRequestImmediately方法,在这里找到了put的入口。最终会在pullMessage里找到入口。我们可以看到实际上向队列里放入PullRequest是一个类似定时任务的方式。不停地放,这样我们就可以从server一直不停地拉取消息。这也能解释rocketmq实际上是通过拉的方式来获取消息的