Dubbo 服务引用

Dubbo 服务引用

原创: 怀风  光明和黑暗互相吸引  前天
Dubbo 服务引用

服务引用


大家都知道Dubbo是由consumer,provider,registry这三大部分组成



那么consumer是如何发现provider并调用的呢,就是通过服务引用来实现的,也就是通过发现服务,然后进行调用


服务引用的流程 


dubbo服务引用的流程大概如上图,不难发现其流程跟dubbo服务暴露互逆,(关于Dubbo服务暴露Dubbo服务暴露)但最终也是通过invoker来完成我们服务引用
dubbo服务引用最终通过ProxyFactory将Invoker转化为调用的Service
dubbo服务引用过程与dubbo服务暴露相似,都是通过SPI,适配相应的协议,并将服务注册到注册中心,并最终完成服务引用
源码解析

初始化
public
 
class
 
ReferenceBean
<
T
>
 
extends
 
ReferenceConfig
<
T
>
 
implements
 
FactoryBean

 
ApplicationContextAware

 
InitializingBean

 
DisposableBean
 
{

    
//省略一部分代码

    
//获取服务接口
    
@Override
    
public
 
Object
 getObject
()
 
{
        
return
 get
();
    
}

    
@Override
    
@SuppressWarnings
({
"unchecked"
})
    
public
 
void
 afterPropertiesSet
()
 
throws
 
Exception
 
{
        
//此处省略 配置校验代码
        
Boolean
 b 
=
 isInit
();
        
if
 
(

==
 
null
 
&&
 getConsumer
()
 
!=
 
null
)
 
{
            b 
=
 getConsumer
().
isInit
();
        
}
        
if
 
(

!=
 
null
 
&&
 b
)
 
{
            
//发现服务
            getObject
();
        
}
    
}
}
首先我们来看一下ReferenceBean, ReferenceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener这里同服务暴露一样,通过spring在初始化的时候进行服务引用
服务引用

我们看到这里都调用了getObject()方法,其实是调用了ReferenceConfig中的get()方法,接下来我们一起看下ReferenceConfig中的get()方法
    
public
 
synchronized
 T get
()
 
{
        
//配置校验
        checkAndUpdateSubConfigs
();
        
//如果该服务已被销毁,则抛出异常
        
if
 
(
destroyed
)
 
{
            
throw
 
new
 
IllegalStateException
(
"The invoker of ReferenceConfig("
 
+
 url 
+
 
") has already destroyed!"
);
        
}
        
//如果服务为空,则进行初始化,否则直接返回
        
if
 
(
ref 
==
 
null
)
 
{
            init
();
        
}
        
return
 ref
;
    
}
这里看到ReferenceConfig.get方法上加了一个锁,用来保证不会重复发现服务,而该方法的核心在于init()方法
    
private
 
void
 init
()
 
{
        
if
 
(
initialized
)
 
{
            
return
;
        
}
        initialized 
=
 
true
;
        checkStubAndLocal
(
interfaceClass
);
        
//校验mock
        checkMock
(
interfaceClass
);
        
Map
<
String

 
String
>
 map 
=
 
new
 
HashMap
<
String

 
String
>();

        
//省略对参数解析设置 ...

        
//创建代理对象
        ref 
=
 createProxy
(
map
);

        
ApplicationModel
.
initConsumerModel
(
getUniqueServiceName
(),
 buildConsumerModel
(
attributes
));
    
}
这里通过对参数的解析来创建服务代理, createProxy()方法是整个服务引用初始化的关键
    
private
 T createProxy
(
Map
<
String

 
String
>
 map
)
 
