Dubbo 服务引用 原创: 怀风 光明和黑暗互相吸引 前天 Dubbo 服务引用 服务引用 大家都知道Dubbo是由consumer,provider,registry这三大部分组成 那么consumer是如何发现provider并调用的呢,就是通过服务引用来实现的,也就是通过发现服务,然后进行调用 服务引用的流程 dubbo服务引用的流程大概如上图,不难发现其流程跟dubbo服务暴露互逆,(关于Dubbo服务暴露Dubbo服务暴露)但最终也是通过invoker来完成我们服务引用 dubbo服务引用最终通过ProxyFactory将Invoker转化为调用的Service dubbo服务引用过程与dubbo服务暴露相似,都是通过SPI,适配相应的协议,并将服务注册到注册中心,并最终完成服务引用 源码解析 初始化 public class ReferenceBean < T > extends ReferenceConfig < T > implements FactoryBean ApplicationContextAware InitializingBean DisposableBean { //省略一部分代码 //获取服务接口 @Override public Object getObject () { return get (); } @Override @SuppressWarnings ({ "unchecked" }) public void afterPropertiesSet () throws Exception { //此处省略 配置校验代码 Boolean b = isInit (); if ( b == null && getConsumer () != null ) { b = getConsumer (). isInit (); } if ( b != null && b ) { //发现服务 getObject (); } } } 首先我们来看一下ReferenceBean, ReferenceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener这里同服务暴露一样,通过spring在初始化的时候进行服务引用 服务引用 我们看到这里都调用了getObject()方法,其实是调用了ReferenceConfig中的get()方法,接下来我们一起看下ReferenceConfig中的get()方法 public synchronized T get () { //配置校验 checkAndUpdateSubConfigs (); //如果该服务已被销毁,则抛出异常 if ( destroyed ) { throw new IllegalStateException ( "The invoker of ReferenceConfig(" + url + ") has already destroyed!" ); } //如果服务为空,则进行初始化,否则直接返回 if ( ref == null ) { init (); } return ref ; } 这里看到ReferenceConfig.get方法上加了一个锁,用来保证不会重复发现服务,而该方法的核心在于init()方法 private void init () { if ( initialized ) { return ; } initialized = true ; checkStubAndLocal ( interfaceClass ); //校验mock checkMock ( interfaceClass ); Map < String String > map = new HashMap < String String >(); //省略对参数解析设置 ... //创建代理对象 ref = createProxy ( map ); ApplicationModel . initConsumerModel ( getUniqueServiceName (), buildConsumerModel ( attributes )); } 这里通过对参数的解析来创建服务代理, createProxy()方法是整个服务引用初始化的关键 private T createProxy ( Map < String String > map ) { URL tmpUrl = new URL ( "temp" "localhost" 0 map ); final boolean isJvmRefer ; if ( isInjvm () == null ) { if ( url != null && url . length () > 0 ) { // 如果指定了url,则不要进行本地引用 isJvmRefer = false ; } else { // 默认情况下,引用本地服务(如果有) isJvmRefer = InjvmProtocol . getInjvmProtocol (). isInjvmRefer ( tmpUrl ); } } else { isJvmRefer = isInjvm (); } if ( isJvmRefer ) { URL url = new URL ( Constants . LOCAL_PROTOCOL Constants . LOCALHOST_VALUE 0 interfaceClass . getName ()). addParameters ( map ); invoker = refprotocol . refer ( interfaceClass url ); if ( logger . isInfoEnabled ()) { logger . info ( "Using injvm service " + interfaceClass . getName ()); } } else { if ( url != null && url . length () > 0 ) { // 用户指定的URL,可以是对等地址,也可以是注册中心的地址. String [] us = Constants . SEMICOLON_SPLIT_PATTERN . split ( url ); if ( us != null && us . length > 0 ) { for ( String u : us ) { URL url = URL . valueOf ( u ); if ( StringUtils . isEmpty ( url . getPath ())) { url = url . setPath ( interfaceName ); } if ( Constants . REGISTRY_PROTOCOL . equals ( url . getProtocol ())) { urls . add ( url . addParameterAndEncoded ( Constants . REFER_KEY StringUtils . toQueryString ( map ))); } else { urls . add ( ClusterUtils . mergeUrl ( url map )); } } } } else { // x来自注册中心配置的URL checkRegistry (); List < URL > us = loadRegistries ( false ); if ( CollectionUtils . isNotEmpty ( us )) { for ( URL u : us ) { URL monitorUrl = loadMonitor ( u ); if ( monitorUrl != null ) { map . put ( Constants . MONITOR_KEY URL . encode ( monitorUrl . toFullString ())); } urls . add ( u . addParameterAndEncoded ( Constants . REFER_KEY StringUtils . toQueryString ( map ))); } } if ( urls . isEmpty ()) { throw new IllegalStateException ( "No such any registry to reference " + interfaceName + " on the consumer " + NetUtils . getLocalHost () + " use dubbo version " + Version . getVersion () + ", please config <dubbo:registry address=\"...\" /> to your spring config." ); } } //这里的refprotocol.refer即通过registryProtocol来进行发现 if ( urls . size () == 1 ) { invoker = refprotocol . refer ( interfaceClass urls . get ( 0 )); } else { List < Invoker <?>> invokers = new ArrayList < Invoker <?>>(); URL registryURL = null ; for ( URL url : urls ) { invokers . add ( refprotocol . refer ( interfaceClass url )); if ( Constants . REGISTRY_PROTOCOL . equals ( url . getProtocol ())) { registryURL = url ; // 使用最后一个注册表网址 } } if ( registryURL != null ) { // 注册表网址可用 // 仅在寄存器的群集可用时才使用RegistryAwareCluster URL u = registryURL . addParameter ( Constants . CLUSTER_KEY RegistryAwareCluster . NAME ); //调用者包装关系将是:RegistryAwareClusterInvoker(StaticDirectory) - > FailoverClusterInvoker(RegistryDirectory,将执行路由) - > Invoker invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 不是注册表网址,必须直接调用。 //这里要注意 cluster 最终都会被包装成 MockClusterWrapper(SPI的依赖注入) invoker = cluster . join ( new StaticDirectory ( invokers )); } } } Boolean c = check ; if ( c == null && consumer != null ) { c = consumer . isCheck (); } if ( c == null ) { c = true ; // default true } if ( c && ! invoker . isAvailable ()) { // 如果提供者暂时不可用,则允许消费者稍后重试 initialized = false ; throw new IllegalStateException ( "Failed to check the status of the service " + interfaceName + ". No provider available for the service " + ( group == null ? "" : group + "/" ) + interfaceName + ( version == null ? "" : ":" + version ) + " from the url " + invoker . getUrl () + " to the consumer " + NetUtils . getLocalHost () + " use dubbo version " + Version . getVersion ()); } if ( logger . isInfoEnabled ()) { logger . info ( "Refer dubbo service " + interfaceClass . getName () + " from url " + invoker . getUrl ()); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null ; if (( metadataReportService = getMetadataReportService ()) != null ) { URL consumerURL = new URL ( Constants . CONSUMER_PROTOCOL map . remove ( Constants . REGISTER_IP_KEY ), 0 map . get ( Constants . INTERFACE_KEY ), map ); metadataReportService . publishConsumer ( consumerURL ); } // create service proxy return ( T ) proxyFactory . getProxy ( invoker ); } 这里可以看到dubbo在服务引用中也可以使用本地服务的发现,但是可看到这一块已经被标记为过时,我的理解是dubbo作为一个RPC框架,本地服务还通过dubbo去调用,肯定与dubbo本身的意义不相匹配,所以便不推荐使用 这块代码我们可以发现同服务暴露一样,会将consumer注册到所有配置的注册中心上去,而refprotocol.refer则是服务引用的核心代码 cluster对invoker进行了一层包装,以便应对后续服务调用中出现的异常情况进行处理 最后我们的invoker将通过代理工厂转换为可以调用的代理服务 RegistryProtocal中的refer @Override @SuppressWarnings ( "unchecked" ) public < T > Invoker < T > refer ( Class < T > type URL url ) throws RpcException { url = url . setProtocol ( url . getParameter ( REGISTRY_KEY DEFAULT_REGISTRY )). removeParameter ( REGISTRY_KEY ); //获取注册中心 Registry registry = registryFactory . getRegistry ( url ); //如果是注册中心的服务,直接返回注册中心类型的invoker if ( RegistryService . class . equals ( type )) { return proxyFactory . getInvoker (( T ) registry type url ); } // group="a,b" or group="*" Map < String String > qs = StringUtils . parseQueryString ( url . getParameterAndDecoded ( REFER_KEY )); String group = qs . get ( Constants . GROUP_KEY ); if ( group != null && group . length () > 0 ) { if (( COMMA_SPLIT_PATTERN . split ( group )). length > 1 || "*" . equals ( group )) { return doRefer ( getMergeableCluster (), registry type url ); } } //发现服务 return doRefer ( cluster registry type url ); } private < T > Invoker < T > doRefer ( Cluster cluster Registry registry Class < T > type URL url ) { //创建并设置注册目录对象 RegistryDirectory < T > directory = new RegistryDirectory < T >( type url ); directory . setRegistry ( registry ); directory . setProtocol ( protocol ); // all attributes of REFER_KEY Map < String String > parameters = new HashMap < String String >( directory . getUrl (). getParameters ()); URL subscribeUrl = new URL ( CONSUMER_PROTOCOL parameters . remove ( REGISTER_IP_KEY ), 0 type . getName (), parameters ); if (! ANY_VALUE . equals ( url . getServiceInterface ()) && url . getParameter ( REGISTER_KEY true )) { directory . setRegisteredConsumerUrl ( getRegisteredConsumerUrl ( subscribeUrl url )); //注册服务 registry . register ( directory . getRegisteredConsumerUrl ()); } directory . buildRouterChain ( subscribeUrl ); //订阅服务 directory . subscribe ( subscribeUrl . addParameter ( CATEGORY_KEY PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY )); //装饰Invoker Invoker invoker = cluster . join ( directory ); ProviderConsumerRegTable . registerConsumer ( invoker url subscribeUrl directory ); return invoker ; } 在RegistryProtocal中,我们看到了cluster.join(directory),在ReferenceConfig中也出现过,在ReferenceConfig中没有注册中心的时候将直接使用装饰invoker,以供我们接下来服务调用来做集群容错 服务引用在RegistryProtocal中的核心方法即为doRefer方法 RegistryDirectory /** * 将网址转换为调用者,如果网址已被引用,则不会重新引用。 * * @param urls * @return invokers */ private Map < String Invoker < T >> toInvokers ( List < URL > urls ) { Map < String Invoker < T >> newUrlInvokerMap = new HashMap <>(); if ( urls == null || urls . isEmpty ()) { return newUrlInvokerMap ; } Set < String > keys = new HashSet <>(); String queryProtocols = this . queryMap . get ( Constants . PROTOCOL_KEY ); for ( URL providerUrl : urls ) { // 如果在参考侧配置协议,则仅选择匹配协议 if ( queryProtocols != null && queryProtocols . length () > 0 ) { boolean accept = false ; String [] acceptProtocols = queryProtocols . split ( "," ); for ( String acceptProtocol : acceptProtocols ) { if ( providerUrl . getProtocol (). equals ( acceptProtocol )) { accept = true ; break ; } } if (! accept ) { continue ; } } if ( Constants . EMPTY_PROTOCOL . equals ( providerUrl . getProtocol ())) { continue ; } if (! ExtensionLoader . getExtensionLoader ( Protocol . class ). hasExtension ( providerUrl . getProtocol ())) { logger . error ( new IllegalStateException ( "Unsupported protocol " + providerUrl . getProtocol () + " in notified url: " + providerUrl + " from registry " + getUrl (). getAddress () + " to consumer " + NetUtils . getLocalHost () + ", supported protocol: " + ExtensionLoader . getExtensionLoader ( Protocol . class ). getSupportedExtensions ())); continue ; } URL url = mergeUrl ( providerUrl ); String key = url . toFullString (); // 参数URL已排序 if ( keys . contains ( key )) { //重复的网址 continue ; } keys . add ( key ); // 缓存键是不与消费者方参数合并的URL,无论消费者如何组合参数,如果服务器URL更改,则再次引用 Map < String Invoker < T >> localUrlInvokerMap = this . urlInvokerMap ; // 本地发现 Invoker < T > invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap . get ( key ); if ( invoker == null ) { // 不在缓存中,请再次发现 try { boolean enabled = true ; if ( url . hasParameter ( Constants . DISABLED_KEY )) { enabled = ! url . getParameter ( Constants . DISABLED_KEY false ); } else { enabled = url . getParameter ( Constants . ENABLED_KEY true ); } if ( enabled ) { invoker = new InvokerDelegate <>( protocol . refer ( serviceType url ), url providerUrl ); } } catch ( Throwable t ) { logger . error ( "Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t . getMessage (), t ); } if ( invoker != null ) { // Put new invoker in cache newUrlInvokerMap . put ( key invoker ); } } else { newUrlInvokerMap . put ( key invoker ); } } keys . clear (); return newUrlInvokerMap ; } 那我们的服务最后是如何通相应协议打开consumer和provider的链接呢,关键代码就在RegistryDirectory的toInvokers方法,将url转换成具体的invoker,这个方法在订阅服务的时候会被触发,并且这里做了一层缓存,防止服务被多次引用 DubboProtocal中的refer @Override public < T > Invoker < T > refer ( Class < T > serviceType URL url ) throws RpcException { optimizeSerialization ( url ); // create rpc invoker. DubboInvoker < T > invoker = new DubboInvoker < T >( serviceType url getClients ( url ), invokers ); invokers . add ( invoker ); return invoker ; } 这里我们以Dubbo协议为例,看到DubboProtocal中的refer很简单,就是创建一个netty客户端,与provider进行连接返回一个Invoker即完成了一次服务的引用 最后通过ProxyFactory的字节码结束,生成代理的可供调用的服务,到这里dubbo服务引用的流程就结束了,可以看出服务引用与服务暴露的过程中有很多类似的地方,其中还有很多细节没有展开,这也将是后续学习的重点 阅读原文 微信扫一扫 关注该公众号 更多内容vip可查看 |