Dubbo 服务暴露 原创: 怀风 光明和黑暗互相吸引 1月25日 Dubbo 服务暴露 服务暴露 大家都知道Dubbo是由consumer,provider,registry这三大部分组 那么provider的如果将服务提供给consumer调用呢,就是通过服务暴露来实现的,也就是把我们原来单机架构中的接口,对外部暴露 服务暴露的流程 dubbo服务暴露的流程大概如上图,在dubbo中,所有的服务都会被包装成一个invoker,这一点也将贯穿今后整个学习 dubbo服务暴露可以理解为两部分:本地暴露,远程暴露 本地暴露的接口通常用于我们直接invoke dubbo的接口,以及有些时候我们的服务既是provider又是consumer,避免远程调用造成的资源浪费 远程暴露则是将服务信息注册到registry,并且将服务通过网络提供给其他应用调用 源码解析 初始化 public class ServiceBean < T > extends ServiceConfig < T > implements InitializingBean DisposableBean ApplicationContextAware ApplicationListener < ContextRefreshedEvent >, BeanNameAware { private static final long serialVersionUID = 213195494150089726L ; /** *此处省略其他代码 **/ @Override public void onApplicationEvent ( ContextRefreshedEvent event ) { if (! isExported () && ! isUnexported ()) { if ( logger . isInfoEnabled ()) { logger . info ( "The service ready on spring started. service: " + getInterface ()); } //服务暴露 export (); } } @Override @SuppressWarnings ({ "unchecked" "deprecation" }) public void afterPropertiesSet () throws Exception { //此处省略.... if (! supportedApplicationListener ) { //服务暴露 export (); } } } 首先我们来看一下ServiceBean,ServiceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener有没有觉得很熟悉,实现这几个类就能在spring初始化的时候do something 暴露服务 我们看到这里都调用了export()方法 public synchronized void export () { if ( provider != null ) { if ( export == null ) { export = provider . getExport (); } if ( delay == null ) { delay = provider . getDelay (); } } if ( export != null && ! export ) { return ; } if ( delay != null && delay > 0 ) { delayExportExecutor . schedule ( new Runnable () { @Override public void run () { doExport (); } }, delay TimeUnit . MILLISECONDS ); } else { doExport (); } } 这里看到ServiceConfig.export方法上加了一个锁,用来保证不会重复暴露服务,抛开上面的逻辑判断,在第一次初始化的时候,是直接走到了doExport()方法 protected synchronized void doExport () { //省略判断代码 doExportUrls (); } private void doExportUrls () { //加载注册中心的配置 List < URL > registryURLs = loadRegistries ( true ); //把使用的协议注册到注册中心 for ( ProtocolConfig protocolConfig : protocols ) { doExportUrlsFor1Protocol ( protocolConfig registryURLs ); } } 这里dubbo支持多协议,可以看到通过for循环可以把配置的多种协议都导出,进行暴露 private void doExportUrlsFor1Protocol ( ProtocolConfig protocolConfig List < URL > registryURLs ) { //省略判断代码 // 导出服务 String contextPath = protocolConfig . getContextpath (); if (( contextPath == null || contextPath . length () == 0 ) && provider != null ) { contextPath = provider . getContextpath (); } String host = this . findConfigedHosts ( protocolConfig registryURLs map ); Integer port = this . findConfigedPorts ( protocolConfig name map ); URL url = new URL ( name host port ( contextPath == null || contextPath . length () == 0 ? "" : contextPath + "/" ) + path map ); if ( ExtensionLoader . getExtensionLoader ( ConfiguratorFactory . class ) . hasExtension ( url . getProtocol ())) { url = ExtensionLoader . getExtensionLoader ( ConfiguratorFactory . class ) . getExtension ( url . getProtocol ()). getConfigurator ( url ). configure ( url ); } String scope = url . getParameter ( Constants . SCOPE_KEY ); // 没有配置时不导出 if (! Constants . SCOPE_NONE . equalsIgnoreCase ( scope )) { // 导出本地服务 if (! Constants . SCOPE_REMOTE . equalsIgnoreCase ( scope )) { exportLocal ( url ); } // 导出远程服务 if (! Constants . SCOPE_LOCAL . equalsIgnoreCase ( scope )) { if ( logger . isInfoEnabled ()) { logger . info ( "Export dubbo service " + interfaceClass . getName () + " to url " + url ); } if ( registryURLs != null && ! registryURLs . isEmpty ()) { //将服务都注册到当前已有的注册中心上去 for ( URL registryURL : registryURLs ) { url = url . addParameterIfAbsent ( Constants . DYNAMIC_KEY registryURL . getParameter ( Constants . DYNAMIC_KEY )); //判断是否有监控中心 URL monitorUrl = loadMonitor ( registryURL ); if ( monitorUrl != null ) { url = url . addParameterAndEncoded ( Constants . MONITOR_KEY monitorUrl . toFullString ()); } if ( logger . isInfoEnabled ()) { logger . info ( "Register dubbo service " + interfaceClass . getName () + " url " + url + " to registry " + registryURL ); } //对于providers,这用于启用自定义代理以生成invoker String proxy = url . getParameter ( Constants . PROXY_KEY ); if ( StringUtils . isNotEmpty ( proxy )) { registryURL = registryURL . addParameter ( Constants . PROXY_KEY proxy ); } Invoker <?> invoker = proxyFactory . getInvoker ( ref ( Class ) interfaceClass registryURL . addParameterAndEncoded ( Constants . EXPORT_KEY url . toFullString ())); //包装调用者和所有元数据的Invoker包装器 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker ( invoker this ); Exporter <?> exporter = protocol . export ( wrapperInvoker ); exporters . add ( exporter ); } } else { //没有注册中心直接暴露 Invoker <?> invoker = proxyFactory . getInvoker ( ref ( Class ) interfaceClass url ); //包装调用者和所有元数据的Invoker包装器 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker ( invoker this ); Exporter <?> exporter = protocol . export ( wrapperInvoker ); exporters . add ( exporter ); } } } this . urls . add ( url ); } //暴露本地服务 private void exportLocal ( URL url ) { if (! Constants . LOCAL_PROTOCOL . equalsIgnoreCase ( url . getProtocol ())) { //手动暴露一个本地服务 URL local = URL . valueOf ( url . toFullString ()) . setProtocol ( Constants . LOCAL_PROTOCOL ) . setHost ( LOCALHOST ) . setPort ( 0 ); Exporter <?> exporter = protocol . export ( proxyFactory . getInvoker ( ref ( Class ) interfaceClass local )); exporters . add ( exporter ); logger . info ( "Export dubbo service " + interfaceClass . getName () + " to local registry" ); } } 这你scope配置默认值是null,则本地服务和远程服务都导出,另外如果没有配置注册中心,将直接将接口暴露出去,我们可以根据自己所在的场景,选择都暴露还是指定暴露 由于dubbo也是支持多注册中心的,所以可以通过for循环,将多个服务都注册到当前已有的注册中心上去 在exportLocal方法这是将配置中解析好的url参数手动修改成本地协议进行服务暴露 ProxyFactory是通过SPI获取JavassistProxyFactory靠Javassist字节码技术动态的生成Invoker类,大家有兴趣的可以下去了解一下 暴露的细节 public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol ; //装饰者模式 public ProtocolFilterWrapper ( Protocol protocol ) { if ( protocol == null ) { throw new IllegalArgumentException ( "protocol == null" ); } this . protocol = protocol ; } } 在ServiceConfig中,通过SPI获取相应的Protocol,SPI中会对实现类进行装饰,每次执行protocol.exprot()方法的时候,其实都是执行的ProtocolFilterWrapper的protocol.exprot方法 @Override public < T > Exporter < T > export ( Invoker < T > invoker ) throws RpcException { //如果是注册中心协议直接导出 if ( Constants . REGISTRY_PROTOCOL . equals ( invoker . getUrl (). getProtocol ())) { return protocol . export ( invoker ); } //如果不是则执行整个filter的责任链 return protocol . export ( buildInvokerChain ( invoker Constants . SERVICE_FILTER_KEY Constants . PROVIDER )); } //责任链模式,对filter进行逐个执行 private static < T > Invoker < T > buildInvokerChain ( final Invoker < T > invoker String key String group ) { Invoker < T > last = invoker ; List < Filter > filters = ExtensionLoader . getExtensionLoader ( Filter . class ). getActivateExtension ( invoker . getUrl (), key group ); if (! filters . isEmpty ()) { for ( int i = filters . size () - 1 ; i >= 0 ; i --) { final Filter filter = filters . get ( i ); final Invoker < T > next = last ; last = new Invoker < T >() { @Override public Class < T > getInterface () { return invoker . getInterface (); } @Override public URL getUrl () { return invoker . getUrl (); } @Override public boolean isAvailable () { return invoker . isAvailable (); } @Override public Result invoke ( Invocation invocation ) throws RpcException { Result result = filter . invoke ( next invocation ); if ( result instanceof AsyncRpcResult ) { AsyncRpcResult asyncResult = ( AsyncRpcResult ) result ; asyncResult . thenApplyWithContext ( r -> filter . onResponse ( r invoker invocation )); return asyncResult ; } else { return filter . onResponse ( result invoker invocation ); } } @Override public void destroy () { invoker . destroy (); } @Override public String toString () { return invoker . toString (); } }; } } return last ; } 由上述代码我们可以看到,dubbo中运用装饰者模式和责任链模式,对我们提供的服务做了一次封装,最终转换成我们需要的invoker对外暴露 要注意到的是,当我们的协议是registry也就是注册协议的时候,是不需要进行构建责任链的 本地暴露 @Override public < T > Exporter < T > export ( Invoker < T > invoker ) throws RpcException { return new InjvmExporter < T >( invoker invoker . getUrl (). getServiceKey (), exporterMap ); } 如果是本地暴露,则通过SPI拿到InjvmProtocol,最终通过injvm协议导出InjvmExporter 远程暴露 @Override public < T > Exporter < T > export ( Invoker < T > invoker ) throws RpcException { URL url = invoker . getUrl (); // 导出服务. String key = serviceKey ( url ); DubboExporter < T > exporter = new DubboExporter < T >( invoker key exporterMap ); exporterMap . put ( key exporter ); //导出根服务以进行调度事件 Boolean isStubSupportEvent = url . getParameter ( Constants . STUB_EVENT_KEY Constants . DEFAULT_STUB_EVENT ); Boolean isCallbackservice = url . getParameter ( Constants . IS_CALLBACK_SERVICE false ); if ( isStubSupportEvent && ! isCallbackservice ) { String stubServiceMethods = url . getParameter ( Constants . STUB_EVENT_METHODS_KEY ); if ( stubServiceMethods == null || stubServiceMethods . length () == 0 ) { if ( logger . isWarnEnabled ()) { logger . warn ( new IllegalStateException ( "consumer [" + url . getParameter ( Constants . INTERFACE_KEY ) + "], has set stubproxy support event ,but no stub methods founded." )); } } else { stubServiceMethodsMap . put ( url . getServiceKey (), stubServiceMethods ); } } //打开服务 openServer ( url ); optimizeSerialization ( url ); return exporter ; } //打开服务 private void openServer ( URL url ) { // find server. String key = url . getAddress (); //客户端可以导出仅供服务器调用的服务 boolean isServer = url . getParameter ( Constants . IS_SERVER_KEY true ); if ( isServer ) { ExchangeServer server = serverMap . get ( key ); if ( server == null ) { synchronized ( this ) { server = serverMap . get ( key ); if ( server == null ) { //如果服务不存在,创建服务 serverMap . put ( key createServer ( url )); } } } else { // 服务器支持重置,与override一起使用 server . reset ( url ); } } } 在dubbo协议中,我们看到在服务导出的时候会根据配置地址,打开netty服务,也就是通过这一步,开启了RPC端口,使consumer通过TCP协议进行服务调用 服务注册 @Override public < T > Exporter < T > export ( final Invoker < T > originInvoker ) throws RpcException { URL registryUrl = getRegistryUrl ( originInvoker ); // url在本地导出 URL providerUrl = getProviderUrl ( originInvoker ); // 订阅覆盖数据 // 同样的服务由于订阅是带有服务名称的缓存密钥,因此会导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl ( providerUrl ); final OverrideListener overrideSubscribeListener = new OverrideListener ( overrideSubscribeUrl originInvoker ); overrideListeners . put ( overrideSubscribeUrl overrideSubscribeListener ); providerUrl = overrideUrlWithConfig ( providerUrl overrideSubscribeListener ); //导出invoker final ExporterChangeableWrapper < T > exporter = doLocalExport ( originInvoker providerUrl ); //将url注册到注册中心 final Registry registry = getRegistry ( originInvoker ); final URL registeredProviderUrl = getRegisteredProviderUrl ( providerUrl registryUrl ); ProviderInvokerWrapper < T > providerInvokerWrapper = ProviderConsumerRegTable . registerProvider ( originInvoker registryUrl registeredProviderUrl ); //判断我们是否需要推迟发布 boolean register = registeredProviderUrl . getParameter ( "register" true ); if ( register ) { register ( registryUrl registeredProviderUrl ); providerInvokerWrapper . setReg ( true ); } // Deprecated! Subscribe to override rules in 2.6.x or before. registry . subscribe ( overrideSubscribeUrl overrideSubscribeListener ); exporter . setRegisterUrl ( registeredProviderUrl ); exporter . setSubscribeUrl ( overrideSubscribeUrl ); //确保每次导出时都返回一个新的导出器实例 return new DestroyableExporter <>( exporter ); } //真正导出服务的地方 private < T > ExporterChangeableWrapper < T > doLocalExport ( final Invoker < T > originInvoker URL providerUrl ) { String key = getCacheKey ( originInvoker ); ExporterChangeableWrapper < T > exporter = ( ExporterChangeableWrapper < T >) bounds . get ( key ); if ( exporter == null ) { synchronized ( bounds ) { exporter = ( ExporterChangeableWrapper < T >) bounds . get ( key ); if ( exporter == null ) { final Invoker <?> invokerDelegete = new InvokerDelegate < T >( originInvoker providerUrl ); //以dubbo协议为例,这里才是真正调用DubboProtocol.exprot的地方 exporter = new ExporterChangeableWrapper < T >(( Exporter < T >) protocol . export ( invokerDelegete ), originInvoker ); bounds . put ( key exporter ); } } } return exporter ; } 大家这里可以跟进源码,会发现,在ServiceConfig中通过proxyFactory生成的Invoker的url指向的协议其实是registry,所以在ServiceConfig中protocol.exprot调用的是RegistryProtocol的exprot方法 在RegistryProtocol中调用了真正的远程服务暴露的方法,即DubboProtocol(以dubbo协议为例),在远程服务暴露成功后,将服务信息注册到registry上去,由此完成了一个服务的导出 至此Dubbo服务暴露中的大致流程已经完成了,后面将会对Dubbo如何通过ProxyFactory生成Invoker,以及Registry是如何进行注册的进行更加深入的学习 阅读原文 微信扫一扫 关注该公众号 更多内容vip可查看 |