RocketMQ源码解读——Broker消息写入

RocketMQ源码解读——Broker消息写入


继续上一篇我们说了broker的消息存储过程,这一篇我们看一下消息在broker上以何种形式存储。

消息的存储最终通过CommitLog这个类来做,CommitLogMappedFileQueueMappedFile这三个类抽象了存储过程中的相关概念,对应关系如下:CommitLog: MappedFileQueue :MappedFile = 1 :1 :N

我们在用户目录下可以找到一个叫store的文件夹,反应到文件系统就是:

1
2
3
4
5
6
$ pwd
/Users/xxx/store/commitlog
$ ls -l
total 10485760
-rw-r--r-- 1 xxx staff 107374824 4 21 16:27 0000000000000000000
-rw-r--r-- 1 xxx staff 107374824 4 21 16:27 0000000000107374824

定义如下:

  • MappedFile:00000000000000000000文件
  • MappedFileQueueMappedFile所在的文件夹,对MappedFile进行封装成文件队列,对上层提供可无限使用的文件容量。每个MappedFile统一文件大小,文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 CommitLog 里默认为 1GB
  • CommitLog:针对MappedFileQueue的封装使用。

消息的格式

CommitLog目前存储在MappedFile有两种内容类型:

  1. MESSAGE:消息
  2. BLANK:文件不足以存储消息时的空白占位符。

结构
消息在文件中的结构 | center

消息的结构
| 第X位 | 字段 | 含义 | 数据类型 | 字节 |
| —— | —— |—— |—— |—— |
| 1 | MsgLen | 消息总长度 | int | 4 |
| 2 | MagicCode | 魔数 | int | 4 |
| 3 | BodyCRC | 消息内容CRC | int | 4 |
| 4 | QueueId | 消息队列编号 | int | 4 |
| 5 | Flag | flag | int | 4 |
| 6 | QueueOffset | 消息队列位置 | long | 8 |
| 7 | PhysicalOffset | 物理位置。在CommitLog的顺序存储位置 | long | 8 |
| 8 | MessageSysFlag | MessageSysFlag类的字段 | int | 4 |
| 9 | BornTimestamp | 生成消息时间戳 | long | 8 |
| 10 | BornHost | 生效消息的地址+端口 | long | 8 |
| 11 | StoreTimestamp | 存储消息时间戳 | long | 8 |
| 12 | StoreHostAddress | 存储消息的地址+端口 | long | 8 |
| 13 | ReconsumeTimes | 重新消费消息次数 | int | 4 |
| 14 | PreparedTransationOffset | | long | 8 |
| 15 | BodyLength + Body | 内容长度 + 内容 | int + bytes | 4 + bodyLength |
| 16 | TopicLength + Topic | Topic长度 + Topic | byte + bytes | 1 + topicLength |
| 17 | PropertiesLength + Properties | 拓展字段长度 + 拓展字段 | short + bytes | 2 + propertiesLength |

BLANK的结构:
| 第X位 | 字段 | 含义 | 数据类型 | 字节 |
| —— | —— |—— |—— |—— |
| 1 | maxBlank | 空白长度 | int | 4 |
| 2 | MagicCode | 空白的魔数 | int | 4 |

代码部分

我们看一下CommitLog.putMessage,先看开头的部分:

1
2
3
4
5
6
7
8
9
10
11
12
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

String topic = msg.getTopic();
int queueId = msg.getQueueId();

这里这个UtilAll.crc32(msg.getBody())底层使用java原生的CRC32这个类,这个CRC32hash算法的一种,特点就是速度快

接着看,是处理事务的部分,先忽略。看下一面是获取写入的mappedFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();,进去看看,这个方法很简单就是获取mappedFiles(一个CopyOnWriteArrayList)最后的一个对象。注意这里用了while循环。

然后是获取写入锁后进行写入操作,先看前半部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);

//如果mappedFile不存在或者满了,则进行创建
if (null == mappedFile || mappedFile.isFull()) {
//看上去是个get方法,实际上底层会创建
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}

这个部分重点就是mappedFile满了或者根本没有,则会创建,如果发现是第一个MappedFile,还会设置把自己设置为创建的第一个文件。

然后看一下mappedFile.appendMessage,这个方法最终会调用AppendMessageCallback.doAppend进行最终的写入。在这里面我们可以看到如上面表格中所指示的,各个字段的含义和数据类型。

appendMessage之后会根据结果来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// 当文件尾时,获取新的映射文件,并进行插入
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}

至此消息已经写入到ByteBuffer中,这时候消息还在内存中,我们看一下落盘机制,落盘机制分为两种:同步刷和异步刷

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}

刷盘之后会处理broker的高可用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//如果是同步Master,同步到从节点
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}

至此,broker的消息写入已经全部结束。这篇中遗漏的两个部分包括刷盘策略和数据同步,我们接下来的文章再详细看。

后续补充

我们就用rocketmq的demo试一下,看一下cosumequeue的结构,路径:

1
/Users/xxx/store/consumequeue/TopicTest

这个里面有什么,有我们这个topic下的消费队列,目前只有一个0,意味着队列id为0,进去看一下只有一个二进制文件,名字全是0,打开看看:

1
2
3
0000000    00  00  00  00  00  00  00  00  00  00  00  b2  00  00  00  00
0000010 00 27 a8 07 00 00 00 00 00 00 00 00 00 00 00 00
0000020 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00

这个文件名以已有存储容量递增:00000000000000000000,00000000001073741824,00000000002147483648 …

看一下这个文件的内容,最左边是地址,16进制,不是文件内容。我们看下ConsumeQueue中每个消息的结构:
结构 | center

我们可以看到:

  1. 一条消息20字节。
  2. CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量。
  3. Size是消息长度。
  4. Message Tag HashCode是存储消息的tag的哈希值,用于快速定位消息。

这个Consume Queue是消息的逻辑队列,相当于字典的目录(这就是为什么里面会有一个tag的hashcode),用来指定消息在物理文件commit log上的位置。如果TopicTest有两个queue,则:

1
2
TopicTest + queue0 = ConsumerQueue1
TopicTest + queue1 = ConsumerQueue2

写入Consume Queue的关键是两个组件:

  1. FlushConsumeQueueService:flush ConsumeQueue。
  2. ReputMessageService:write ConsumeQueue。

这个ReputMessageService的内容从哪来,就是从我们之前说过的commitLog

1
2
3
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
//do sth....
this.reputFromOffset += size;

这是个无限循环,只要commitlog中有消息,这个任务就会一直把新的消息的相关信息写到consume queue中。具体的细节我们以后细说。