Netty源码分析——AUTOREAD

Netty源码分析——AUTOREAD

前言

这个是设置在Channel上的一个属性,主要控制管道的自动读。可能对于很多初学者来说,都对这个Channel的自动读表示困惑,这篇文章主要来看下这个自动读如何工作以及什么时候关闭自动读(自动读默认开启)。

上代码

这个要追溯到我们的读操作了,我们以服务端为例,服务端先是有个叫Boss的Reacotr线程,不断的轮训ACCEPT事件,如果轮训到,就创建一个Channel并且注册READ事件,把这个Channel分配给一个叫WorkerReactor线程。这个逻辑不说了,详细的内容看之前的:Boss和Worker

分配给WorkerChannel,在NIO模式下是NioSocketChannel,我们看下它的读取操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// sth...
try {
do {
byteBuf = allocHandle.allocate(allocator);
//真正的把数据读取到ByteBuf里
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}

allocHandle.incMessagesRead(1);
readPending = false;
// 触发管道的读操作,处理数据
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 读操作是否结束
} while (allocHandle.continueReading());

allocHandle.readComplete();
// 这里触发读完成操作
pipeline.fireChannelReadComplete();

if (close) {
closeOnRead(pipeline);
}
}
// sth...

我们讲过之前的大部分步骤,这里看这个pipeline.fireChannelReadComplete。这里会从HeadContext节点开始向后传递,看下HeadContext#channelReadComplete

1
2
3
4
5
6
7
8
ctx.fireChannelReadComplete();
readIfIsAutoRead();

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}

这里看这个readIfIsAutoRead,这里出现了自动读类似的字眼。具体的方法内容也很简单,如果Channel设置了自动读,就执行channel.read()。这个方法我们之前也讲过了,最终传递给tail,再从tail传递给head,然后执行unsafe.beginRead(),最终是给这个NioSocketChannel注册一个READ事件。

注册以后,这个Channel就可以继续在一个Worker Reactor线程里,继续做读操作。

注意,这个管道能够在一次读操作之后继续读的要求,就算我们的主题——AUTOREAD,关于如何设置自动读,就不细说了。那我们到此就可以推断出这个自动读的作用:在第一次读操作结束之后,是否还进行读操作。这里需要注意,如果这个NioSocketChannel第一次被注册到Worker上,就算这个管道被设置为非自动读,也是会进行一次读操作的,换句话说,每个管道都会至少进行一次读(除非客户端就根本没东西写给服务端,那么当然就不会进行读操作)。

作用和使用场景

说完了原理,我们说下什么时候把这个自动读关掉,已经什么时候重新开启。

这个其实是做流控用的。举个例子,我们服务端有个线程池,固定大小500个线程。这时候我们可能有很多的客户端链接,一下子把线程池撑满了,这时候我们可以关掉一部分管道的自动读。

以上场景可能不是非常恰当,我可以在管道中设置一个流控Handler。这个Handler每次读到数据的时候,就看看线程池的size,如果超过某个值,我们就关闭这个管道的自动读:channel.config().setAutoRead(false)。然后继续传递。等到这个管道读结束了,就不会再出发下次读了,这样当然也不会占用我们的线程池了。

这里我们注意一个问题,我们不从Channel里读数据,并不代表这个Channel关闭了。我们可以用一个定时任务检测我们的线程池,如果低于某个值,我们调用Channelchannel.config().setAutoRead(true)即可开启自动读。

这里可能各位有个问题,之前我们也说了,调用readIfIsAutoRead(或者说fireChannelReadComplete)是在一次读结束,但是之前我们设置了禁止自动读,那么自然也没人来执行ctx.fireChannelReadComplete了,这时候我把Channel的自动读打开也没用啊,因为没人能触发readIfIsAutoRead给这个Channel注册自动读了。

这里还是要追一下代码,看下DefaultChannelConfig#setAutoRead

1
2
3
4
5
6
7
boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
if (autoRead && !oldAutoRead) {
channel.read();
} else if (!autoRead && oldAutoRead) {
autoReadCleared();
}
return this;