{
        URL tmpUrl 
=
 
new
 URL
(
"temp"

 
"localhost"

 
0

 map
);
        
final
 
boolean
 isJvmRefer
;
        
if
 
(
isInjvm
()
 
==
 
null
)
 
{
            
if
 
(
url 
!=
 
null
 
&&
 url
.
length
()
 
>
 
0
)
 
{
 
// 如果指定了url,则不要进行本地引用
                isJvmRefer 
=
 
false
;
            
}
 
else
 
{
                
// 默认情况下,引用本地服务(如果有)
                isJvmRefer 
=
 
InjvmProtocol
.
getInjvmProtocol
().
isInjvmRefer
(
tmpUrl
);
            
}
        
}
 
else
 
{
            isJvmRefer 
=
 isInjvm
();
        
}

        
if
 
(
isJvmRefer
)
 
{
            URL url 
=
 
new
 URL
(
Constants
.
LOCAL_PROTOCOL

 
Constants
.
LOCALHOST_VALUE

 
0

 interfaceClass
.
getName
()).
addParameters
(
map
);
            invoker 
=
 refprotocol
.
refer
(
interfaceClass

 url
);
            
if
 
(
logger
.
isInfoEnabled
())
 
{
                logger
.
info
(
"Using injvm service "
 
+
 interfaceClass
.
getName
());
            
}
        
}
 
else
 
{
            
if
 
(
url 
!=
 
null
 
&&
 url
.
length
()
 
>
 
0
)
 
{
 
// 用户指定的URL,可以是对等地址,也可以是注册中心的地址.
                
String
[]
 us 
=
 
Constants
.
SEMICOLON_SPLIT_PATTERN
.
split
(
url
);
                
if
 
(
us 
!=
 
null
 
&&
 us
.
length 
>
 
0
)
 
{
                    
for
 
(
String
 u 
:
 us
)
 
{
                        URL url 
=
 URL
.
valueOf
(
u
);
                        
if
 
(
StringUtils
.
isEmpty
(
url
.
getPath
()))
 
{
                            url 
=
 url
.
setPath
(
interfaceName
);
                        
}
                        
if
 
(
Constants
.
REGISTRY_PROTOCOL
.
equals
(
url
.
getProtocol
()))
 
{
                            urls
.
add
(
url
.
addParameterAndEncoded
(
Constants
.
REFER_KEY

 
StringUtils
.
toQueryString
(
map
)));
                        
}
 
else
 
{
                            urls
.
add
(
ClusterUtils
.
mergeUrl
(
url

 map
));
                        
}
                    
}
                
}
            
}
 
else
 
{
 
// x来自注册中心配置的URL
                checkRegistry
();
                
List
<
URL
>
 us 
=
 loadRegistries
(
false
);
                
if
 
(
CollectionUtils
.
isNotEmpty
(
us
))
 
{
                    
for
 
(
URL u 
:
 us
)
 
{
                        URL monitorUrl 
=
 loadMonitor
(
u
);
                        
if
 
(
monitorUrl 
!=
 
null
)
 
{
                            map
.
put
(
Constants
.
MONITOR_KEY

 URL
.
encode
(
monitorUrl
.
toFullString
()));
                        
}
                        urls
.
add
(
u
.
addParameterAndEncoded
(
Constants
.
REFER_KEY

 
StringUtils
.
toQueryString
(
map
)));
                    
}
                
}
                
if
 
(
urls
.
isEmpty
())
 
{
                    
throw
 
new
 
IllegalStateException
(
"No such any registry to reference "
 
+
 interfaceName 
+
 
" on the consumer "
 
+
 
NetUtils
.
getLocalHost
()
 
+
 
" use dubbo version "
 
+
 
Version
.
getVersion
()
 
+
 
", please config <dubbo:registry address=\"...\" /> to your spring config."
);
                
}
            
}
            
//这里的refprotocol.refer即通过registryProtocol来进行发现
            
if
 
(
urls
.
size
()
 
==
 
1
)
 
{
                invoker 
=
 refprotocol
.
refer
(
interfaceClass

 urls
.
get
(
0
));
            
}
 
else
 
{
                
List
<
Invoker
<?>>
 invokers 
=
 
new
 
ArrayList
<
Invoker
<?>>();
                URL registryURL 
=
 
null
;
                
for
 
(
URL url 
:
 urls
)
 
{
                    invokers
.
add
(
refprotocol
.
refer
(
interfaceClass

 url
));
                    
if
 
(
Constants
.
REGISTRY_PROTOCOL
.
equals
(
url
.
getProtocol
()))
 
{
                        registryURL 
=
 url
;
 
// 使用最后一个注册表网址
                    
}
                
}
                
if
 
(
registryURL 
!=
 
null
)
 
{
 
// 注册表网址可用
                    
// 仅在寄存器的群集可用时才使用RegistryAwareCluster
                    URL u 
=
 registryURL
.
addParameter
(
Constants
.
CLUSTER_KEY

 
RegistryAwareCluster
.
NAME
);
                    
//调用者包装关系将是:RegistryAwareClusterInvoker(StaticDirectory) - > FailoverClusterInvoker(RegistryDirectory,将执行路由) - > Invoker                    invoker = cluster.join(new StaticDirectory(u, invokers));
                
}
 
else
 
{
 
// 不是注册表网址,必须直接调用。
                    
//这里要注意 cluster 最终都会被包装成 MockClusterWrapper(SPI的依赖注入) 
                    invoker 
=
 cluster
.
join
(
new
 
StaticDirectory
(
invokers
));
                
}
            
}
        
}

        
Boolean
 c 
=
 check
;
        
if
 
(

==
 
null
 
&&
 consumer 
!=
 
null
)
 
{
            c 
=
 consumer
.
isCheck
();
        
}
        
if
 
(

==
 
null
)
 
{
            c 
=
 
true
;
 
// default true
        
}
        
if
 
(

&&
 
!
invoker
.
isAvailable
())
 
{
            
// 如果提供者暂时不可用,则允许消费者稍后重试
            initialized 
=
 
false
;
            
throw
 
new
 
IllegalStateException
(
"Failed to check the status of the service "
 
+
 interfaceName 
+
 
". No provider available for the service "
 
+
 
(
group 
==
 
null
 
?
 
""
 
:
 group 
+
 
"/"
)
 
+
 interfaceName 
+
 
(
version 
==
 
null
 
?
 
""
 
:
 
":"
 
+
 version
)
 
+
 
" from the url "
 
+
 invoker
.
getUrl
()
 
+
 
" to the consumer "
 
+
 
NetUtils
.
getLocalHost
()
 
+
 
" use dubbo version "
 
+
 
Version
.
getVersion
());
        
}
        
if
 
(
logger
.
isInfoEnabled
())
 
{
            logger
.
info
(
"Refer dubbo service "
 
+
 interfaceClass
.
getName
()
 
+
 
" from url "
 
+
 invoker
.
getUrl
());
        
}
        
/**
         * @since 2.7.0
         * ServiceData Store
         */
        
MetadataReportService
 metadataReportService 
=
 
null
;
        
if
 
((
metadataReportService 
=
 getMetadataReportService
())
 
!=
 
null
)
 
{
            URL consumerURL 
=
 
new
 URL
(
Constants
.
CONSUMER_PROTOCOL

 map
.
remove
(
Constants
.
REGISTER_IP_KEY
),
 
0

 map
.
get
(
Constants
.
INTERFACE_KEY
),
 map
);
            metadataReportService
.
publishConsumer
(
consumerURL
);
        
}
        
// create service proxy
        
return
 
(
T
)
 proxyFactory
.
getProxy
(
invoker
);
    
}
这里可以看到dubbo在服务引用中也可以使用本地服务的发现,但是可看到这一块已经被标记为过时,我的理解是dubbo作为一个RPC框架,本地服务还通过dubbo去调用,肯定与dubbo本身的意义不相匹配,所以便不推荐使用
这块代码我们可以发现同服务暴露一样,会将consumer注册到所有配置的注册中心上去,而refprotocol.refer则是服务引用的核心代码
cluster对invoker进行了一层包装,以便应对后续服务调用中出现的异常情况进行处理
最后我们的invoker将通过代理工厂转换为可以调用的代理服务
RegistryProtocal中的refer

    
@Override
    
