Dubbo源码解析——优雅停机

Dubbo源码解析——优雅停机

前言

今天来看看Dubbo的优雅停机,Dubbo的主要环境还是在Spring容器中,我们主要看看在Spring环境下的优雅停机。代码主要分析Provider端和Consumer端。

Provider

先看下Dubbo的DubboApplicationContextInitializer,这个类实现了springApplicationContextInitializer接口:

1
2
3
4
5
6
public class DubboApplicationContextInitializer implements ApplicationContextInitializer {
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
applicationContext.addApplicationListener(new DubboApplicationListener());
}
}

很简单的注册了一个DubboApplicationListener,继续看看这个Listener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class DubboApplicationListener implements ApplicationListener<ApplicationEvent> {

private DubboBootstrap dubboBootstrap;

public DubboApplicationListener() {
dubboBootstrap = new DubboBootstrap(false);
}

public DubboApplicationListener(DubboBootstrap dubboBootstrap) {
this.dubboBootstrap = dubboBootstrap;
}

@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
if (applicationEvent instanceof ContextRefreshedEvent) {
// 初始化或刷新ApplicationContext时引发的事件。
dubboBootstrap.start();
} else if (applicationEvent instanceof ContextClosedEvent) {
// 关闭时触发
dubboBootstrap.stop();
}
}
}

也非常简单,无非就是在初始化的时候启动一下DubboBootstrap,结束的时候关闭一下。我们主要研究的是停机,这里看下stop

1
2
3
4
5
6
7
8
9
10
11
public void stop() {
for (ServiceConfig serviceConfig: serviceConfigList) {
// 这里是个坑
serviceConfig.unexport();
}
// 这里才是主要逻辑
shutdownHook.destroyAll();
if (registerShutdownHookOnStart) {
removeShutdownHook();
}
}

这里可能大家会先去看serviceConfig.unexport();这里,但是我可以告诉你这个serviceConfig目前任何情况下都是空的,因为DubboBootstrap#registerServiceConfig全局没有被调用。这里不要看,直接看shutdownHook.destroyAll

1
2
3
4
5
6
7
8
9
public void destroyAll() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
// 关闭所有的注册中心
AbstractRegistryFactory.destroyAll();
// 关闭所有的server相关组件
destroyProtocols();
}

先看关闭注册中心的过程:

1
2
3
4
5
6
7
8
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
REGISTRIES.clear();

继续看AbstractRegistry#destroy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 这里把所有已经注册的url先从zk上删除掉
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url);
} catch (Throwable t) {
}
}
}
}

// 把对应的监听器也都移除掉,并且不再订阅/dubbo/XXX/configurators相关的内容
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
} catch (Throwable t) {
}
}
}
}

注册中心销毁的时候主要做两件事:

  1. 把注册了的URL从ZK上删除。
  2. 移除相应的监听器,并且不再监听那些相关configurators的变化。

那么看到这里,我们基本上就知道后面的操作了,后面就该销毁那些Invoker了(并且关闭通道)。需要注意的是这个顺序很重要,如果先销毁通道会怎样——这个就是我们说的优雅停机中优雅的含义——先销毁通道是不优雅的,举个例子:如果先关闭TCP通道,但是这时候ZK上的注册信息还在,仍然可能有一部分流量打到当前这台机器上,导致请求断路了。

我们继续看看销毁Invoker的操作。回到DubboShutdownHook#destroyProtocols里:

1
2
3
4
5
6
7
8
9
10
11
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
for (String protocolName : loader.getLoadedExtensions()) {
try {
Protocol protocol = loader.getLoadedExtension(protocolName);
if (protocol != null) {
protocol.destroy();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}

先看看看AbstractProtocoldestroy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
invokers.remove(invoker);
try {
invoker.destroy();
} catch (Throwable t) {
}
}
}
for (String key : new ArrayList<String>(exporterMap.keySet())) {
Exporter<?> exporter = exporterMap.remove(key);
if (exporter != null) {
try {
exporter.unexport();
} catch (Throwable t) {
}
}
}

注意这里的exporter其实也不是DestroyableExporter,可以认为这个DestroyableExporter同样没有被使用。

抽象类里做的就是把invokerdestroy掉,然后把暴露出去的服务unexport掉。以Dubbo protocol为例,看下destroy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for (String key : new ArrayList<String>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
try {
// 获取关闭的等待时间
server.close(ConfigUtils.getServerShutdownTimeout());
} catch (Throwable t) {
}
}
}

for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
try {
client.close(ConfigUtils.getServerShutdownTimeout());
} catch (Throwable t) {
}
}
}

for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
ExchangeClient client = ghostClientMap.remove(key);
if (client != null) {
try {
client.close(ConfigUtils.getServerShutdownTimeout());
} catch (Throwable t) {
}
}
}
stubServiceMethodsMap.clear();
super.destroy();

