Dubbo源码解析——connections vs share connection

Dubbo源码解析——connections vs share connection

前言

先说明一下connectionsshare connections的区别,这里先看下connections的使用方式和官方文档:

1
<dubbo:reference interface="com.foo.BarService" connections="10" />

这种方式可以限制客户端服务使用连接不能超过10个。而默认不配置的情况,就是共享一个连接,但是共享连接存在一个问题,比如两个Invoker贡献了一个连接,只有Invoker1关闭的时候是不能直接关闭连接的,只有在两个Invoker都关闭的时候才可以关闭连接。

connections

这里connections意味限制连接个数,而share connection意味共享连接(也可以认为单个连接),后面不再说明。

先看下connections的实现。入口在DubboProtocol#refer

1
2
3
4
5
optimizeSerialization(url);
// getClients是根据url生成真正client的地方
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;

继续追一下getClients

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果connections不配置为0,开启共享连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}

ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 创建一个共享连接
clients[i] = getSharedClient(url);
} else {
// 初始化每一个连接
clients[i] = initClient(url);
}
}
return clients;

先看initClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// sth...

ExchangeClient client;
try {
// 懒加载,当真正发生请求的时候才进行连接
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("...");
}
return client;

这里主要是根据是否懒加载创建连接,具体的创建连接的操作这里不细说了。我们可以看到,如果配置了connections(大于0),则会直接创建connectionsclient,以此来控制客户端的连接数量。

share connection

继续看戏分享链接的地方getSharedClient,这里说明一下,如果是分享链接,那么clients.length = 1,只会进行一次循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
String key = url.getAddress();
// 根据server地址获取一个client
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
// 如果client没有关闭,就增加计数。计数的作用下面会说
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}

// 加锁控制
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
// initClient的作用以及说过了,初始化一个连接
ExchangeClient exchangeClient = initClient(url);
// 用ReferenceCountExchangeClient在client外面再包一层
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
locks.remove(key);
return client;
}

这里并非是一个Service使用一个共享连接(因为keyProvideraddress),而是一个Consumer和一个Provider之间,默认情况下,所有的服务都共享一个连接。举个例子,比如Provider暴露了DemoService1DemoService2DemoService3Consumer端引用了这三个服务,这时候三个服务的会共用同一条TCP通道。

这里我最早的理解也是错误的,我最早的理解是,如果Consumer引用了DemoService1两次,则两个DemoService1共用一个连接,DemoService2DemoService3并不能使用DemoService1的连接。

继续往下看这个ReferenceCountExchangeClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final class ReferenceCountExchangeClient implements ExchangeClient {

private final URL url;
// 计数
private final AtomicInteger refenceCount = new AtomicInteger(0);
// 维护以及close掉的client
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap;
private ExchangeClient client;

public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
// 创建的时候直接计数+1
refenceCount.incrementAndGet();
this.url = client.getUrl();
if (ghostClientMap == null) {
throw new IllegalStateException(...);
}
this.ghostClientMap = ghostClientMap;
}

// sth...
}

可以看到,这个ReferenceCountExchangeClient只是包装了一下真正的client,里面维护了一个refenceCount是计数,我们之前说过,如果共享连接,那么只有当这个连接的所有使用者都关闭后,才可以关闭这个连接,我们看下其close方法:

1
2
3
4
5
6
7
8
9
10
if (refenceCount.decrementAndGet() <= 0) {
// 只有减到0之后,才进行真的close
if (timeout == 0) {
client.close();
} else {
client.close(timeout);
}
// close之后使用懒加载的方式替代
client = replaceWithLazyClient();
}

这里就实现了所有使用方都关闭的情况下才关闭连接的功能。继续看下replaceWithLazyClient,这个方法的功能是将关闭之后的连接变成一个懒加载的client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE)
// 关闭自动reconnect
.addParameter(Constants.RECONNECT_KEY, Boolean.FALSE)
.addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString())
.addParameter("warning", Boolean.TRUE.toString())
.addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true)
.addParameter("_client_memo", "referencecounthandler.replacewithlazyclient");

String key = url.getAddress();
LazyConnectExchangeClient gclient = ghostClientMap.get(key);
if (gclient == null || gclient.isClosed()) {
// 把这个连接改为懒加载的形式维护起来
gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
ghostClientMap.put(key, gclient);
}
return gclient;

这里就是把ReferenceCountExchangeClient维护的client包装成LazyConnectExchangeClient,这样我们的这个ReferenceCountExchangeClient其实仍然可以使用,只不过是在发送请求的时候才进行连接。

总结

这么看过来,我们基本可以看到connectionsshare connection的区别:

connections = 5则5个都是普通连接,关闭的时候全部关闭。而share connection的情况下,创建一个ReferenceCountExchangeClient,用以维护使用计数,只有在使用方都关闭的情况下才关闭,且关闭后可以继续使用(懒加载,使用这个client发送数据的时候才进行connect操作)。