@SuppressWarnings
(
"unchecked"
)
    
public
 
<
T
>
 
Invoker
<
T
>
 refer
(
Class
<
T
>
 type

 URL url
)
 
throws
 
RpcException
 
{
        url 
=
 url
.
setProtocol
(
url
.
getParameter
(
REGISTRY_KEY

 DEFAULT_REGISTRY
)).
removeParameter
(
REGISTRY_KEY
);
        
//获取注册中心
        
Registry
 registry 
=
 registryFactory
.
getRegistry
(
url
);
        
//如果是注册中心的服务,直接返回注册中心类型的invoker
        
if
 
(
RegistryService
.
class
.
equals
(
type
))
 
{
            
return
 proxyFactory
.
getInvoker
((
T
)
 registry

 type

 url
);
        
}

        
// group="a,b" or group="*"
        
Map
<
String

 
String
>
 qs 
=
 
StringUtils
.
parseQueryString
(
url
.
getParameterAndDecoded
(
REFER_KEY
));
        
String
 group 
=
 qs
.
get
(
Constants
.
GROUP_KEY
);
        
if
 
(
group 
!=
 
null
 
&&
 group
.
length
()
 
>
 
0
)
 
{
            
if
 
((
COMMA_SPLIT_PATTERN
.
split
(
group
)).
length 
>
 
1
 
||
 
"*"
.
equals
(
group
))
 
{
                
return
 doRefer
(
getMergeableCluster
(),
 registry

 type

 url
);
            
}
        
}
        
//发现服务
        
return
 doRefer
(
cluster

 registry

 type

 url
);
    
}

    
private
 
