Dubbo源码解析——异步支持之Consumer端

Dubbo源码解析——异步支持之Consumer端

前言

之前版本的异步非常不优雅,需要各种配置、各种坑不说,获取Future的方式还是需要从Context里获取,这次的改动在原有的基础上,主要提供了更加便捷的异步使用方式。

目前dubbo已经支持Consumer端和Provider端异步。Provider端的异步是类似Servlet 3.0的异步方式做的。这篇文章就来解读一下dubbo的Consumer对异步的实现方式。

对异步的使用,可以参考一下文档:地址

Consumer

这里看个例子,非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 接口:
public interface DemoAsync {
CompletableFuture<String> sayHi();
}

// XML:
<dubbo:reference id="demoAsync" check="false" interface="org.apache.dubbo.demo.DemoAsync"/>

// 使用:
DemoAsync demoService = (DemoAsync) context.getBean("demoAsync"); // get remote service proxy
CompletableFuture<String> future = demoService.sayHi();
future.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println(s);
}
});

这里要稍微往前追溯一点了,要追溯到dubbo形成代理的部分,这部分简单点看,入口在AbstractProxyFactory#getProxy里,会调用子类的getProxy(invoker, interfaces),默认实现是Javassist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

// InvokerInvocationHandler继承自InvocationHandler
InvokerInvocationHandler#invoke:

String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// sth...
RpcInvocation invocation;
// 判断某个方法是不是动态生成出来的
if (RpcUtils.hasGeneratedFuture(method)) {
Class<?> clazz = method.getDeclaringClass();
String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
invocation = new RpcInvocation(syncMethod, args);
// 设置标记为将来异步执行
invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
} else {
invocation = new RpcInvocation(method, args);
// 返回类型是不是CompletableFuture
if (RpcUtils.hasFutureReturnType(method)) {
//如果是,设置标记,将来用异步方式invoke
invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
}
}
// 注意最后这个recreate
return invoker.invoke(invocation).recreate();

这里其实每个方法都是用Invoker进行执行,但是Invoker返回的都是Result,是通过Result.recreate()方法把Result中的值吐出来的,这点要记住,后面会提到。

真正执行Invoke默认还是在DubboInvoker里。看下doInvoke

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 是否是异步的,这里我们在创建代理的时候打上了标记,所以这里是true
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 这里是false,表示这个异步不是自动生成的
boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
//false,oneway表示调用之后不管结果
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// 异步情况下会走到这里,这个和同步一样都会返回一个ResponseFuture
ResponseFuture future = currentClient.request(inv, timeout);
// 注意这个Adapter,实际上是CompletableFuture子类,切实际上就是我们的返回值
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
// context里也设置一个future
RpcContext.getContext().setFuture(futureAdapter);

Result result;
if (isAsyncFuture) {
// 返回一个AsyncRpcResult,异步的Result
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}

这里我们看到,同步相对于异步,同步就是最后调用了一下ResponseFuture#get()等待结果,异步就是把ResponseFuture封装成了FutureAdapter,然后在外面再包上一层AsyncRpcResult后返回。

这里我们先不看后面,把目前为止的流程,结合例子捋一遍。我们rpc调用这次返回的本身就是一个CompletableFuture,并且给这个Future设置了回调,这个Future我们之前也强调过了,就是我们创建出来的FutureAdapter,这个FutureAdapter被包在AsyncRpcResult返回了。

问题有几个:

  1. invoke时候创建FutureAdapter是怎么被反馈给C端作为实际上的返回值的呢?
  2. FutureAdapter什么时候被调用的complete方法呢?

第一个问题其实已经有答案了,说回我们在一开始强调过了recreate方法,这个方法就是返回实际上的值,那么AsyncRpcResultrecreate方法是什么样的呢:return valueFuture;。这个valueFutureAsyncRpcResult的第一个入参,即doInvoke时候生成的futureAdapter。这样整个就打通了,流程就是:代理生成的过程中invoker.invoke(invocation).recreate(),这时候invoke返回的是AsyncRpcResult,然后recreate方法返回我们创建出来的FutureAdapter作为CompletableFuture使用。

