dubbo源码分析(七)服务发布和调用的链路追踪

2022-06-18 12:37:14

一、Dubbo 整体设计

1、dubbo模块概览

  • config** 配置层**:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类

  • proxy服务代理层:服务接口透明代理,生成动态代理 扩展接口为 ProxyFactory

  • registry注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService

  • cluster路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance

  • monitor监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService

  • protocol远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter

  • exchange信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer

  • transport网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec

  • serialize数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

2、dubbo执行流程

二、服务发布

1、dubbo一次服务发布的堆栈信息

//对dubbo:// 协议进行处理 处理逻辑和registry相似

createServer:284, DubboProtocol (com.alibaba.dubbo.rpc.protocol.dubbo)

openServer:263, DubboProtocol (com.alibaba.dubbo.rpc.protocol.dubbo)

export:250, DubboProtocol (com.alibaba.dubbo.rpc.protocol.dubbo)

export:56, QosProtocolWrapper (com.alibaba.dubbo.qos.protocol)

export:100, ProtocolFilterWrapper (com.alibaba.dubbo.rpc.protocol)

export:57, ProtocolListenerWrapper (com.alibaba.dubbo.rpc.protocol)

export:-1, Protocol$Adaptive (com.alibaba.dubbo.rpc)

//先对registry://协议处理

doLocalExport:168, RegistryProtocol (com.alibaba.dubbo.registry.integration)

export:131, RegistryProtocol (com.alibaba.dubbo.registry.integration)。

export:54, QosProtocolWrapper (com.alibaba.dubbo.qos.protocol)。//运维包装类

export:98, ProtocolFilterWrapper (com.alibaba.dubbo.rpc.protocol)。//滤器链包装器

export:55, ProtocolListenerWrapper (com.alibaba.dubbo.rpc.protocol)。//协议监听器包装类

export:-1, Protocol$Adaptive (com.alibaba.dubbo.rpc).//代理对象

doExportUrlsFor1Protocol:506, ServiceConfig (com.alibaba.dubbo.config)。//根据协议protocol将invoker转换为Dubbo URL

doExportUrls:358, ServiceConfig (com.alibaba.dubbo.config)//配置信息检查

doExport:317, ServiceConfig (com.alibaba.dubbo.config).//真正执行服务发布

export:216, ServiceConfig (com.alibaba.dubbo.config)//api入口

2、堆栈信息分析

从上述服务发布请求的堆栈信息可以看出,dubbo服务发布是基于协议的,针对不同的协议使用不同的XXXProtocol类的export方法,在最终使用XxxProtocol之前会经过XxxWrapper的通用逻辑处理。因为笔者逻辑代码中使用了注册中心,所以此处一次服务发布处理了两种协议

1、registry:// 协议对应的invoker进行处理,其中有三个XXXWarpper的添加listener、添加过滤、网络处理最终交由RegistryProtocol进行Registry协议的处理

2、dubbo://协议处理与上述逻辑处理相似,最后交给DubboProtocol进行处理

3、RegistryProtocol的export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //本地方法暴露 服务方和调用方在同一个jvm实例上 可以设置不走dubbo 直接走本地jvm
        final RegistryProtocol.ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        //获取注册中心地址 zookeeper://
        URL registryUrl = getRegistryUrl(originInvoker);

        //获取注册中心 用于服务注册
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        boolean register = registedProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

        if (register) {
            //服务注册 调用zkClient 将对应的DubboURL作为Path 创建Zookeeper 临时节点
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        //服务订阅
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        //registry是ZookeeperRegistry,在ZookeeperRegistry调用subscribe处理之前会先经过AbstractRegistry的处理,
        // 然后经过FailbackRegistry处理,在FailbackRegistry中会调用ZookeeperRegistry的doSubscribe方法。
        //为创建的服务添加监听,如果节点数据变更 通知所有watch该节点的服务
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        return new RegistryProtocol.DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
    }

该方法主要有两个核心方法 Registry.registry() 服务注册,在zk服务上创建临时节点,Registry.subscribe() 为注册的服务创建服务变更的订阅,用于通知那些订阅了该服务的所有服务消费者,这里的registry是ZookeeperRegisty实现类,底层调用FailbackRegistry的registry()、subscribe(),紧着调用其doRegistry()和doSubscribe()。

  DubboProtocol的export()方法在dubbo源码解析(五)rpc模块服务发布

补充

