Netty源码分析——EPOLL之epollWait和唤醒

Netty源码分析——EPOLL之epollWait和唤醒

前言

上一篇咱们一起看了eventfdtimerfd,主要就是给这篇做铺垫的,这一章主要是讲解EpollEventLooprun方法中的select过程,这个select指的是我们在最早文章中说的Reactor线程干的三件事之一的select

我们上篇中的eventfd主要作为唤醒epollWait的手段,而timerfd因为其阻塞直到超时的特性,主要用来做超时控制。可能有人会问了,你别欺负我不懂EPOLLEPOLL自带超时控制,参数里可以指定timeout的,怎么还需要借助timerfd来做超时控制呢?

这个问题提前解答一下,timerfd支持纳秒级别,而epoll_wait的参数是毫秒级别。所以这里使用timerfd来控制超时,而epoll_wait参数不是0(没有事件也立即返回)就是-1(表示永远等待)。

源码

先看看EpollEventLoop的初始化方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
FileDescriptor epollFd = null;
FileDescriptor eventFd = null;
FileDescriptor timerFd = null;
try {
// 初始化epoll
this.epollFd = epollFd = Native.newEpollCreate();
// 初始化eventfd
this.eventFd = eventFd = Native.newEventFd();
try {
// 把eventfd交给epoll来监听IO
Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
} catch (IOException e) {
}
// 初始化timerfd
this.timerFd = timerFd = Native.newTimerFd();
try {
// 把timerfd也交给epoll来监听IO
Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
} catch (IOException e) {
}
success = true;
} finally {
}

这里我们可以看到,epoll会监听timerfdeventfd的IO事件,这里很重要,后面我们在底层的系统调用(C语言的代码)中还会看见这两个文件描述符。

继续看run方法,还是万年不变的直接就是个死循环,直到我们shutdown。一开始会计算一个策略,表示最开始执行什么,可以是SELECT或者BUSY_SELECT。规则就是,没有任务就阻塞select(注意我们这里说的虽然是select,但是最底层都是epoll_waitselect表示等待事件),有就非阻塞selectselect到事件就处理,select不到就继续往下走。

我们看一下两种select方式:

1
2
3
4
5
6
case SelectStrategy.BUSY_WAIT:
strategy = epollBusyWait();
break;

case SelectStrategy.SELECT:
strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);

第一种叫epollBusyWait,第二种叫epollWait,注意这里底层是两种不同的调用,我们先看epollWait,因为epollBusyWait只是通过底层指令优化了轮训。

epollWait方法追进去是Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos)

1
2
3
4
5
6
7
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs);
// 如果返回值小于0就直接抛异常
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
return ready;

直接调用native方法,最终会执行到C语言文件中的netty_epoll_native_epollWait0函数。全局搜索即可找到,看一下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;

if (tvSec == 0 && tvNsec == 0) {
// 这里是一个非阻塞的epoll分支,表示select一下然后立即返回
do {
result = epoll_wait(efd, ev, len, 0);
if (result >= 0) {
return result;
}
// 如果失败,会返回-1, errno将会被设置
// 这里EINTR表示我们的调用被信号打断
} while((err = errno) == EINTR);
} else {
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
// 设置我们的超时,这里第二个参数0表示从当前开始计算,ts.sec秒后超时
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(...);
return -1;
}
}
do {
// 一直等待,直到timerfd超时
result = epoll_wait(efd, ev, len, -1);
if (result > 0) {
if (result == 1 && ev[0].data.fd == timerFd) {
uint64_t timerFireCount;
// 我们是ET模式,所以要把里面的值读取走,这样新的数据进来的时候我们才可以得到通知。具体ET的工作方式请查看我之前的文章
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;
}
return result;
}
} while((err = errno) == EINTR);
}
return -err;

这里又上下两个分支,第一个分支比较简单,是立即返回(epoll_wait的最后一个参数是0)。我们主要看的是阻塞的方式,在这里我们使用了timerfd作为超时控制。

我在EpollEventLoop初始化的时候就说了,我们除了初始化epoll的文件描述符之外,还初始化了两个文件描述符,分别是eventfdtimerfd,而且这两个文件描述符都被交给了epoll来监听IO。我们先来看timerfd是如何控制超时的。