第二个问题。我们要先了解一下ResponseFuture的回调机制了。默认实现是DefaultFuture。C端在收到P端返回的Result的时候,会调用HeaderExchangeHandler#received方法,处理之后调用DefaultFuture.received(channel, response);,从response中拿到调用id之后,获取对应的DefaultFuture,然后执行DefaultFuture#doReceived

1
2
3
4
5
6
7
8
9
10
11
12
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}

就是把response赋值以后,如果有回调需要调用则调用回调。

我们猜测一下,C端异步就是通过这个callback最终回调到用户代码里的那个CompletableFuture(实际上返回的是futureAdapter)的。

这里我们要看一下FutureAdapter的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

private final ResponseFuture future;

public FutureAdapter(ResponseFuture future) {
// 这里这个future就是DefaultFuture
this.future = future;
// 这个resultFuture先不管,是给P端用的
this.resultFuture = new CompletableFuture<>();

// 给future增加回调
future.setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
Result result = (Result) response;
FutureAdapter.this.resultFuture.complete(result);
V value = null;
try {
// 这个result是服务端返回的Result,result.recreate返回的是P端返回的字符串“hi”
value = (V) result.recreate();
} catch (Throwable t) {
FutureAdapter.this.completeExceptionally(t);
}
// 调用自己的complete,这里这个value就是P返回的字符串“hi”
FutureAdapter.this.complete(value);
}

@Override
public void caught(Throwable exception) {
FutureAdapter.this.completeExceptionally(exception);
}
});
}

谜团解开。在DubboInvokerdoInvoke中,我们创建了一个FutureAdapter,这个实例将来通过AsyncRpcResult#recreate返回给用户。这个FutureAdapter中维护了一个DefaultFuture。这个DefaultFuture在接受到P端的返回值的时候,会执行一个回调,回调的时候,会执行维护了这个DefaultFutureFutureAdaptercomplete把远端的值返回回来。这样,用户在代码中设置了的whenComplete就会被触发并且执行了。

细节

各位看过来了!这里有个细节。今天也是请教了一下阿里的陆龟大神才明白(不得不说在社区里请教各位大神就是方便)。

我们看下doInvoke中的一段代码:

1
2
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);

第一句执行request(实际上就把请求发出去了,但是是异步的)。第二句就是之前说的,初始化FutureAdapter将来返回给用户,初始化的过程中我们也说过了,会设置callback,将来调用这个FutureAdaptercomplete

问题来了,要是currentClient.request发送出去的数据已经返回,但是FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);还没执行,怎么办?没执行这句,callback就不会被设置到ResponseFuture上,自然也不会执行FutureAdaptercomplete了。这样是不是FutureAdaptercomplete永远不会被执行?

要么说自己还是经验不够。这种地方其实之前梁飞大大就已经考虑到了。这里取咨询了一下阿里的陆龟,我们看下ResponseFuture#setCallback方法就知道答案了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (isDone()) {
// 如果Future已经完成了,就直接执行callback
invokeCallback(callback);
} else {
boolean isdone = false;
lock.lock();
try {
// 加锁判断是否执行完成
if (!isDone()) {
this.callback = callback;
} else {
isdone = true;
}
} finally {
lock.unlock();
}
// 如果在设置callback期间执行完了,那么也直接执行callback
if (isdone) {
invokeCallback(callback);
}
}

发现了么,如果设置的时候发现Future已经结束了,就直接执行一次callback,否则就加锁设置回调,如果加锁之后,Future已经结束,同样最终直接执行回调。

这种设计非常值得借鉴。宗旨就是,不管设置回调是在Future结束前还是结束后,回调都应该被执行。

至此,Consumer端异步就已经结束了。可以期待下一篇的Provider端的异步实现了。需要提醒大家的是,一定要多多debug代码,我的文章也只是给大家一个看代码的思路,真正理解和融会贯通,还要不断地debug源码才行。