你好,我是何辉。今天我们深入研究Dubbo源码的第九篇,订阅流程。
上一讲,我们通过一个简单的 @DubboService 注解,挖出了服务发布的内幕,找到了 ServiceBean 的 Bean 定义、ServiceConfig 的导出关键节点,发现了本地导出和远程导出,在远程导出的过程中还顺便进行了服务注册。可以说,发布流程为提供方做足了提供服务的准备。
但是,消费方,向提供方发起调用时,并没有设置需要调用提供方的哪个地址,却能神不知鬼不觉地调通提供方,并拿到结果。是不是很神奇。那消费方到底是怎么知道提供方的地址呢?
在“[温故知新]”中,我们学过消费方如何发起调用,可以用 <dubbo:reference/> 标签引用提供方服务来发起调用,或者换成 @DubboReference 注解也可以。不管使用标签,还是注解,我们都是在想办法拿到调用提供方的一个引用句柄而已。
所以,我们也可以逆向排查 @DubboReference 注解,来进一步探索今天的问题。
对比复习 提到通过 @DubboReference,早在之前的“泛化调用”中,我们就逆向查找 @DubboReference 注解,找到了 ReferenceConfig 这个核心类,通过调用 ReferenceConfig 的 get 方法,拿到可以向下游发起调用的泛化对象。
参考上一讲,我们可以尝试着将 @DubboReference、@DubboService 两个注解延伸出来的知识点比对总结一下。
大体形式已经很清晰了,重点看三方面的对比。
作用域,@DubboService 是作用于提供方的,而 @DubboReference 是作用于消费方的; 类的名称,@DubboService 会落到 ServiceConfig 中进行导出,而 @DubboReference 会落到 ReferenceConfig 中进行引用; 辐射功能,ServiceConfig 中涵盖了本地导出和远程导出两个重要的分支逻辑,但是 ReferenceConfig 还暂时不清楚。 有没有发现,这些小知识点你都学过。学知识,越比较,越能发现问题,然后在逐步解答问题的过程中,你会发现各个知识点已经融入了你的知识体系中。
说回来,这里通过简单的对比,想必你心中有了继续深挖的方向了。没错,就是 ReferenceConfig 的 get 方法,我们深入看看会不会像 ServiceConfig 一样发现新大陆。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 @Override public T get () { if (ref == null ) { getScopeModel().getDeployer().start(); synchronized (this ) { if (ref == null ) { init(); } } } return ref; } ↓ protected synchronized void init () { ref = createProxy(referenceParameters); } ↓ private T createProxy (Map<String, String> referenceParameters) { if (shouldJvmRefer(referenceParameters)) { createInvokerForLocal(referenceParameters); } else { urls.clear(); if (StringUtils.isNotEmpty(url)) { parseUrl(referenceParameters); } else { if (!"injvm" .equalsIgnoreCase(getProtocol())) { aggregateUrlFromRegistry(referenceParameters); } } createInvokerForRemote(); } return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic)); }
果然从 ReferenceConfig 的 get 方法中,发现了一些非常重要的逻辑。
ReferenceConfig 的 get 方法中,以线程安全的方式来创建 ref 引用对象;ReferenceConfig 的 init 方法中,除了构建引用服务所需的参数,还有个创建代理(createProxy)的方法,并且将创建代理方法的返回值赋值给了 ref 对象。
创建代理的内部逻辑中,有条件地进行了本地引用(createInvokerForLocal)和远程引用(createInvokerForRemote),并将引用之后的结果 invoker 对象再次包装为代理对象,以供消费方调用远程使用。
通过对比翻阅代码,确实是一种不错的方式,我们可以稍微完善一下对比图了。
既然有了本地引用和远程引用这两个重大发现,想必你已经迫不及待了,那么接下来我们就挨个分析。
1. createInvokerForLocal 本地引用 先看本地引用的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void createInvokerForLocal (Map<String, String> referenceParameters) { URL url = new ServiceConfigURL ("injvm" , "127.0.0.1" , 0 , interfaceClass.getName(), referenceParameters); url = url.setScopeModel(getScopeModel()); url = url.setServiceModel(consumerModel); Invoker<?> withFilter = protocolSPI.refer(interfaceClass, url); List<Invoker<?>> invokers = new ArrayList <>(); invokers.add(withFilter); invoker = Cluster.getCluster(url.getScopeModel(), "failover" ).join(new StaticDirectory (url, invokers), true ); }
整体流程非常简单,一上来创建一个 injvm 协议的 url 对象,紧接着调用了 protocolSPI 的 refer 方法,返回了一个含有过滤器拦截特性的 invoker 对象,最后被集群扩展器再次聚合成了单个 invoker 对象。
然而细心的你可能已经发现了,protocolSPI 这个变量在上一讲“[发布流程]”的导出中也见过,难道 refer 和 export 方法有内在联系?
为了验证这个问题,你再次进入 Protocol 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @SPI(value = "dubbo", scope = ExtensionScope.FRAMEWORK) public interface Protocol { @Adaptive <T> Exporter<T> export (Invoker<T> invoker) throws RpcException; @Adaptive <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException; }
可以看到,原来 export 和 refer 是成对存在的。
有了 export 的分析经验,再分析 refer 就是小菜一碟了。refer 方法上也是有 @Adaptive 注解的,那可以借鉴上一讲分析对象会经历哪些调用流程的经验,画出这样的调用链。
先经过了一系列的包装类,然后进入到 InjvmProtocol 实现类中。和导出时通过 injvm 走的流程,其实是一样的,再次证明了 export 和 refer 是成对存在的。
既然是成对存在的,injvm 的导出,是将创建出来的 invoker 对象放进 InjvmProtocol 中的,难道 refer 会返回一个 invoker 么?这也只是推测,我们不妨进入 InjvmProtocol 中验证看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { return protocolBindingRefer(type, url); } ↓ @Override public <T> Invoker<T> protocolBindingRefer (Class<T> serviceType, URL url) throws RpcException { return new InjvmInvoker <T>(serviceType, url, url.getServiceKey(), exporterMap); }
通过深入 InjvmProtocol 的源码可以得知,推测是正确的,所谓的本地引用,就是直接在本地创建了一个 InjvmInvoker 对象而已,并且源码再次向我们证明了 export 和 refer 是成对存在的。
2. createInvokerForRemote 远程引用 分析了本地引用,我们继续看远程引用。一定要注意,成对存在的概念,对比很重要,大概率就能推测出底层的核心逻辑 。所以我们先结合提供方的操作,对比猜测一下消费方会做哪些处理。
提供方在远程导出时,利用 Netty 绑定了协议端口来提供服务,对应绑定端口的核心类是 NettyServer,那消费方引用时,会不会尝试连接 Netty 服务呢?连接 Netty 服务的类会不会是 NettyClient 呢?
提供方最后在远程导出时,顺便将服务接口信息写到了注册中心,那么在消费方的远程引用时,会不会也往注册中心写数据呢?又或者会不会从注册中心获取数据呢?
带着推测,我们继续深入远程引用的代码中去看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 private void createInvokerForRemote () { if (urls.size() == 1 ) { URL curUrl = urls.get(0 ); invoker = protocolSPI.refer(interfaceClass, curUrl); if (!UrlUtils.isRegistry(curUrl)) { List<Invoker<?>> invokers = new ArrayList <>(); invokers.add(invoker); invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory (curUrl, invokers), true ); } } else { List<Invoker<?>> invokers = new ArrayList <>(); URL registryUrl = null ; for (URL url : urls) { invokers.add(protocolSPI.refer(interfaceClass, url)); if (UrlUtils.isRegistry(url)) { registryUrl = url; } } if (registryUrl != null ) { String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME); invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false ).join(new StaticDirectory (registryUrl, invokers), false ); } else { if (CollectionUtils.isEmpty(invokers)) { throw new IllegalArgumentException ("invokers == null" ); } URL curUrl = invokers.get(0 ).getUrl(); String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT); invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory (curUrl, invokers), true ); } } } ↓ public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { url = getRegistryUrl(url); Registry registry = getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY); String group = qs.get(GROUP_KEY); if (StringUtils.isNotEmpty(group)) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*" .equals(group)) { return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs); } } Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY)); return doRefer(cluster, registry, type, url, qs); }
可以总结出 3 点。
远程引用的 urls 集合,既可以是注册中心地址,也可以多个提供方的点对点地址。 不管 urls 集合的数量是一个还是多个,最终都是循环调用 refer 方法,然后累加 refer 的所有结果,最终被集群扩展器包装成了一个 invoker。 refer 的方法体中将注册器、集群扩展器、接口等信息全部封装到了一个 doRefer 方法中。 第三点,所有抽象的对象都传入了 doRefer 方法中,由此可见,这个方法应该就是我们研究的重中之重。我也给你总结了代码的调用流程图。
我们得出了一个非常重要的结论,从 RegistryProtocol 的 doRefer 一路跟踪,最终又回到了 RegistryProtocol 的 doCreateInvoker 方法,而且从方法名也能看出是创建 invoker 对象的核心逻辑,因此远程引用的核心逻辑就落到了 doCreateInvoker 方法中。
快要见到曙光了,我们看 doCreateInvoker 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 protected <T> ClusterInvoker<T> doCreateInvoker (DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) { directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap <>(directory.getConsumerUrl().getParameters()); URL urlToRegistry = new ServiceConfigURL ( parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY), parameters.remove(REGISTER_IP_KEY), 0 , getPath(parameters, type), parameters ); urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel()); urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel()); if (directory.isShouldRegister()) { directory.setRegisteredConsumerUrl(urlToRegistry); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(urlToRegistry); directory.subscribe(toSubscribeUrl(urlToRegistry)); return (ClusterInvoker<T>) cluster.join(directory, true ); }
果然还是熟悉的套路,在 doCreateInvoker 方法中主要做了两件事情:向注册中心注册了消费接口的信息、向注册中心发起了订阅及监听。
我们又可以完善这张对比图,找到了远程引用做的两件重要事情。
到此,我们回过头来看看之前的推测。
推测一: 提供方在远程导出时,利用 Netty 绑定了协议端口来提供服务,对应绑定端口的核心类是 NettyServer,那消费方引用时,会不会尝试连接 Netty 服务呢?连接 Netty 服务的类会不会是 NettyClient 呢? 推测二: 提供方最后在远程导出时,顺便将服务接口信息写到了注册中心,那么在消费方的远程引用时,会不会也往注册中心写数据呢?又或者会不会从注册中心获取数据呢?
我们发现只有推测二被证明了,消费者在远程引用时,确实会把自己需要消费哪个接口也写到注册中心,这样一来可以反向说明,通过提供方的一个接口,可以从注册中心找到有哪些提供方服务节点,还能找到有哪些消费方来使用这个接口。
推测一呢?别着急,我们还有一个 subscribe 没看,这个订阅方法看起来只有一行简短的代码,里面的逻辑深度却不亚于 doRefer 方法。
但是也别担心,这里我教你一个以终为始的反向验证小技巧,既然我们推测 NettyClient 会是最终结果,不如将计就计,找找有没有类名中含有 Netty 关键字且是操作 Netty 的客户端类。
如果找到了,看有没有尝试连接服务端的方法,打个断点。 如果没有,就看父类有没有连接服务端的方法。 如果都没有,就直接进入 Netty 的 Bootstrap 类继续找连接服务端的方法。 总之,我们的目的就是要找到通信的出口,落实到代码中,就是要找到关于 Netty 发出连接服务端请求的相关 API。
按照小技巧,我们输入了 Netty 关键字,看看有没有操作 Netty 的客户端类,又或者去 Netty 的 Bootstrap 类中看看有没有 connect 连接之类的方法,检索的结果展示如下:
两种检索方式都找到了对应的结果,因为不管是哪种方式最后都会证明推测一是正确的,因此,我这里就使用 NettyClient 这个类,找个连接的方法。
结果发现 NettyClient 中有个 doConnect 方法,见名知意,这大概率就是连接服务端的核心方法,于是我们就在消费方 NettyClient 这里打个断点,先启动提供方,再 Debug 启动消费方,静静等候断点的到来:
这就是我们当前断点到来时呈现的调用堆栈,从下往上看。
首先是熟悉的方法名,为远程引用创建 invoker 的方法入口 createInvokerForRemote。 RegistryProtocol 中创建 invoker 的核心方法 doCreateInvoker。 还未来得及进入源码一探究竟的 subscribe 接口订阅方法。 接口订阅之后来了一堆的 notify 的通知方法,并且又再次走了一遍 refer 方法,很是奇怪。 最后是NettyClient 的 doConnect 方法。 到这里,我们通过 NettyServer 推测出了 NettyClient 是成立的,推测一完全正确。
订阅 你可能会说,还没完,我们刚刚的调用堆栈图中不是还有两个问号么?
这个好说,毕竟断点停在这里,我们可以直接找到第一个 notify 方法被调用的地方,也就是 doSubscribe 方法中调用了 notify 方法,进入源码看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Override public void doSubscribe (final URL url, final NotifyListener listener) { try { checkDestroyed(); if ("*" .equals(url.getServiceInterface())) { } else { CountDownLatch latch = new CountDownLatch (1 ); try { List<URL> urls = new ArrayList <>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap <>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl (url, path, k, latch)); if (zkListener instanceof RegistryChildListenerImpl) { ((RegistryChildListenerImpl) zkListener).setLatch(latch); } zkClient.create(path, false ); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null ) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } finally { latch.countDown(); } } } catch (Throwable e) { throw new RpcException ("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
订阅的核心逻辑,总结下来就是 3 点。
第一点,根据订阅的 url 信息,解析出 category 类别集合,类别除了有看到的 providers、configurators、routers,还有一个 consumers,但是因为我们目前是消费方进行订阅,解析出来的类别集合没有 consumers 也是合乎情理的。 第二点,循环为刚刚解析出来的 category 类别路径添加监听,并同时获取到类别目录下所有的信息,也就是说这里与注册中心有了通信层面的交互,拿到了注册中心关于订阅接口的所有信息。 第三点,将获取到的所有信息组装成为 urls 集合,统一调用 notify 方法刷新。 根据这 3 点结论,似乎解答了我们开篇的问题,为什么消费方能感知到提供方的地址呢?其实就是在这个环节,当消费方向注册中心发起接口订阅的时候,就已经拿到了该接口在注册中心的提供方信息、配置信息、路由信息。
拿到该接口的所有信息后,我们再细看 notify 后面调用的方法名。
结果看到了一个 toInvokers 的方法,将刚刚所有的 urls 转成对应 invoker 对象。
既然转成 invoker 对象后,就自然变成了当初进入 refer 的逻辑了,因为 refer 的目的就是根据 url 得到 invoker 对象,现在整个逻辑就理顺了,我们再补充一下图。
订阅流程的推拉案例 在今天的订阅流程中,消费方第一次启动时,会主动去注册中心拉取最新的信息,这也是我们常见的推拉模型中的 pull 模型,其实还有另外一种 push 模型,推拉模型在我们日常开发中还是挺常见的,那日常有哪些常用的框架有着这样的发布订阅或推拉模式呢?
Redis 的发布订阅,生产者将消息发布到某个频道,订阅这个频道的消费方都会收到消息。 Kafka 的消费轮询,生产者将消息发送到 Broker 中后,消费方会采取长轮询的 pull 拉取模式来自主控制消费速率。 ZooKeeper 的事件通知,消费方会订阅监听 ZooKeeper 服务的文件目录,一旦有变更,ZooKeeper 服务会基于长连接的方式,通知监听该目录的消费方,而通知的内容甚少,若消费方需要知道更多信息的话,那由消费方自主控制去 ZooKeeper 服务端拉取最新信息。 不同的框架采用不同的方式,也有自己的利弊权衡。
push 模式的优点是实时性强,客户端只要简单的被动接收即可。但是也容易导致消息积压,同时也加大了服务端的逻辑复杂度。
pull 模式的优点是主动权掌握在客户端自己手中,消费多少就取多少,长轮询的操作也顶多就是耗费消费方一些线程资源和网络带宽,但是,轮询间隔也得在实时性能容忍的情况下,且不会对服务端造成太大请求压力冲击。这样,客户端的逻辑就会更加复杂,反而会使得服务端简单干脆。
总结 今天,我们抛出消费方是怎么知道提供方地址的问题,对比订阅流程与发布流程、 @DubboService 与 @DubboReference、ServiceConfig 与 ReferenceConfig、本地导出与本地引用、远程导出与远程引用。通过比对,分析未知的流程,是一件很值得推敲和验证的事情。
这里我用 4 个步骤总结下今天学的订阅流程。
首先,通过 @DubboReference 注解跟踪源码的使用地方,找到了平常进行远程调用的核心类 ReferenceConfig。 紧接着,在ReferenceConfig 的 get 方法中见识到了本地引用与远程引用的主干流程。 然后,深入远程引用的分支逻辑,找到了最核心的创建远程 invoker 的核心逻辑 doCreateInvoker。 最后,在这段 doCreateInvoker 逻辑中,发现了消费者注册和接口订阅逻辑,在接口订阅中见识到了感知提供方的地址原来如此简单。 订阅流程的推拉案例,有Redis 的发布订阅、Kafka 的消费轮询、ZooKeeper 的事件通知。
思考题 留个作业给你,在接口订阅的逻辑中,我们挖出消费方是如何一次性获取提供方的所有地址列表的,但消费方在接口订阅方法中只是添加了对 ZooKeeper 目录的监听,那对应接收 ZooKeeper 服务端事件变更,在代码哪个位置呢?
期待看到你的回答,如果觉得今天的内容对你有帮助,也欢迎分享给身边的朋友一起讨论。我们下一讲见。
19 思考题参考 上一期留了个作业,研究下 ProtocolFilterWrapper 协议过滤器包装类,对 export 方法的拦截做了哪些事情。
想要解答这个问题,其实也不是很难,我们直接进入到该类去看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 @Activate(order = 100) public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper (Protocol protocol) { if (protocol == null ) { throw new IllegalArgumentException ("protocol == null" ); } this .protocol = protocol; } @Override public int getDefaultPort () { return protocol.getDefaultPort(); } @Override public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { if (UrlUtils.isRegistry(invoker.getUrl())) { return protocol.export(invoker); } FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl()); return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER)); } private <T> FilterChainBuilder getFilterChainBuilder (URL url) { return ScopeModelUtil.getExtensionLoader(FilterChainBuilder.class, url.getScopeModel()).getDefaultExtension(); } } ↓ @Override public <T> Invoker<T> buildInvokerChain (final Invoker<T> originalInvoker, String key, String group) { Invoker<T> last = originalInvoker; URL url = originalInvoker.getUrl(); List<ModuleModel> moduleModels = getModuleModelsFromUrl(url); List<Filter> filters; if (!CollectionUtils.isEmpty(filters)) { for (int i = filters.size() - 1 ; i >= 0 ; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new CopyOfFilterChainNode <>(originalInvoker, next, filter); } return new CallbackRegistrationInvoker <>(last, filters); } return last; }
进入 export 方法中,发现主要有两个分支逻辑。
分支一:如果是注册协议的话,那么就中规中矩走注册协议的逻辑。 分支二:如果不是注册协议的话,那么就将 invoker 用过滤器层层包装起来,将包装后的对象再次传入 protocol.export 方法中。 分之一的逻辑,想必你已经非常清楚了,就是上一讲详细学习的。
分支二的逻辑,为什么会有这么多的过滤器,是干什么用的呢?还记得在“[缓存操作]”“[参数验证]”“[流量控制]”几讲中提过的 Filter 过滤器么,其实就是在分支二这个环节加载进去的,不信的话,我们就挑“[流量控制]”的代码,断点查看一下 buildInvokerChain 的返参数据就知道了。
启动“流量控制”提供方代码,断点查看。
看完图中的构建过滤器的结果,恍然大悟,原来过滤器的组装早就在导出的环节就已经准备就绪了,后续触发过滤器的调用,也只不过是按部就班地从对象的最外层一直执行到最内层而已。