Dubbo源码解析——异步支持之异步带来的问题

Dubbo源码解析——异步支持之异步带来的问题

面临的问题

承接我之前的两篇文章,异步虽好,但是也为框架带来了新的复杂性。第一个问题是Filter问题,第二个问题是上下文问题。第二个问题其实比较容易理解,我们在Invoke的时候,有一个RpcContext,我们以P端异步为例,这时候Invoke里开了一个新线程去执行真正的业务逻辑,这时候我们就无法从新线程里获取RpcContext了(ThreadLocal不一样了)。

第一个问题比第二个复杂一点,解决方式也复杂一点点。我举个例子,P端有一些Filter,比如ExceptionFilterFilterinvoke逻辑是这样的:

1
2
3
4
5
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// before invoke...
Result r = invoker.invoke(invocation);
// after invoke... Example: deal with 'r'
}

可以在执行前和后分别做一些事情,跟Spring的AOP有点像。ExceptionFilter的作用记录invoke结束之后的Result中的异常。问题来了:如果是P端异步,invoker.invoke(invocation)这步会立刻返回一个AsyncRpcResult(为什么会返回这个类型的Result在P端异步支持中说了,这里不细说),但是这个AsyncRpcResult中是没有结果的,因为可能业务还没执行结束,那么这个时候很明显也没有异常需要记录,如果这时候过滤器就执行结束,那么很明显,有些功能就缺失了。

Filter失效问题代码分析

我们先看第一种,过滤器失效问题。

这里Dubbo引入了一种新的接口:PostProcessFilter,里面也只有一个方法:Result postProcessResult(Result result, Invoker<?> invoker, Invocation invocation);

实现这个接口的类有一个AbstractPostProcessFilter,主要的逻辑就是在这里实现的,我们看下这个类的postProcessResult方法:

1
2
3
4
5
6
7
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> doPostProcess(r, invoker, invocation));
return asyncResult;
} else {
return doPostProcess(result, invoker, invocation);
}

如果结果是同步的,就直接执行doPostProcess,如果是异步的,就给asyncResult增加一个回调(asyncResult.thenApplyWithContext),回调中执行doPostProcess。继续看下这个asyncResult.thenApplyWithContext

1
2
3
public void thenApplyWithContext(Function<Result, Result> fn) {
this.resultFuture = resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
}

这里这里把resultFuture也就是ResultFuture替换成了新的Future,并且增加了回调,当resultFuture结束的时候,会执行我们传入的Function(传入的Function主要是执行doPostProcess方法)和两个上下文切换的Function

上下文切换的Function我们一会再说。这里其实主要目的就是一个:在AsyncRpcResult结束的时候,执行doPostProcess方法。

我们再看看ExceptionFilterinvoke

1
2
3
4
5
6
7
8
// 这里跟源码有出入,为了看着更清晰。源码是把两句改成了一句,及没有局部变量r
try {
Result r = invoker.invoke(invocation);
return postProcessResult(r, invoker, invocation);
} catch (RuntimeException e) {
// logger...
throw e;
}

这里就是把invoke的结果传递给postProcessResult,而根据我们刚说的,postProcessResult就是给AsyncRpcResult增加一个回调,让这个异步Result结束以后调用doPostProcess。那么我们可以看一下ExceptionFilterdoPostProcess方法,其实里面就是执行了记录日志的逻辑。

至此我们就明白,Filter失效问题的解决方案,给AsyncRpcResult增加回调(最终这个回调会被加到我们上一篇说的ResultFuture方法上),在回调中执行doPostProcess方法,doPostProcess方法就是我们希望在执行结束后做的事情,这里的执行结束是指的真正的业务逻辑结束。

