Dubbo源码解析——异步支持之Provider端

Dubbo源码解析——异步支持之Provider端

前言

继上篇Consumer端的异步,我们再看看Provider端如何异步。先看个例子:

1
2
3
<bean id="asyncService" class="com.alibaba.dubbo.samples.async.impl.AsyncServiceImpl"/>

<dubbo:service async="true" interface="com.alibaba.dubbo.samples.async.api.AsyncService" ref="asyncService"/>

注意这里async=true表明这是一个开启P端异步的服务。P端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AsyncServiceImpl implements AsyncService {
public String sayHello(String name) {
final AsyncContext asyncContext = RpcContext.startAsync();
new Thread(() -> {
// 切换线程上下文
asyncContext.signalContextSwitch();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
asyncContext.write("Hello " + name + ", response from provider.");
}).start();
// 这里return什么都无所谓
return null;
}
}

这里的用法,如果使用过Servlet 3.0的同学应该会比较熟悉,用法很类似。注意这里我们在P端新开了一个线程,如果直接用RpcContext.getContext是拿不到原来的上下文的,需要asyncContext.signalContextSwitch一下,把调用的上下文切换到新线程里。关于最后的return,其实这里return什么都无所谓的,因为最终内容会使用asyncContext.write写出去的内容。

代码

看完了使用我们分析一下P端异步的实现方式。我们在XML中给P的Service打了一个async=true,这个表示可以开启异步。

我们看下DubboProtocol中的内部类,其中有一个requestHandler,这个类是真正处理之后把值返回的地方,看下这个类的reply方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;

// 拿到对应的Invoker
Invoker<?> invoker = getInvoker(channel, inv);

// do sth...

// rpc context
RpcContext rpcContext = RpcContext.getContext();
// 是否支持异步,这里判断的依据就是我们的xml中,这个服务是否有 async = true 标识
boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
if (supportServerAsync) {
// 如果支持异步,在RpcContext设置一个AsyncContextImpl
CompletableFuture<Object> future = new CompletableFuture<>();
rpcContext.setAsyncContext(new AsyncContextImpl(future));
}
rpcContext.setRemoteAddress(channel.getRemoteAddress());

// 真正的invoke过程,因为是异步的,所以会迅速返回
Result result = invoker.invoke(inv);

// 因为是异步的,这里返回的是AsyncRpcResult
if (result instanceof AsyncRpcResult) {
// 这里getResultFuture返回的是CompletableFuture<Result>,而要求是CompletableFuture<Object>,这里只是做了一个转化,实际上还是返回了一个CompletableFuture<Result>
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}

这里我们要看下AbstractProxyInvoker#invoke方法了,P端的ServiceImpl其实也是代理,是AbstractProxyInvoker的实现类,里面有这样一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
RpcContext rpcContext = RpcContext.getContext();
// doInvoke实际上就是执行某个方法
Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());

if (RpcUtils.isFutureReturnType(invocation)) {
return new AsyncRpcResult((CompletableFuture<Object>) obj);
} else if (rpcContext.isAsyncStarted()) {
// 我们这种情况会进入这里,当我们在P端执行RpcContext.startAsync()时,表示开启异步
return new AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
} else {
return new RpcResult(obj);
}

这里我们看到,如果开启异步,真正的执行的结果会是一个AsyncRpcResult,这个AsyncRpcResult包含了一个Future,我们回到reply方法中,可以看到,这个Future是一个CompletableFuture<Object> future = new CompletableFuture<>();

那好了,总结上面的几步,就是开启异步的时候,我们会在AsyncContextImpl里面初始化一个CompletableFuture,真正执行doInvoke之后,会把这个CompletableFuture包裹在AsyncRpcResult中返回。

看下AsyncRpcResult的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 在我们开启服务端异步的情况下,三个入参的情况:
// 入参第一个参数,就是AsyncContextImpl中的那个Future
// 第二个rFuture是new出来的
// 第三个参数为true
public AsyncRpcResult(CompletableFuture<Object> future, CompletableFuture<Result> rFuture, boolean registerCallback) {
if (rFuture == null) {
throw new IllegalArgumentException("");
}
resultFuture = rFuture;
if (registerCallback) {
future.whenComplete((v, t) -> {
RpcResult rpcResult;
if (t != null) {
if (t instanceof CompletionException) {
rpcResult = new RpcResult(t.getCause());
} else {
rpcResult = new RpcResult(t);
}
} else {
rpcResult = new RpcResult(v);
}
rFuture.complete(rpcResult);
});
}
this.valueFuture = future;
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}

解释一下,入参第一个Future,就是我们AsyncContextImpl中的那个Future,在我们的P端执行asyncContext.write时,就会触发它的whenComplete回调。这里其实就是把我们P端服务实现的值传递给一个rFuture

这里其实就是把我们的AsyncContextImpl中的Future(又可以称为值Future表示维护的是一个值,即服务端执行结果,比如一个String)和resultFuture(可以称为ResultFuture,即Dubbo一次Rpc执行的Result,这里是一个RpcResult)关联起来,具体的联系就是:当值Future执行完成时(P端调用asyncContext.write时),调用ResultFuturecomplete,并且把write进去的值包装城RpcResult传递给ResultFuture

这样我们的关系就理顺了,这里其实还是有点绕的,原因还是由于Dubbo自身的设计问题(写出去的都必须是Result)。P端异步的流程总结一下就是:

  1. P端开启异步,这里会设置一个值FutureAsyncContextImpl里,用来监听P端执行的结果。
  2. 这个值Future同样被设置到一个新创建的AsyncRpcResult中,并且同时在AsyncRpcResult中初始化了一个ResultFuture
  3. 值Future设置回调,当值Future完成的时候,就把这个值包装成一个RpcResult,并且执行ResultFuturecomplete方法,把ResultFuture给结束掉。

顺着我们的思路,那是不是说ResultFuture应该有一个回调(whenComplete函数),当ResultFuture结束的时候,就把Result写回到C端呢?

继续看回到dubbo执行完成之后的部分,及reply返回给上级之后干了什么,这里看下HeaderExchangeHandler#handleRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 这个future是AsyncRpcResult中的ResultFuture
CompletableFuture<Object> future = handler.reply(channel, msg);
// 如果这个future已经结束了,就直接写回Consumer
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res);
return;
}
// 如果还没结束,就给这个ResultFuture设置一个回调,写回Consumer
future.whenComplete((result, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(result);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
} finally {
}
});

果然就像我们说的那样,我们handler.reply(channel, msg)会返回一个CompletableFuture,这个CompletableFuture结束时就把内容写回C端。

而这个CompletableFuture,我们看回到AsyncRpcResult中,就是入参的第二个参数,是被new出来的CompletableFuture。这个CompletableFuture是在AsyncRpcResult构造函数里的第一个入参(值Future)执行whenComplete回调时,被complete的。

而第一个入参是我们AsyncContextImpl中的那个Future,二者是同一个引用。这个AsyncContextImpl就是我们在P端实现类里执行asyncContext.write时被complete的,这样,我们就从正反两个方向明白了整个异步的执行过程。