我们发现无论server还是client,在close的时候会传入一个参数:ConfigUtils.getServerShutdownTimeout(),先看server关闭的时候HeaderExchangeServer#close(int)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 打个标识,表明server正在关闭
startClose();
if (timeout > 0) {
final long max = (long) timeout;
final long start = System.currentTimeMillis();
if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
sendChannelReadOnlyEvent();
}
// 睡眠timeout
while (HeaderExchangeServer.this.isRunning()
&& System.currentTimeMillis() - start < max) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
}
// 关闭(这里主要是停止心跳包的发送)
doClose();
// 关闭通道和Dubbo的分发线程池
server.close(timeout);

我们看看AbstractServer#close(int)

1
2
3
4
// 关闭分发线程池
ExecutorUtil.gracefulShutdown(executor, timeout);
// 关闭通道(比如TCP通道)
close();

这里这个executor是WrappedChannelHandler#executor,是做分发使用的,但是Dubbo用了一个不太优雅的方式(DataStore)把二者个关联起来了,这里不做讨论,我们就直接认为这个executor就是分发线程池即可。继续看ExecutorUtil.gracefulShutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 如果已经终止,就不做任何处理
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
// 关闭以禁止提交新的任务
es.shutdown();
} catch (SecurityException ex2) {
return;
} catch (NullPointerException ex2) {
return;
}
try {
// 等待timeout,让还没处理完的任务尽可能处理完
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
es.shutdownNow();
Thread.currentThread().interrupt();
}
if (!isTerminated(es)) {
// 如果还是没有关闭,那么久开启一个新的线程去关闭
newThreadToCloseExecutor(es);
}

我们看完了这些代码再来理解一下优雅停机的优雅。为什么需要睡一会,让线程池等一会再结束,其实很简单,因为线程池里可能还有一些任务没有执行完成。对于服务端来说,是没有超时的概念的,在作为服务端不知道消费者端是否已经超时的情况下,我们关闭的时候还是应该尽可能的把没处理完的任务给处理掉再关闭。

Consumer

说完了服务端,我们再来看看Consumer。入口是一样的,只不过在DubboProtocol#destroy的时候,我们关注的是client.close(ConfigUtils.getServerShutdownTimeout());罢了:

1
2
3
4
5
6
7
8
if (refenceCount.decrementAndGet() <= 0) {
if (timeout == 0) {
client.close();
} else {
client.close(timeout);
}
client = replaceWithLazyClient();
}

注意这里有一个逻辑是我之前的文章中说过的,就是ReferenceCountExchangeClient关闭的条件是所有引用该clientinvoker都关闭才能关闭。继续看client.close(timeout);

1
2
3
4
5
6
// 打表示
startClose();
// 关闭心跳
doClose();
// 关闭管道
channel.close(timeout);

这里的channelHeaderExchangeChannel,入参同样是一个timeout,表示优雅停机的等待时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (closed) {
return;
}
closed = true;
if (timeout > 0) {
long start = System.currentTimeMillis();
// 这里DefaultFuture.hasFuture(channel)是说如果管道里还有没有处理完的future,就睡眠。
while (DefaultFuture.hasFuture(channel)
&& System.currentTimeMillis() - start < timeout) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
// 关闭通道(比如TCP通道)
close();

这里我们需要注意的是,这里又两个timeout,我们要把他们区分开。

  1. 优雅关机的timeout,指优雅关机最多等待多久,是入参。
  2. 请求的timeout,指一次rpc请求最多等待多久,这里没有直接体现出来,会影响DefaultFuture.hasFuture(channel)这个的效果。

我们可以这样概括管道不会立即关闭而是“优雅”的关闭:尽可能的等待没结束的请求返回,除非等待的时间超过了优雅关机的timeout,则立即关闭。

Channel inactive

说完了正常的优雅关闭,这里提一句不正常情况下的优雅关闭。这个功能是由笔者为Dubbo提供的,也算是另外一种优雅,所以这里一起说一下。

试想一种场景:消费者端发出去的请求还没有被返回的时候,服务端已经关闭(TCP通道断)了,这个时候作为消费者,之前的做法是等待这个没有返回的请求直到超时。这种情况下很明显是不优雅的,更加优雅的做法应该是感知到管道关闭,直接就把这些永远不会返回的请求给立即返回,而不是等待到超时。

目前这个功能已经在新版本中发布了。我已经写了很多篇关于Dubbo解析的文章,这里给大家留个作业,有兴趣的同学可以去研究一下这个是如何实现的,其实非常非常简单,对Netty的各种事件有一些理解的同学,应该可以轻松的找到答案。