设置好超时时间,我们进行了一次result = epoll_wait(efd, ev, len, -1);。这里我们分两个大章节看一下这个部分的逻辑。

C语言源码部分

从这里开始就进入整个控制epoll的核心了。

select到事件分为几种情况:

  1. select到的事件中,全都是socket的IO。result > 0,且当result = 1的时候result == 1 && ev[0].data.fd == timerFd返回的是false。因为都是socket的IO,所以ev中不存在timerfd
  2. select到的事件中,只有timerfd的IO,我们刚刚说了,timerfd到了超时时间,回写一个超时次数到timerfd的文件中,所以这种场景其实就是超时了。result == 1 && ev[0].data.fd == timerFd这时候就返回true,因为只有timerfd超时写了一个数据进去,被epoll监听到了,那么返回的fd当然就是timerfd
  3. select到的事件中,有socket的IO,也有timerfd的IO事件。result == 1 && ev[0].data.fd == timerFd这时候就返回false。因为result至少是2。

第一种,全部都是socket事件,就算result是1(只有一个socket事件),ev[0].data.fd == timerFd返回的也是false,所以根据上面的代码,就直接return了。

第二种,只有timerfd的事件,也就是超时了,这时候进入下面的代码块:

1
2
3
uint64_t timerFireCount;
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;

由于我们已经超时了,所以read会立即返回。注意这里我们要把timerfd中的内容读走,因为我们是ET模式,这样我们才可以收到下次的通知,这里忘记ET工作方式的同学请看一下我以前的关于ET和LT工作方式的文章。

这里我们其实就知道timerfd是如何控制epoll的超时的了。我们用epoll监听timerfd,然后给timerfd设置超时时间,这个超时时间其实就是我们希望epoll阻塞select的阻塞时间。到了时间,就算epoll没有select到其他socket的IO事件,至少也会selecttimerfd的IO事件,也就是说:result = epoll_wait(efd, ev, len, -1);最多只会阻塞超时时间那么长,然后就会被唤醒(并且返回timerfd的描述符)!

这里Netty巧妙利用了timerfd的超时就写入的特点,用epoll监听timerfd来时间超时控制。为什么用timerfd上面说过了,因为timerfd可以控制到纳秒级别,而epoll_wait调用只能控制到毫秒级别。

第三种情况,既有socket事件,且还混有timerfd事件。这里跟第一种一样,会直接返回。

等等,我们刚刚不是说了,需要把timerfd中的事件读取走,如果不读取走,将来就收不到新的通知了,那这里直接返回,下次timerfd就算IO了,epoll监听不到怎么办?如果你能想到这个问题,说明是用心思考了,这个问题会在java部分的源码中找到答案。

java源码部分

我们看一下processReady方法,我们成功返回了result且不为0,就会进入到这里,我们上面说了,第三种情况可能是socket的IO事件和timerfd的事件混在一起,看看怎么处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
// 这里我们后面要说道eventfd的作用时再解释
Native.eventFdRead(fd);
} else if (fd == timerFd.intValue()) {
// 如果socket和timerfd的IO混在一起,我们通过Reactor线程调用一次read
// 这样我们以后还可以在ET下收到timerfd的同志
Native.timerFdRead(fd);
} else {
final long ev = events.events(i);
AbstractEpollChannel ch = channels.get(fd);
if (ch != null) {
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
// 写准备就绪
unsafe.epollOutReady();
}
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
// 读准备就绪
unsafe.epollInReady();
}
if ((ev & Native.EPOLLRDHUP) != 0) {
// 对端关闭
unsafe.epollRdHupReady();
}
} else {
try {
// 如果channel是null,我们就不再需要关心这个channel的事件了
// 这时候我们把这个channel对应的fd从epoll中移除
Native.epollCtlDel(epollFd.intValue(), fd);
} catch (IOException ignore) {
}
}
}
}

谜底解开,我们在上述第三种情况中,也需要把timerfd中的数据读取走,以此来控制我们下次还可以收到timerfd的IO。但是由于我们收到的事件中,混杂了timerfdsocket的IO事件,所以我们在Reactor线程中进行timerfd的读操作。说来说去就是一句话:不管怎么样也要把timerfd中超时的时候写入的数据读走!

我们在代码中还看到了这里:

1
2
3
if (fd == eventFd.intValue()) {
Native.eventFdRead(fd);
}

这里涉及到唤醒流程,我们下一节讲。

再往下就是处理socket的IO事件了,这里我们可以看到我们处理了写和读事件,除此之外还处理了EPOLLRDHUP这个事件。对端正常关闭的时候会触发这个事件(还会触发EPOLLIN)。这个EPOLLRDHUP我理解里算半个读事件,在Netty里我们后面也会说到,处理这个事件的时候,如果channelactive的,就会处理读。

事件处理我们以后再说,这里我们再看看eventfd的作用。

用eventfd来控制epoll的唤醒

eventfd同样在初始化的时候就被交给epoll去监听了。用法我们也说过了,就是一方写,另一方久可以读。这里很多同学可能在看了上面就猜到了,一说读写,我们立即就要反应过来,读写都会被epoll监听到,又因为我们的eventfd是被epoll监听的,那么我们如果向eventfd里写数据,不就可以中断epoll_wait了么,因为epoll_wait这时候至少返回eventfd

我们猜到这一步,就直接看看wakeup方法,因为我们之前在Reactor机制中说,如果有任务进来,我们需要唤醒阻塞select,目的就是防止我们新的任务一直被阻塞没有机会执行,wakeup

1
2
3
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
Native.eventFdWrite(eventFd.intValue(), 1L);
}

果然就是向eventfd中写入一个数据来唤醒epoll_wait。那么写入以后有个问题,我们要时刻记住,Netty的epoll默认使用的是ET,我们写入数据后,为了下次还能收到eventfd的IO通知,必须把旧数据读走,这时候我们结合上一小节的待解决的问题:

1
2
3
if (fd == eventFd.intValue()) {
Native.eventFdRead(fd);
}

不用多说了吧,跟timerfd相同的套路,把数据读取走为了能获取新的数据。

总结

Netty的epollnio都是依托于Reactor模型,当然kqueue也是一样,万年不变的Reactor线程三个任务,包括唤醒逻辑,只不过epoll是依托于eventfdtimerfd,而nio是通过selectorwakeup

总的来说两个底层fd的作用:

  • eventfd:为了能够直接唤醒阻塞select
  • timerfd:为了能够定期唤醒阻塞select

至此整个epollselect就看完了。

关于使用边缘触发

这里我在网上查阅资料的时候,看到几个非常不错的问答以及需要注意的点,这里列出来,为我们理解边缘触发和后面解析读取/写入操作都很有好处,先来一个问答:

Q:在使用边沿触发时,我需要对一个文件描述符持续地read/write直到出现EAGAIN吗?
A:从epoll_wait()收到一个事件则表明这个文件描述符已经准备好做对应的I/O操作了。直到下一次read/write出现EAGAIN之前,你必须认为它是已经准备好了的。至于什么时候和怎样使用这个文件描述符就完全看你自己了。对于基于数据包类型的文件描述符,比如UDPcanonical模式下的terminal,检测read/write的I/O空间是否用尽的唯一方法就是持续地read/write直到出现EAGAIN。对于基于数据流的文件描述符,比如pipeFIFOTCP,还可以用检测向目标文件描述符发送/接收数据的总量的方法可以检测read/write的I/O空间是否用尽。

这里的QA我们主要提取一个内容即可:对于数据包类型的文件描述符,我们需要不断地read/write直到出现EAGAIN,实际上Netty也是这样做的,数据包类型的文件描述符对应EpollDatagramChannel。对于数据类类型的文件描述符,可以用检测向目标文件描述符发送/接收数据的总量的方法可以检测read/write的I/O空间是否用尽,Netty中数据流类型的文件描述符对应AbstractEpollStreamChannel这个类的子类,Netty也是使用读取的Buffer是否读取满来判断是否读完的。

另外说一个边缘触发的饥饿问题:

使用边沿触发模式时的饥饿问题,如果某个socket源源不断地收到非常多的数据,那么在试图读取完所有数据的过程中,有可能会造成其他的socket得不到处理,从而造成饥饿(这个问题不只针对epoll)。解决的办法是为每个已经准备好的描述符维护一个队列,这样程序就可以知道哪些描述符已经准备好了但是还在轮询等待。当然,简单的方法是使用水平触发模式。