Netty源码分析——connect vs active

Netty源码分析——connect vs active

前言

Netty中有几个概念,看了网上的很多文章,给出的解释实在是不太能让人信服,所以今天自己去扒了扒源码,加上debug,基本已经把connect事件和active事件给搞明白了。

不知道大家有没有类似的困惑,Netty中(这里指的是4.1.x版本),有channelActivechannelInactive,分别对应着管道生效和管道失效。然后还有connectdisconnect事件,分别对应着连接和失去连接事件。之前对着两个大类的事件不太明白,debug之。

源码分析

首先,我们先看看这两大类事件分别是哪种类型的事件:

connect类:来自ChannelOutboundHandler,说明是一种outBound事件。

active类:来自ChannelInboundHandler,说明是一种inBound事件。

我在查找二者区别的时候,在stack over flow上也找到了这样的说法:

  • 出站事件(这里指outBound,即connect类事件)是通过Channel.*ChannelHandlerContext.*方法显式触发的事件,例如:“我想关闭频道”。然后,用户可以拦截这些并执行某些操作,例如延迟关闭等。

  • 入站事件(这里指inBound,即active类事件)由transport本身(以及EventLoop)触发,并且通常告诉你发生了某些事情(这可能是因为你请求它或远程对等体这样做)。

如果我只是找了个stack over flow上的说法来糊弄一下,肯定是不会写这篇文章了。我们还是debug一下看看具体的细节。

这里我定义了两个非常简单的InboundHandlerOutboundHandler。分别重写channelActivedisconnect,其中channelActive大概是这样的:

1
2
3
4
5
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
ctx.channel().disconnect();
}

触发active的时候,发一个Netty rocks,然后就执行一个disconnect

disconnect重写之后大概是这样的:

1
2
3
4
5
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
System.out.println("......disconnect.......");
super.disconnect(ctx, promise);
}

无他,就输出了一个disconnect。那么按照我的推断,是不是在active的时候,应该会写一个信息出去,然后执行disconnect,然后触发outBounddisconnect事件,会输出一个......disconnect.......呢?

各位最好尝试一下,尝试过后就会神奇的发现,并没有打印这句话。说实话当时我还懵逼了一下,以为自己的handler没有被加到pipeline里呢。重新检查无误以后,只能进行一下debug了。

跟一下diconnect的实现,会走到:

1
2
3
4
@Override
public final ChannelFuture disconnect() {
return tail.disconnect();
}

点进去会走到这里AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)

1
2
3
4
5
6
7
8
9
10
11
12
13
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 注意,这里在使用tcp的时候,用于返回false,即永远只会执行next.invokeClose(promise)
if (!channel().metadata().hasDisconnect()) {
next.invokeClose(promise);
} else {
next.invokeDisconnect(promise);
}
} else {
// sth... 这里省去,其实就是把if里的东西放到executor里执行,这里的executor其实是eventLoop,及reactor
}
return promise;

我在代码中写了注释,重点就在channel().metadata().hasDisconnect()这句,debug发现,channel().metadata()会走到AbstractNioByteChannel#metadata里,这里会返回一个static + final修饰的ChannelMetadata

1
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

第一个入参的含义是hasDisconnectchannel().metadata().hasDisconnect()其实返回的就是这个hasDisconnect的值,我们看看其含义:

1
2
3
{@code true} if and only if the channel has the {@code disconnect()} operation
that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
again, such as UDP/IP.

我们可以看到,这里只有UDP/IP传输的时候,才可以为true,而且ChannelMetadata中的成员变量hasDisconnectfinal的,在初始化就设置好了且不能更改。注意这个ChannelMetadata是跟管道绑定的,也就是说,所有AbstractNioByteChannel的子类,都是false

那么看看UDP通讯使用的管道NioDatagramChannel

1
private static final ChannelMetadata METADATA = new ChannelMetadata(true);

果然,是true。

说过了这个hasDisconnect,我们说回到AbstractChannelHandlerContext#disconnect,按照我们说的,disconnect操作在TCP通讯的时候就只会执行next.invokeClose(promise);,执行的close,并非disconnect

invokeClose会通过最终传递到HeadContext里,再通过unsafe委托给AbstractChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
if (!promise.setUncancellable()) {
return;
}

// 是否已经有线程在执行close
if (closeInitiated) {
if (closeFuture.isDone()) {
// 如果已经关闭了
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) {
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}

closeInitiated = true;

final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 不再允许写入任何信息
this.outboundBuffer = null;
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
// sth...
} else {
try {
// 真正的进行关闭
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
// 这里我们已经没有信息是在flushing的,所以会返回false
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
// 触发channelInactive,然后进行deregister,把SelectionKey cancel掉
fireChannelInactiveAndDeregister(wasActive);
}
}

这里大部分主要的逻辑都已经在代码里写明了:

  1. 是否有其他线程在进行close。
  2. 执行真正的close逻辑,这里的close会关闭掉原生的java channel
  3. 如果没有数据正在进行flush,触发channelInactive事件。

按照代码,我们虽然执行的是disconnect操作,但是最终会走到close的逻辑里,最终会触发channelInactivechannelUnregistered事件。

这样我们可以得出一个结论,对于TCP来说,channel.disconnect == channel.close,二者都会触发channelInactive事件,但是都不会触发disconnect事件。

实际意义是,有一些要做session管理的应用(比如Dubbo里就做了channel的session管理),我们要在channelInactive的时候移除session,而不要在disconnect里做(永远不会触发)。