这里我们看到,如果把管道的自动读设置为true的时候,是会主动调用一次channel.read()来进行READ事件的注册的。

注意

但是使用自动读也要注意一件事情。

自动读如果关闭后,对端发送FIN的时候,接收端应用层也是感知不到的。这样带来一个后果就是对端发送了FIN,然后内核将这个socket的状态变成CLOSE_WAIT。但是因为应用层感知不到,所以应用层一直没有调用close。这样的socket就会长期处于CLOSE_WAIT状态。特别是一些使用连接池的应用,如果将连接归还给连接池后,一定要记着自动读一定是打开的。不然就会有大量的连接处于CLOSE_WAIT状态。

说白了就是,如果服务端关闭自动读,但是关闭之后内核中的socket收到了客户端传来的关闭命令,这时候应用层没有从Channel中读取数据,自然也就不知道客户端已经要求关闭了。这里要特别注意。

后记

之前在读代码的时候也有个疑问,在AbstractNioByteChannel.NioByteUnsafe#read方法中是这样进行读操作的:

1
2
3
4
5
do{

// ...

} while (allocHandle.continueReading());

这里分析一下,我截取一部分有用的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 这里的allocHandle是AdaptiveRecvByteBufAllocator.HandleImpl
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

// 重置所有计数器,主要是重置已读字节、每次读最多获取多少次消息和已读消息
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配ByteBuf
byteBuf = allocHandle.allocate(allocator);
// allocHandle记录上次读了多少字节
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// allocHandle.lastBytesRead() < 0;说明我们收到了一个EOF,直接结束
readPending = false;
}
break;
}

// 增加已读消息次数
allocHandle.incMessagesRead(1);
readPending = false;
// 触发读事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 是否继续读
} while (allocHandle.continueReading());

readPending = false;
// 触发读结束
allocHandle.readComplete();
} catch (Throwable t) {
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}

我会分析两个功能的代码:

  • 关于allocHandle.continueReading如何判断读操作结束。
  • 具体解释一下readPending这个标识的作用。

先看第一个。allocHandle.reset操作中,设置了一个maxMessagesPerRead,这个用来标识一次读取操作最多能读取多少次消息。注意这里的单位是次数不少字节数,我们每次pipeline.fireChannelRead(byteBuf)前都会增加一次读取的消息的次数。这个值默认是16,具体的设置这个值的链路我就不展开说了,源头在AbstractNioByteChannel#METADATA这个成员。

另外说一下RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();这里这个allocHandle默认是AdaptiveRecvByteBufAllocator.HandleImpl,也就是说默认的AllocatorAdaptiveRecvByteBufAllocator,这个源头在:

1
2
3
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}

说完这些我们看下如何确定读操作可以继续。按照上述代码来看,先是重置计数器,然后分配ByteBuf这两步没有什么特殊的。allocHandle.lastBytesRead(doReadBytes(byteBuf))继续看到这里,doReadBytes会返回实际读取的字节数,我们看下实现:

1
2
3
4
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 记录视图读取的字节数,这个数等于分配出来的ByteBuf的可以写入的字节数(注意目前ByteBuf是空的)
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());

这里会记录这次读取试图读取的字节数,也就是ByteBuf的容量,记录这个有什么用我们后面继续看。

看下lastBytesRead方法:

1
2
3
4
5
6
7
8
9
10
if (bytes == attemptedBytesRead()) {
record(bytes);
}
super.lastBytesRead(bytes);

// super.lastBytesRead(bytes) 实现:
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}

这里先去判断是否这次读取的字节数等于试图读取的字节数。这句有注释,如果我们读取的字节数正好就是我们试图读的字节数,这时候记录一下,将来调整ByteBuf的时候,就可以根据之前的记录调整ByteBuf的大小。

