Dubbo源码解析——注册中心的容错策略

Dubbo源码解析——注册中心的容错策略

简介

其实之前一直是我遗漏了这个部分的逻辑,没有细看。后来偶然间在某个PR中看到了注册中心中还进行了文件存储相关的操作,就好奇的看了一下这个部分的逻辑。

这里主要的作用是增加注册中心的健壮性,我们需要考虑在某些场景下,注册中心和应用之间的网络抖动对全局的影响:

  • 对于Provider来说,注册失败就直接启动失败,默认情况下,ZookeeperProvider之间的网络抖动我们可以不做处理,默认Dubbo创建的是一个临时节点,会自动通过心跳来探活(很多人都提过ISSUE问过这个问题,知识点)。Provider目前不订阅注册中心上的信息,所以不需要考虑订阅时失败。
  • 对于Consumer来说,默认情况下,注册自己的时候失败就直接返回(比如Consumer和注册中心网络不通)。考虑一下一个Consumer订阅信息失败的时候我们如何处理,直接失败?假如网络抖动导致我们某一次启动Consumer的时候订阅失败,除了定时重试,我们是否还可以有更优秀的做法来保证程序的健壮。

这就是关于注册中心的容错策略。具体的做法就是,每次注册中心上,Provider变更的消息通知到本地的时候(目前Dubbo默认情况下还是全量通知),我们把最新的消息保留下来,当我们在重启Consumer的时候,或者Consumer和注册中心之间的网络抖动的时候,我们至少可以把最近的Provider信息告知Consumer,让Consumer先尝试连接这些Provider,而不是直接让Consumer重启失败。

需要注意的是,容错的机制主要发生在订阅阶段,所以我们下面主要关注订阅失败相关的处理。

通知流程

先看我们的启动流程,注册中心启动的时候,会load我们已有的cache文件(cache文件就是我们说的最近一次的Provider的相关数据,比如URL),具体的代码在AbstractRegistry的构造函数里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
setUrl(url);
// 默认是异步的存储cache文件
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 存储的地址,/home/xxx/.dubbo/...
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");

// 我们的cache file
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// 加载已有的配置
loadProperties();
notify(url.getBackupUrls());

根据我们之前分析的流程,我们需要关注每次Provider变更通知到本地的时候,我们是如何处理的,AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List<org.apache.dubbo.common.URL>)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 这里的key是category,值是对应的URL
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> {
return new ArrayList<>();
});
categoryList.add(u);
}
}

Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, k -> {
return new ConcurrentHashMap<>();
});
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 把这些数据放到notified这个全局的变量里
categoryNotified.put(category, categoryList);
// 保存cache文件
saveProperties(url);
listener.notify(categoryList);
}

这里我们可以看到,最新的ProviderURL到达Consumer的时候,我们会把它存到notified这个全局变量里,然后通知listener

看下这个saveProperties方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
StringBuilder buf = new StringBuilder();
// 把notified里面的数据拿出来
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
// 键是某个接口,比如com.abc.DemoService,这里还会有group和version信息
// 值就是对应的所有url,用空格拼接起来
properties.setProperty(url.getServiceKey(), buf.toString());
// 锁,只保存最新的数据
long version = lastCacheChanged.incrementAndGet();
if (syncSaveFile) {
doSaveProperties(version);
} else {
// 默认异步保存
registryCacheExecutor.execute(new SaveProperties(version));
}

注意这里我们只保存最新的一份数据,所以用一个version来控制。看看这个doSaveProperties方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
if (version < lastCacheChanged.get()) {
return;
}
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
try {
FileChannel channel = raf.getChannel();
try {
// 这里用锁来控制并发写
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("...");
}
try {
if (!file.exists()) {
file.createNewFile();
}
FileOutputStream outputFile = new FileOutputStream(file);
try {
properties.store(outputFile, "Dubbo Registry Cache");
} finally {
outputFile.close();
}
} finally {
lock.release();
}
} finally {
channel.close();
}
} finally {
raf.close();
}

总结一下整个流程就是:

  1. 最新的URL(全量的,之前已经说过)到达。
  2. 数据保存到全局变量中(notified)。
  3. 异步的保存到文件中,即使注册中心重启也不会丢失。

订阅过程的容错

看下这个方法FailbackRegistry#subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 这里是订阅过程抛出异常时的处理
Throwable t = e;
List<URL> urls = getCacheUrls(url);
// getCacheUrls就是从notified这个全局变量里把相关的数据拉出来
// 由于我们每次启动注册中心的时候都会加载cache文件,并且每次新的数据到达Consumer的时候
// 也会更新notified,所以这里其实是取出最新一次的Provider的URL副本。
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error(...);
} else {
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException(...);
} else {
logger.error(...);
}
}
// 添加一个任务进行定时重试
addFailedSubscribed(url, listener);

总结一下订阅的流程也很简单:

  1. 如果出错了,获取最新的Provider端URL副本。
  2. 通知给监听器。(只通知需要订阅这个监听器,并非所有的)
  3. 添加一个任务定时重新订阅,重试成功时,这时可能Consumer还维护的是ProviderURL的副本,可能是过期的,当重试任务成功后,会把最新的数据更新过来。

至此我们就完成了注册中心的容错流程分析。这里并不复杂,但是很容易遗漏,网上关于这个地方的分析不是很多,这里拿出来跟大家分享一下。