Dubbo源码解析 --- DIRECTORY和ROUTER

Dubbo源码解析 — DIRECTORY和ROUTER

@(dubbo源码分析)


今天看一下DirectoryRouter

我们直接从代码看起(一贯风格),先看后总结,对着总结再来看,相信会收获很多。我们先看com.alibaba.dubbo.config.ReferenceConfigcreateProxy

1
2
3
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}

这里我们之前说过了,根据扩展点,我们知道这里的protocol目前是RegistryProtocol,点进去:

1
return doRefer(cluster, registry, type, url);

这里的cluster同样是扩展点,点进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));

Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker;

这里出现了第一个东西:DIRECTORY,这里会用clusterjoin方法生成一个InvokerInvoker invoker = cluster.join(directory);,这里的clusterdebug一下,是FailoverCluster,生成的就是FailoverClusterInvoker。这里我们都没有发现这个Directory的作用,先别急,我们至少知道了AbstractClusterInvoker中有一个Directory的实例。

再看我们的AbstractClusterInvokerinvoke方法,这是Dubbo所有集群invoker的入口:

1
2
3
4
5
6
7
8
9
10
11
12
checkWhetherDestroyed();
LoadBalance loadbalance;
//注意这里
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);

这里有个list方法,返回的是invoker集合,doInvoke方法下沉到了子类,之前我们说过,这里会根据负载均衡策略选出一个invoker执行,那么我们看下list方法是如何选取invoker集合的:

1
2
3
//使用directory去选取
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;

点进去进入到AbstractDirectorylist方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
List<Invoker<T>> invokers = doList(invocation);
//本地的routers
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}

继续看一下doList,这里同样下沉到子类,子类有两个,分别是:RegistryDirectoryStaticDirectory,这两个的区别我们别的文章再说,这里简单提一下:RegistryDirectory中的invoker集合是动态的(实现了NotifyListener接口),而StaticDirectory中的invoker集合是固定的。

那么StaticDirectory返回的集合就是固定的,而RegistryDirectory是动态的,我们在注册中心对invoker做改动,都会引起RegistryDirectoryinvoker集合的变化。

这里我们大概就能理解这个Directory,实际上,这个东西就是Invoker的集合。

doList结束,下面就出现了我们说的Router,默认有三种实现:ConditionRouterMockInvokersSelector(虽然叫Selector)和ScriptRouter。这篇文章先不具体展开讨论每种Router的实现方式。

router返回的并不是一个Invoker,而是invoker集合,我们可以认为,router的作用就是把Directory中符合路由规则的invoker筛选出来,然后在这些invoker中,根据负载均衡策略和集群容错策略执行invoke方法。

总结一下:
Directorinvoker的集合,是AbstractClusterInvoker的属性,可以认为是本集群下的所有Invoker
Router:从Directory中挑选出符合要求的Invoker集合。
LoadBalance:从Router挑选的invoker中再选出其中一个作为最终选择去执行invoke方法。