<
T
>
 
Invoker
<
T
>
 doRefer
(
Cluster
 cluster

 
Registry
 registry

 
Class
<
T
>
 type

 URL url
)
 
{
        
//创建并设置注册目录对象
        
RegistryDirectory
<
T
>
 directory 
=
 
new
 
RegistryDirectory
<
T
>(
type

 url
);
        directory
.
setRegistry
(
registry
);
        directory
.
setProtocol
(
protocol
);
        
// all attributes of REFER_KEY
        
Map
<
String

 
String
>
 parameters 
=
 
new
 
HashMap
<
String

 
String
>(
directory
.
getUrl
().
getParameters
());
        URL subscribeUrl 
=
 
new
 URL
(
CONSUMER_PROTOCOL

 parameters
.
remove
(
REGISTER_IP_KEY
),
 
0

 type
.
getName
(),
 parameters
);
        
if
 
(!
ANY_VALUE
.
equals
(
url
.
getServiceInterface
())
 
&&
 url
.
getParameter
(
REGISTER_KEY

 
true
))
 
{
            directory
.
setRegisteredConsumerUrl
(
getRegisteredConsumerUrl
(
subscribeUrl

 url
));
            
//注册服务
            registry
.
register
(
directory
.
getRegisteredConsumerUrl
());
        
}
        directory
.
buildRouterChain
(
subscribeUrl
);
        
//订阅服务
        directory
.
subscribe
(
subscribeUrl
.
addParameter
(
CATEGORY_KEY

                PROVIDERS_CATEGORY 
+
 
","
 
+
 CONFIGURATORS_CATEGORY 
+
 
","
 
+
 ROUTERS_CATEGORY
));
        
//装饰Invoker
        
Invoker
 invoker 
=
 cluster
.
join
(
directory
);
        
ProviderConsumerRegTable
.
registerConsumer
(
invoker

 url

 subscribeUrl

 directory
);
        
return
 invoker
;
    
}
在RegistryProtocal中,我们看到了cluster.join(directory),在ReferenceConfig中也出现过,在ReferenceConfig中没有注册中心的时候将直接使用装饰invoker,以供我们接下来服务调用来做集群容错
服务引用在RegistryProtocal中的核心方法即为doRefer方法
RegistryDirectory

    
/**
     * 将网址转换为调用者,如果网址已被引用,则不会重新引用。
     *
     * @param urls
     * @return invokers
     */
    
private
 
Map
<
String

 
Invoker
<
T
>>
 toInvokers
(
List
<
URL
>
 urls
)
 
