Netty源码分析——编/解码

Netty源码分析——编/解码

前言

这一篇看一下Netty的编码和解码的工作原理。编码解码器其实都是一种特殊的Handler,既然是Handler,那就有inBoundoutBound的区别。

我们这篇主要是梳理一下,从入站解码到业务处理,再到出站编码的过程,让大家对编解码的流程有个了解。

Decoder

我们随便挑选一种Decoder看一下实现就可以了,比较简单。这篇的内容相比之前的都简单一些、轻松一些。

ByteToMessageDecoder看一下,首先这个Decoder继承自ChannelInboundHandlerAdapter,这个类其实是一个InBoundHandler,实际上处理的是读事件。

那么看看这个Decoder是怎么把数据转成我们想要的类对象的。先看下channelRead方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 只能处理ByteBuf
if (msg instanceof ByteBuf) {
// 这是一个继承自ArrayList的数据结构
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
//第一次解码,cumulation是之前没解码完成的数据
cumulation = data;
} else {
//把这次的数据加到cumulation里等待被解码
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
//真正解码的部分
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
// 如果字节容器当前已无数据可读取,直接销毁字节容器,并且标注一下当前字节容器一次数据也没读取
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// channelRead执行了如果大于16次并且依然可读,这时候强制触发丢掉一些已读的数据,防止出现OOM
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//最后还是会执行一次channelRead把out里的数据向后传递
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}

这里的步骤拆解一下。首先,ByteToMessageDecoder只能处理ByteBuf。然后创建一个CodecOutputList这个数据结构。这里这个结构其实就是一个ArrayList,但是其中的所有方法都重写了。区别不是很大,但是这个类有一个getUnsafe方法,传入的index不会被校验。这里这个数据结构主要是为了提升性能。

然后把数据放到cumulation里。这其实是一个ByteBuf,保存的是所有还没被解码的数据。

然后进行真正的解码操作。这里需要注意finally里的一段代码,是避免OOM的,这说的是一种场景:如果这个cumulation一直有数据可以读,且channelRead被调用了超过16次,就强行执行一次discardSomeReadBytes,把已经读取过的数据扔掉,防止OOM,主要是避免了读取操作老是不结束,已经读取过的字节又一直存放在ByteBuf中的情况。

我们看下callDecode方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
while (in.isReadable()) {
// 记录一下out里的数据数量
int outSize = out.size();
// 一开始out里就有数据,优先向后传递解出来的数据
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}

// 记录一下字节容器中有多少字节待拆
int oldInputLength = in.readableBytes();
//调用decode方法,被解码的数据放到out里
decodeRemovalReentryProtection(ctx, in, out);
// 如果这个节点被移除了,就不继续操作数据了
if (ctx.isRemoved()) {
break;
}

// 如果out中的数量没有变化
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
// 如果拆包器未读取任何数据,可能数据还不够业务拆包器处理,直接break等待新的数据
break;
} else {
// 拆包器已读取部分数据,还需要继续解码
continue;
}
}

// 如果发现已经解到了数据包,但是发现并没有读取任何数据,这个时候就会抛出一个DecoderException,告诉你什么数据都没读取,但是Decode出了一个业务数据包,这是有问题的
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(“...");
}
if (isSingleDecode()) {
break;
}
}

如果out里本来就有数据,就触发channelRead,把数据向后传递:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
// 这里的方法就是下面的fireChannelRead
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
for (int i = 0; i < numElements; i++) {
ctx.fireChannelRead(msgs.get(i));
}
}
}

//把out中的数据都向后传递
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}

注意这里是把out里的每个数据都向后传递。

另外一个细节注意一下这里第二个fireChannelRead用的是CodecOutputList.getUnsafe,这个Unsafe方法的入参就是数组下标,跟ArrayList一样,但是不会校验下标的合法性,主要是为了提升性能。

另外注意我们会在channelReadfinally块中,不管怎样都会触发一次fireChannelRead

然后下面的逻辑是先看看有没有解析出数据包,如果没有,看看是不是从in中读取过数据,如果读取过,就继续解码,如果没有读取过数据,说明可能我们in中的数据不够我们读取的,这时候就继续读操作(终止解码操作)。

如果解析出了数据包,但是却没读取数据,抛出一个异常告诉你,你解析出了一个数据包,但是没从in中读取数据,这是有问题的,举个例子:

1
2
3
4
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 直接往out中增加数据,而不从in中读取数据
out.add(new Integer);
}

Encoder

同样选MessageToByteEncoder,实际上是一个outBoundHandler。看write方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
// 出站msg是否能够被encoder处理。。
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 编码
encode(ctx, cast, buf);
} finally {
//编码之后释放之前的msg
ReferenceCountUtil.release(cast);
}

if (buf.isReadable()) {
// 向后传递
ctx.write(buf, promise);
} else {
// 如果不可读,则向后传递一个buffer
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 不能处理直接向后传递
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 释放一下
if (buf != null) {
buf.release();
}
}

这里比较简单了。没有很多细节,看上面基本能够看懂,就不细说了。

清理字节容器

业务拆包完成之后,只是从字节容器中取走了数据,但是这部分空间对于字节容器来说依然保留着。如果不对字节容器做清理,那么时间一长就会OOM。

这里我们要重新看回到channelRead方法里,在finally中有这么一段代码:

1
2
3
4
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();

fireChannelRead方法干什么的我们已经说过了,这里看看decodeWasNull这个成员变量。这是一个boolean值,记录的是到fireChannelRead为止,是否已经成功解码了一个数据包。

看下CodecOutputList#insertSinceRecycled方法:return insertSinceRecycled;,再追一下这个insertSinceRecycled成员变量的位置,我们可以看到只有在CodecOutputList#insert方法中被设置为了true,insert方法的调用方在addset方法中,即:如果向
CodecOutputList中放入数据,则会设置insertSinceRecycled为true,当CodecOutputList被回收的时候,insertSinceRecycled被重置为false。

那么decodeWasNull = !out.insertSinceRecycled();这句的意思就很明白了,如果这个解码器目前连一个数据包都没解(这里指Decode操作)出来,则decodeWasNull设置为true。

我们再看下清理逻辑,ByteToMessageDecoder会在channelReadComplete的时候,进行清理字节容器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
numReads = 0;
// 执行清理逻辑
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();

protected final void discardSomeReadBytes() {
if (cumulation != null && !first && cumulation.refCnt() == 1) {
// 清理已读的数据
cumulation.discardSomeReadBytes();
}
}

discardSomeReadBytes操作之前,cumulation中的字节分布情况:

readed unread writable

之后,字节分布情况:

unread writable

实际上就是把已读的字节扔掉。

这里注意一个细节。我们之前花了一些篇幅说了decodeWasNull的含义和赋值过程,也是为这个细节做铺垫。之前说过decodeWasNull的意思是说,当前解码器到读事件结束,仍然没有解(指代Decode操作)出一个数据包,这个时候,即使该channel的设置为非自动读取(非自动读取的部分,可以看下我的另外一篇文章AUTOREAD),也会触发一次读取操作ctx.read()。这个操作会向selector中注册一个READ,以便我们下次读可以读取一个完整的数据包:

1
2
3
4
5
6
7
if (decodeWasNull) {
decodeWasNull = false;
// 即使管道关闭了自动读,依然触发一次读操作
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}

done

内容至此,其实我们已经可以明白编/解码的逻辑了,其实这中间还有一些拆包粘包的内容,比如我们的解码器,里面除了解码,其实还有一个组装数据包的逻辑,对TCP的拆包粘包有了解的同学,看过这些内容应该可以比较快速的理解Netty对于拆包粘包的内容了~