XXXWrapper 对dubbo对象进行包装类 比如 Invoker的MockClusterInvokerWrapper

  •  ProtocolListenerWrapper: 服务发布和服务调用过程中通过ExporterListenerInvokerListener 接口进行了服务导出、引用监听
  •      ProtocolFilterWrapper: 该类在服务发布或者调用过程中为Invoker 构造拦截过滤器链 进行dubbo Rpc请求的过滤
  •      QosProtocolWrapper:  是网络的一种安全机制, 是用来解决网络延迟和阻塞等问题的一种技术
  • 这些Wrapper是通过dubbo的spi机制自动帮助我们包装进来通过ExtensionLoader.getExtensionLoader().getExtension()将/META-INF.dubbo.internal 中的配置引入。

三、服务调用

   1、dubbo一次服务调用的堆栈信息

//协议 调用

doInvoke:69, DubboInvoker (com.alibaba.dubbo.rpc.protocol.dubbo)

invoke:148, AbstractInvoker (com.alibaba.dubbo.rpc.protocol)//过滤器链

invoke:65, MonitorFilter (com.alibaba.dubbo.monitor.support)//过滤链-》监控器

invoke:72, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)//装饰类 类似于胶水

invoke:54, FutureFilter (com.alibaba.dubbo.rpc.protocol.dubbo.filter)//过滤链->异步同步回调参数

invoke:72, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)//包装类 类似于胶水

invoke:48, ConsumerContextFilter (com.alibaba.dubbo.rpc.filter)//过滤链->消费者环境初始化 invoke:72, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)

invoke:77, ListenerInvokerWrapper (com.alibaba.dubbo.rpc.listener)

invoke:56, InvokerWrapper (com.alibaba.dubbo.rpc.protocol)

doInvoke:78, FailoverClusterInvoker (com.alibaba.dubbo.rpc.cluster.support) //失败重试集群 invoke:238, AbstractClusterInvoker (com.alibaba.dubbo.rpc.cluster.support)

invoke:75, MockClusterInvoker (com.alibaba.dubbo.rpc.cluster.support.wrapper)//mock服务

invoke:52, InvokerInvocationHandler (com.alibaba.dubbo.rpc.proxy)//代理对象的中间接口

queryDemoInfo:-1, proxy0 (com.alibaba.dubbo.common.bytecode)//proxy代理对象

queryDemoInfo:27, DemoController (com.xiu.dubbo.controller)//服务放调用

  2、dubbo服务调用的逻辑梳理

  服务调用和服务发布逻辑类似基于protocol进行处理,registry协议和duubo协议进行处理 ,不管是Cluster,还是Fillter或者DuuboInvoker最终是以责任链的形式串联一起,通过invoke()方法层层调用。

  1.  针对Invoker(服务调用)生成代理对象实现类为JavassistProxyFactory,InvokerInvocationHandler是代理对象实际调用处理
  2. registry协议获取CluserInvoker(集群)通过包装获取MockClusterInvoker 该类包含FailOverClusterInvoker 失败重试
  3.  通过RegistryDirectory将dubbo协议转换为Invoker
  4.   DubboPtotocol dubbo协议获取DubboInvoker 在生成该对象之前 会被又被ProtocolFilterWrapper :ProtocolFilterWrapper调用其buildInvokerChain() 获取到所有的过滤器链,ListenerInvokerWrapper:添加监听,以及监控、回调函数、环境初始化Filter等
  5.   dubbo协议最终转换为DubboInvoker

   3、Cluster集群

         代理创建完成后进行调用的对象为Invoker,首先获取包含Invoker的Cluster,dubbo为获取的Cluster都会包装MockClusterInvoker,该集群的作用是服务调用失败的降级处理或者没有服务的时候直接走模拟请求。在MockClusterInvoker的invoke()调用AbstractClusterInvoker的invoke()方法其中会获取负载均衡策略获取合适的invoke

        第二层是FailOverClusterInvoker,失败容错机制,他需要配合重试次数调用如果失败就需要重新调用其他服务。

 4、FitterChain 过滤器链

ListenerInvokerWrapper: 为服务调用添加Listener事件

           ProtocolFilterWrapper: 构造过滤器链用于后面的duubo请求过滤 比如                ConsumerContextFilter 初始化消费者环境过滤器,FutureFilter:请求调用返回参数同步异步转换过滤器、MonitorFilter dubbo服务调用监控过滤器。

     5、DubboInvoker的doInvoke

            这里是最终调用dubbo请求的核心,主要是使用NettyServer建立连接ExchangeClient里面包含具体dubbo请求协议的细节,ExchangeClient有共享连接(默认)和独占连接。有关DubboInovke的分析请参考dubbo源码解析(六)rpc模块服务调用

  • 作者:liushangzaibeijing
  • 原文链接:https://blog.csdn.net/liushangzaibeijing/article/details/118651574
    更新时间:2022-06-18 12:37:14