{
        
Map
<
String

 
Invoker
<
T
>>
 newUrlInvokerMap 
=
 
new
 
HashMap
<>();
        
if
 
(
urls 
==
 
null
 
||
 urls
.
isEmpty
())
 
{
            
return
 newUrlInvokerMap
;
        
}
        
Set
<
String
>
 keys 
=
 
new
 
HashSet
<>();
        
String
 queryProtocols 
=
 
this
.
queryMap
.
get
(
Constants
.
PROTOCOL_KEY
);
        
for
 
(
URL providerUrl 
:
 urls
)
 
{
            
// 如果在参考侧配置协议,则仅选择匹配协议
            
if
 
(
queryProtocols 
!=
 
null
 
&&
 queryProtocols
.
length
()
 
>
 
0
)
 
{
                
boolean
 accept 
=
 
false
;
                
String
[]
 acceptProtocols 
=
 queryProtocols
.
split
(
","
);
                
for
 
(
String
 acceptProtocol 
:
 acceptProtocols
)
 
{
                    
if
 
(
providerUrl
.
getProtocol
().
equals
(
acceptProtocol
))
 
{
                        accept 
=
 
true
;
                        
break
;
                    
}
                
}
                
if
 
(!
accept
)
 
{
                    
continue
;
                
}
            
}
            
if
 
(
Constants
.
EMPTY_PROTOCOL
.
equals
(
providerUrl
.
getProtocol
()))
 
{
                
continue
;
            
}
            
if
 
(!
ExtensionLoader
.
getExtensionLoader
(
Protocol
.
class
).
hasExtension
(
providerUrl
.
getProtocol
()))
 
{
                logger
.
error
(
new
 
IllegalStateException
(
"Unsupported protocol "
 
+
 providerUrl
.
getProtocol
()
 
+
                        
" in notified url: "
 
+
 providerUrl 
+
 
" from registry "
 
+
 getUrl
().
getAddress
()
 
+
                        
" to consumer "
 
+
 
NetUtils
.
getLocalHost
()
 
+
 
", supported protocol: "
 
+
                        
ExtensionLoader
.
getExtensionLoader
(
Protocol
.
class
).
getSupportedExtensions
()));
                
continue
;
            
}
            URL url 
=
 mergeUrl
(
providerUrl
);

            
String
 key 
=
 url
.
toFullString
();
 
// 参数URL已排序
            
if
 
(
keys
.
contains
(
key
))
 
{
 
//重复的网址
                
continue
;
            
}
            keys
.
add
(
key
);
            
// 缓存键是不与消费者方参数合并的URL,无论消费者如何组合参数,如果服务器URL更改,则再次引用
            
Map
<
String

 
Invoker
<
T
>>
 localUrlInvokerMap 
=
 
this
.
urlInvokerMap
;
 
// 本地发现
            
Invoker
<
T
>
 invoker 
=
 localUrlInvokerMap 
==
 
null
 
?
 
null
 
:
 localUrlInvokerMap
.
get
(
key
);
            
if
 
(
invoker 
==
 
null
)
 
{
 
// 不在缓存中,请再次发现
                
try
 
{
                    
boolean
 enabled 
=
 
true
;
                    
if
 
(
url
.
hasParameter
(
Constants
.
DISABLED_KEY
))
 
{
                        enabled 
=
 
!
url
.
getParameter
(
Constants
.
DISABLED_KEY

 
false
);
                    
}
 
else
 
{
                        enabled 
=
 url
.
getParameter
(
Constants
.
ENABLED_KEY

 
true
);
                    
}
                    
if
 
(
enabled
)
 
{
                        invoker 
=
 
new
 
InvokerDelegate
<>(
protocol
.
refer
(
serviceType

 url
),
 url

 providerUrl
);
                    
}
                
}
 
catch
 
(
Throwable
 t
)
 
{
                    logger
.
error
(
"Failed to refer invoker for interface:"
 
+
 serviceType 
+
 
",url:("
 
+
 url 
+
 
")"
 
+
 t
.
getMessage
(),
 t
);
                
}
                
if
 
(
invoker 
!=
 
null
)
 
{
 
// Put new invoker in cache
                    newUrlInvokerMap
.
put
(
key

 invoker
);
                
}
            
}
 
else
 
{
                newUrlInvokerMap
.
put
(
key

 invoker
);
            
}
        
}
        keys
.
clear
();
        
return
 newUrlInvokerMap
;
    
}
那我们的服务最后是如何通相应协议打开consumer和provider的链接呢,关键代码就在RegistryDirectory的toInvokers方法,将url转换成具体的invoker,这个方法在订阅服务的时候会被触发,并且这里做了一层缓存,防止服务被多次引用
DubboProtocal中的refer

    
@Override
    
public
 
<
T
>
 
Invoker
<
T
>
 refer
(
Class
<
T
>
 serviceType

 URL url
)
 
throws
 
RpcException
 
{
        optimizeSerialization
(
url
);

        
// create rpc invoker.
        
DubboInvoker
<
T
>
 invoker 
=
 
new
 
DubboInvoker
<
T
>(
serviceType

 url

 getClients
(
url
),
 invokers
);
        invokers
.
add
(
invoker
);

        
return
 invoker
;
    
}
这里我们以Dubbo协议为例,看到DubboProtocal中的refer很简单,就是创建一个netty客户端,与provider进行连接返回一个Invoker即完成了一次服务的引用
最后通过ProxyFactory的字节码结束,生成代理的可供调用的服务,到这里dubbo服务引用的流程就结束了,可以看出服务引用与服务暴露的过程中有很多类似的地方,其中还有很多细节没有展开,这也将是后续学习的重点

阅读原文

微信扫一扫
关注该公众号

更多内容vip可查看