既然原理都清楚了,那么大家就清楚了:如果要扩展一个Dubbo的过滤器,还要能够处理异步的结果该怎么办了吧——继承一下AbstractPostProcessFilter即可(参考ExceptionFilter

RpcContext失效问题代码分析

再看RpcContext。Dubbo调用其实是Dubbo线程,用户在P端开启异步实际上是用户线程,我们用D线程代表Dubbo线程U线程代表用户线程

这里其实解决思路比较简单,D线程里的RpcContext,我们取出来放到U线程里就可以解决这个问题了。

看看Dubbo的解决方案,先看AsyncContextImpl的构造函数,这个AsyncContextImpl我们在P端异步的时候说过,如果xml里配置<dubbo:service async=true>,C端调用这个服务的时候,就会提前在RpcContext里初始化一个AsyncContextImpl

1
2
3
4
5
public AsyncContextImpl(CompletableFuture<Object> future) {
this.future = future;
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}

除了我们上篇说的设置Future,还初始化了两个RpcContext。注意,AsyncContextImpl是在D线程里被初始化的,这时候RpcContext还是生效的!

然后我们在使用的时候,可以在新线程里执行AsyncContext#signalContextSwitch方法,切换上下文:

1
2
3
4
5
6
7
8
public void signalContextSwitch() {
RpcContext.restoreContext(storedContext);
RpcContext.restoreServerContext(storedServerContext);
}

public static void restoreContext(RpcContext oldContext) {
LOCAL.set(oldContext);
}

这里storedContext就是D线程中,那个有各种值的Context,在U线程中执行,就会把当前线程的Context设置成D线程中的那个。这样,我们在新线程里就可以愉快的使用RpcContext了。

同样可以在AsyncRpcResult中发现类似的操作,这里就不展开说了,大家可以看看,其实是一样的套路,只不过改成用回调的方式设置,具体可以看看AsyncRpcResult#thenApplyWithContext方法。

后记

这个后记我想了半天还是决定写一写,这个部分比较复杂,我也不知道自己能不能说明白。

我们先看回AsyncRpcResult的构造方法,里面有这么一句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (registerCallback) {
future.whenComplete((v, t) -> {
RpcResult rpcResult;
if (t != null) {
if (t instanceof CompletionException) {
rpcResult = new RpcResult(t.getCause());
} else {
rpcResult = new RpcResult(t);
}
} else {
rpcResult = new RpcResult(v);
}
// instead of resultFuture we must use rFuture here, resultFuture may being changed before complete when building filter chain, but rFuture was guaranteed never changed by closure.
rFuture.complete(rpcResult);
});
}

可以注意到我留了一句注释在那里,这句的注释意思是,我们必须使用rFuture.complete,而不能使用resultFuture,因为在构建filter链的时候,resultFuture会变化(变成其他实例),但是由于rFuture由于有闭包的存在,所以不会变化。

在哪里变化的呢,在这里AsyncRpcResult#thenApplyWithContext。这个方法我们之前说过了,是用来注册filter的回调用的。这里有几个问题:

  1. 为什么resultFuture会变化?
  2. 就算resultFuture变化,又会有什么问题?

这里我们可以做个试验,我这里就展示代码了,completableFuture.thenApply会返回一个新的completableFuture,这样我们在this.resultFuture = resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));就会导致resultFuture不断变为新的实例。

第一个问题解决了,再看第二个。如果resultFuture变化,使用resultFuture.complete会有什么问题。答案是会导致回调失败。假设resultFuture注册了个Filter的回调:F1

还没开始注册的时候,resultFuture = rFuture,注册了第一个F1resultFuture就不再等于rFuture了,而是(F1-Future(rFuture))F1-Future表示注册F1返回的新的future),这里举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
// 这里是初始化的cf1,我们可以称其为 old-cf1
CompletableFuture<String> cf1 = new CompletableFuture<>();
// 这里可以看成是this.resultFuture = resultFuture.thenApply...,这里cf1被覆盖,变成new-cf1
cf1 = cf1.thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
System.out.println("then apply");
return s;
}
});
// 这时候的cf1和上面new出来的cf1已经不是同一个了
// 这里执行cf1执行complete,也不会打印then apply,因为thenapply注册在old-cf1上了
cf1.complete("abc");
}

也就是说,我们的回调不在起作用。那么怎么才能让它起作用呢?很简单,用老的cf1去触发回调:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
CompletableFuture<String> cf1 = new CompletableFuture<>();
// 这里不要覆盖cf1即可
cf1.thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
System.out.println("then apply");
return s;
}
});
cf1.complete("abc");
}

看到了吧,我们的resultFuture在变化,如果不适用rFuture而使用resultFuture,就会导致我们的结果complete之后,其他Filter注册的回调也不生效。

那为什么rFuture就可以呢,因为rFuture在方法结束以后就不会变化了,(F1-Future(rFuture))这种模式下,我们可以认为必须从内部往外传递才能出发Filter注册的回调,这时候使用rFuture就可以触发所有的回调了。

那么又引出一个新问题,为啥thenApplyWithContext要这么写:this.resultFuture = resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));,我们直接resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));这么写是不是问题就解决了呢?我们依然可以触发所有Filter注册的回调,而且还可以使用resultFuture.complete,而不用关注rFutureresultFuture谁变了谁没变,多好。

答案是不行。

这里涉及到Dubbo的FilterOrder问题。什么意思,如果使用resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));,假设我注册了三个FilterF1F2F3,那么会变成当resultFuture完成的时候,直接调用F1F2F3,而且是并行的。

如果使用this.resultFuture = resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));,会变成先执行F1的回调,当F1注册的回调结束之后再执行F2,再F3

区别一看就知道了,很明显我们应该让F1F2F3按照顺序执行,因为有些过滤器的执行优先级是非常高的而有些很低,如果我们并行执行很明显就打乱了回调的顺序。

我纠结这个问题纠结了大概有三四个周了,最近才完全搞明白。希望这里写的不会太乱。

再后记

这里吐个槽,我们试图改动框架代码的时候,至少要先尽可能理解框架为什么这么做。最近成为Committer之后,理所当然的解决各种issue,有很多人完全不理解为什么Dubbo的代码为啥要这么写,就直接吐槽,什么我觉得你们不严谨,我觉得你们这个代码不能这么写,我觉得这个那个。然后我一看他的github,还是个初学者,非常固执,根本听不进去意见。

这种都是无意义的issue,我并不是一竿子掀翻一船人,说什么初学者就不行,不要发表自己的意见。但是至少要充分评估自己的问题的正确性,凡事需谨慎,没实力的时候不要那么横。