这个解释有点绕,举个生活化的例子:加入有个水桶,这就好比我们的socket,这个水桶里的水我们都不知道,我们现在用杯子从里面取(这就是读取操作),第一次我们用1000毫升的水杯取,取完水杯里就500毫升,说明水桶里的水就500毫升。那么如果我们如果取了1000毫升出来,说明两种情况:

  1. 水桶里就1000毫升水,我们一次恰恰好好取完了,水桶空了。
  2. 水桶里不止1000毫升,我们一次没取完。

这个时候我们怎么办呢,我们一般下次取的时候会换个2000毫升的更大的水杯去取。为什么要这么做,因为对于程序而言,性能更好的方式是每次尽可能多的读取字节,减少读取次数。说到这里我们基本也可以猜到netty判断读操作是否结束的条件之一就是:我们本次实际读的数据量是不是本次试图读的数据量。我们把它bring back到刚才的例子上就是:我们本次用杯子取的水的量是不是就是杯子的容量。

继续看我们读操作:

1
2
3
4
5
6
7
8
9
10
11
12
// 读取的字节数小于等于0,分两种,等于0和小于0
if (allocHandle.lastBytesRead() <= 0) {
// 释放ByteBuf
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// 我们读取到了EOF,没有东西可以读了,结束本次读操作
readPending = false;
}
break;
}

这是一种本次读取没读到数据或者EOF了,就结束本次读。否则往下进行,执行allocHandle.incMessagesRead(1);,增加一次读取次数,我们之前说过,默认情况下最多一次读取操作读取16次数据。那么我们又可以推断出另一个判断读取结束的条件了:已经进行的读数据次数是否超过16(16是默认值)。我们现在看一下continueReading方法,重头戏,最终会走到这里:

1
2
3
4
5
6
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}

maybeMoreDataSupplier.get()用的是attemptedBytesRead == lastBytesRead;,我们把条件拆开看:

  1. 是否配置了自动读,如果没有配置自动读,结束这次读操作,可以看到autoRead如果设置为false,也至少会进行一次读。
  2. respectMaybeMoreData目前都是true,我们先不看。看maybeMoreDataSupplier.get(),其实就是attemptedBytesRead == lastBytesRead;,就是我们刚刚说的,如果试图读取的数据等于上次读取的数据,如果等于,说明我们的水杯(ByteBuf)在这次取(读)操作中取满了,那么猜测可能还有数据没读完,就继续读。
  3. totalMessages < maxMessagePerRead,之前说过了,默认情况下读取不能超过16次。
  4. totalBytesRead > 0,这个totalBytesRead表示总共读取的字节数,必须大于0才继续读,等于0表示一共读取的数据都小于等于0,就不读了。

ReadPending

继续看下个细节,关于readPending标识符的作用。在读结束的finally里,我们可以看到:

1
2
3
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}

如果没有readPending并且不需要自动读,就移除读的key,下次select就不会select到这个Channel

autoRead上面已经说了,这里主要看readPending为了解决什么问题。这里要往上看读操作中pipeline.fireChannelReadComplete();pipeline.fireChannelRead(byteBuf);。我们存在一些情况下,需要在channelReadCompletechannelRead中操作channel.read()方法,channel.read()会给管道增加读的key,让reactor线程能够取出这个channel做读操作。那么问题就来了,因为removeReadOp是在finally中,所以我们在channelReadCompletechannelRead中进行的channel.read()都将失效。我们为了让它不失效,增加了一个readPending,标识现在有一个读操作待操作,不能移除读的key。

我们看看什么时候这个readPending会被设置为true,调用channel.read()会最终委托到pipeline最后传递给HeadContext,这是老的知识点,最终会进行unsafe.beginRead();,最后再AbstractNioChannel#doBeginRead中设置读的key。这个方法里面就会设置readPending = true;。这样,我们如果在channelReadComplete中进行channel.read(),就会被这个readPending设置为true,这样在finally中就不会移除读的key了。

那么我们什么时候设置readPendingfalse呢,是在所有读操作结束的时候。readPending指的是否有待处理的读操作,所以读完一次数据,我们就设置一次readPending=false。当然我们shutdown之类的时候也会设置这个值为false,这里不展开了,有兴趣的可以去看看代码。