Dubbo源码——集群策略

Dubbo源码——集群策略

Dubbo的集群策略一共分8种,分别是:

  • FAIL_OVER(Default)
  • FAIL_BACK
  • FAIL_SAFE
  • FAIL_FAST
  • BROADCAST
  • AVAILABLE
  • FORKING
  • MERGEABLE

今天来看一下这几种策略的实现。

FAIL_OVER

即失败重试(换一个提供者),我们看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// retry loop.
RpcException le = null; // last exception.
// invoked invokers.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
//检查当前cluster invoker是否已经被销毁
checkWhetherDestroyed();
//list是我们之前说过的Directory的list方法
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//负载均衡策略选取一个invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//用选取的这个执行
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("...");
}
return result;
} catch (RpcException e) {
//如果是业务异常,则不重试
if (e.isBiz()) { // biz exception.
throw e;
}
//否则进入到下一个循环中
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}

这里我们考虑一种异步的情况,异步的情况下,更建议使用Listenable Future的方式实现,这里给出一个demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Future invoke(args,tryCount,failoverFuture) {
if (tryCount > 0) {
future = invoke0(args);
future.setListener() {
success(result) {
failoverFuture.setSuccess(result);
return failoverFuture;
}

fail() {
return invoke(args,tryCount - 1,failoverFuture)
}
}
}
}

这样的实现方式在异步的情况下就非常优雅了。

FAIL_BACK

失败重试,看代码:

1
2
3
4
5
6
7
8
9
10
11
try {
//check
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("...");
addFailed(invocation, this);

return new RpcResult();
}

这里注意一下,如果出错就返回一个空的Result,而不是等待重试的结果。看一下这个addFailed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// collect retry statistics
try {
retryFailed();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("...", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}
//ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed
failed.put(invocation, router);

如果这个invoker没有重试,则初始化一个重试,注意这里的重试并不会去换一个提供者,而是继续在这个提供者上重试。

看一下retryFailed方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (failed.size() == 0) {
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
//重试
invoker.invoke(invocation);
//直接remove,下次不再重试,说白了就是只会重试一次
failed.remove(invocation);
} catch (Throwable e) {
logger.error("...", e);
}
}

可以看到就是循环这个map,执行所有失败了的invoke过程。

这里我们区分一下failover和failback:

  1. failover可以重试不止一次
  2. failover在失败时,进行重试,返回的是重试的结果。而failback返回的是一个空result
  3. failback不会更换一个提供者(invoker)进行重试

FAIL_SAFE、FAIL_FAST和AVAILABLE比较简单就跳过了

BROADCAST

广播,一次执行对所有的提供者都进行一次调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
//如果有一个调用失败则抛出异常
if (exception != null) {
throw exception;
}
return result;

其实也没有什么好看的,只需要注意一点即可:广播模式中,如果有一次调用失败,则返回失败。

FORKING

这个就比较有意思了,注释给出的解释是:用在一些实时性要求比较高的读操作。

我们看一下这个实现,分两块来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
//并发数量
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
//timeout 最快一个的timeout
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
//启动线程池去执行
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
//结果直接扔进blocking queue
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
//异常扔到队列里
ref.offer(e);
}
}
}
});
}

从上面的代码我们可以看出,首先,并发的去执行invoke,如果成功就把结果扔进队列,如果失败就把异常扔进队列。

继续往下看,下面的是有意思的地方:

1
2
3
4
5
6
7
8
9
10
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(...);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException(...);
}

这里直接从队列里poll一个元素出来,超时时间就是我们之前的timeout。如果是异常就抛出异常,如果是正常结果就返回,其他元素不作考虑。

之前我对这个地方有点疑惑,不太明白这样做的目的。在我的印象里应该是这样的:不断的poll队列中的元素(不超时的情况下),如果是异常就丢弃,如果是正常结果就返回。

如果像dubbo一样,可能会忽略很多成功的结果。今天重读的时候,看了一下注释才豁然开朗。dubbo的这种集群策略上写着:用于对实时性要求比较高的场景。这里我就理解了:优先速度,其次才是结果。对调用方来说,这种场景意味着对相应速度的依赖更高,那么不管最先返回的是成功的结果还是失败的结果,都直接返回给调用方。

MERGEABLE

把多个调用的结果组装成一个返回。这个依赖另外的一个Merger类,这里就不细说了。有兴趣的大家自己看一下源码吧。或者需求多的话,我再展开写一下。

联系方式

我的github