Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。
此前我们学习了接口级的服务引入订阅的refreshInterfaceInvoker方法,当时还有最为关键的notify服务通知更新的部分源码没有学习,本次我们来学习notify通知本地服务更新的源码。
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
1 notify服务通知更新
当第一次订阅服务节点,或者服务节点目录的子节点更新时,例如新的producer上下线,将会调用notify服务通知更新的方法,会更新本地缓存的数据。
notify方法的入口是FailbackRegistry的notify方法。
/**
* FailbackRegistry的方法
* <p>
* 服务通知
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
/*
* 调用doNotify方法更新
*/
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list
logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
}
}
/**
* FailbackRegistry的方法
* <p>
* 服务通知
*/
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
//调用父类AbstractRegistry的方法
super.notify(url, listener, urls);
}
2 AbstractRegistry#notify通知更新
该方法涉及两个重要知识点:
- 一是对于拉取到的服务节点url按照类别providers、configurators 、routers进行分类,然后遍历每个类别,依次调用RegistryDirectory#notify方法触发监听回调,进行服务数据的更新。
- 二是RegistryDirectory#notify方法通知执行完毕之后,调用saveProperties方法更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。
/**
* AbstractRegistry的方法
* <p>
* 通知更新
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {
// 1-4 Empty address.
logger.warn("1-4", "", "", "Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
}
//根据节点类别对url进行分类
Map<String, List<URL>> result = new HashMap<>();
//遍历url,进行分类
for (URL u : urls) {
//服务消费者和服务提供者的服务接口名匹配
if (UrlUtils.isMatch(url, u)) {
//获取url的category类别,默认providers,同时服务提供者urlServiceAddressURL固定返回providers
String category = u.getCategory(DEFAULT_CATEGORY);
//将url加入到对应类别的categoryList中
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
//result,一般有三个元素,即三个类别,providers、configurators 、routers
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
//遍历每一个类别
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
//获取类别
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
//存入categoryNotified
categoryNotified.put(category, categoryList);
//执行leitener的notify方法进行通知,listener可以是RegistryDirectory
/*
* RegistryDirectory#notify通知
*/
listener.notify(categoryList);
/*
* 本地缓存
*/
// We will update our cache file after each notification.
// When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.
//将在每次通知后更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。
//本地缓存,默认支持
if (localCacheEnabled) {
saveProperties(url);
}
}
}
3 RegistryDirectory#notify更新本地内存信息
该方法根据url更新RegistryDirectory对象的内存信息,将可能会更新RegistryDirectory 内部的configurators配置信息集合,routerChain路由链以及urlInvokerMap缓存。
在最后,会专门调用refreshOverrideAndInvoker方法,将服务提供者url转换为invoker,进行服务提供者的更新。
/**
* RegistryDirectory的方法
*
* 服务变更通知
* @param urls 服务提供者注册信息列表
*/
@Override
public synchronized void notify(List<URL> urls) {
if (isDestroyed()) {
return;
}
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
//类别合法性过滤
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
//根据类别分组
.collect(Collectors.groupingBy(this::judgeCategory));
//获取配置信息url集合,可以为空
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
//将配置信息url转换为Configurator集合,并赋值给configurators属性,可以为空
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
//获取路由信息url集合,可以为空
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
//将配置信息url转换为Router集合,并加入routerChain路由链,可以为空
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
//获取服务提供者url集合,可以为空
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
// 3.x added for extend URL address
//添加扩展URL地址 3.x的特性
ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
//获取AddressListener,默认空集合
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
}
}
/*
* 将服务提供者url转换为invoker,进行服务提供者的更新
*/
refreshOverrideAndInvoker(providerURLs);
}
3.1 refreshOverrideAndInvoker刷新invoker
该方法将服务提供者url转换为invoker,进行服务提供者的更新,这在consumer对producer的信息更新部分是非常重要的一个方法。
url转换规则为:
- 如果URL已转换为invoker,则不再重新引用它并直接从缓存获取它,请注意,URL中的任何参数更改都将被重新引用。
- 如果传入invoker列表不为空,则表示它是最新的invoker列表。
- 如果传入invokerUrl的列表为空,则意味着该规则只是一个覆盖规则或路由规则,需要重新对比以决定是否重新引用。
/**
* RegistryDirectory的方法
* <p>
* 将服务提供者url转换为invoker,进行服务提供者的更新
*
* @param urls 服务提供者url
*/
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
refreshInvoker(urls);
}
/**
* 将invokerURL列表转换为Invoker Map
*
* @param invokerUrls this parameter can't be null
*/
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
//如果只有一个协议为empty的url,表示最新注册中心没有任何该服务提供者url信息
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
//设置为禁止访问
this.forbidden = true; // Forbid to access
//设置routerChain的服务提供者invoker集合为一个空集合
routerChain.setInvokers(BitList.emptyList());
//关闭urlInvokerMap中的所有服务提供者invoker
destroyAllInvokers(); // Close all invokers
}
//表明可能存在服务提供者url
else {
//允许访问
this.forbidden = false; // Allow to access
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().
//使用本地引用来避免NPE。cachedInvokerUrls将被destroyAllInvokers()方法设置为空。
Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;
//空的服务提供者url集合
if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {
// 1-4 Empty address.
logger.warn("1-4", "configuration ", "",
"Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");
invokerUrls.addAll(localCachedInvokerUrls);
} else {
//缓存的invoker url,便于比较
localCachedInvokerUrls = new HashSet<>();
localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
this.cachedInvokerUrls = localCachedInvokerUrls;
}
if (invokerUrls.isEmpty()) {
return;
}
// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().
//使用本地引用来避免NPE。urlInvokerMap将在destroyAllInvokers()方法设置为空。
Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().
//不能使用本地引用,因为oldUrlInvokerMap的映射可能会直接在toInvokers()中删除。
Map<URL, Invoker<T>> oldUrlInvokerMap = null;
if (localUrlInvokerMap != null) {
// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
}
/*
* 将URL转换为Invoker
*/
Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
/*
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
// 3-1 - Failed to convert the URL address into Invokers.
logger.error(
"3-1", "inconsistency between the client protocol and the protocol of the server",
"", "urls to invokers error",
new IllegalStateException(
"urls to invokers error. invokerUrls.size :" +
invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
// pre-route and build cache
//invoker集合存入routerChain的invokers属性
routerChain.setInvokers(this.getInvokers());
//设置urlInvokerMap为新的urlInvokerMap
this.urlInvokerMap = newUrlInvokerMap;
try {
//销毁无用 Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
// 通知invoker刷新
this.invokersChanged();
}
}
3.2 toInvokers将URL转换为Invoker
将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。
该方法的大概逻辑为:
- 获取获取消费者需要查询过滤的协议,遍历全部最新服务提供者url,依次进行如下操作:
- 调用checkProtocolValid方法,校验当前提供者url协议是否支持当前服务消费者调用,如果不支持则跳过该提供者。服务消费者可以手动指定消费某些协议的服务提供者,其他的服务提供者将被丢弃。
- 调用mergeUrl方法,合并服务提供者url的配置,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置,从这里可以知道消费者的配置优先级大于提供者的配置。
- 从原来的缓存中获取该url对应的invoker:
- 如果已经存在该缓存,那么直接将缓存的invoker加入到新的invoker map缓存中,不再从新引用。
- 如果缓存没有该url对应的invoker,那么将会重新引用该invoker,并将新引入的invoker加入到新的invoker map缓存中。
- 返回最新的url到invoker的缓存map。
/**
* RegistryDirectory的的方法
*
* 将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。
*
* @param oldUrlInvokerMap 此前的url到invoker的映射
* @param urls 最新服务提供者url集合
* @return invokers 最新的url到invoker的映射
*/
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
//新的映射map
Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
//获取消费者需要查询过滤的协议
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
//遍历最新服务提供者url集合
for (URL providerUrl : urls) {
//校验当前提供者url协议是否支持当前服务消费者调用,如果不支持则跳过该提供者
//服务消费者可以手动指定消费某些协议的服务提供者,其他的服务提供者将被丢弃
if (!checkProtocolValid(queryProtocols, providerUrl)) {
continue;
}
//合并服务提供者url的配置,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置
//从这里可以知道消费者的配置优先级大于提供者的配置
URL url = mergeUrl(providerUrl);
// Cache key is url that does not merge with consumer side parameters,
// regardless of how the consumer combines parameters,
// if the server url changes, then refer again
//从原来的缓存中获取该url对应的invoker
Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);
//如果缓存没有该url对应的invoker,那么将会重新引用该invoker
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
//如果启用服务
if (enabled) {
//再次通过Protocol$Adaptive的refer方法引用该服务提供者
//在最开始我们就是通过refer方法引用服务的,在再次见到这个方法,只不过这里的url已经变成了某个服务提供者的url了
invoker = protocol.refer(serviceType, url);
}
} catch (Throwable t) {
// Thrown by AbstractProtocol.optimizeSerialization()
if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {
// 4-2 - serialization optimizer class initialization failed.
logger.error("4-2", "typo in optimizer class", "",
"Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
} else {
// 4-3 - Failed to refer invoker by other reason.
logger.error("4-3", "", "",
"Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
}
//加入到新的invoker map缓存中
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(url, invoker);
}
} else {
//如果已经存在该缓存,那么直接将缓存的invoker加入到新的invoker map缓存中,不再从新引用
newUrlInvokerMap.put(url, invoker);
}
}
//返回新的invoker map
return newUrlInvokerMap;
}
在上面的步骤中,如果是首次启动消费者,将会统一走Protocol$Adaptive的refer方法引用该服务提供者的逻辑。还记得在最开始讲consumer服务引入的时候吗,那时候我们就是通过这个refer方法引用服务的,现在再次见到这个方法,只不过此前的url则是注册中心协议url,对应着RegistryProtocol,而这里的url已经变成了某个服务提供者的url了,对应着具体的协议实现,例如DubboProtocol、RestProtocol。
我们此前就讲过了Protocol$Adaptive的refer方法实际上返回的是被wrapper包装的Protocol,这里我们直接看最底层的Protocol的refer方法,以默认协议dubbo协议的Protocol实现DubboProtocol为例子!
4 DubboProtocol#refer dubbo协议服务引入
该方法执行基于dubbo序列化协议的服务引入,最终会创建一个DubboInvoker,内部包含一个nettyClient,已经与对应的服务提供者的nettyServer建立了连接,可用于发起rpc远程调用请求。
/**
* DubboProtocol的方法
*
* @param type 服务类型
* @param url 远程服务提供者url
* @return
* @param <T>
* @throws RpcException
*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//销毁检测
checkDestroyed();
//协议绑定引用
return protocolBindingRefer(type, url);
}
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
//销毁检测
checkDestroyed();
//序列化优化
optimizeSerialization(url);
// create rpc invoker.
//创建一个DubboInvoker,可用于发起rpc远程调用
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
//加入协议缓存invokers
invokers.add(invoker);
return invoker;
}
4.1 getClients获取服务客户端
该方法获取服务提供者网络调用客户端。这里会判断是否使用共享连接,因为一个服务提供者根提供了很多的服务接口,这个的是否共享连接,实际上就是指的消费者引入时候,是这些服务接口是否共用一些客户端连接(默认一个),或者说不同的服务接口使用独立的客户端连接(默认一个服务一个连接)。默认是共享连接。
/**
* DubboProtocol的方法
* 获取服务客户端
*
* @param url 服务提供者url
* @return ExchangeClient数组
*/
private ExchangeClient[] getClients(URL url) {
//获取配置的连接数,默认为0
int connections = url.getParameter(CONNECTIONS_KEY, 0);
// whether to share connection
// if not configured, connection is shared, otherwise, one connection for one service
//是否共享连接,如果没有配置connections,那么连接是共享的,否则,一个服务连接一个服务
if (connections == 0) {
/*
* The xml configuration should have a higher priority than properties.
* 共享连接配置,xml配置的优先级应该高于属性
*/
String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))
? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
: url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(shareConnectionsStr);
//获取共享客户端
List<ReferenceCountExchangeClient> shareClients = getSharedClient(url, connections);
//设置到ExchangeClient数组中
ExchangeClient[] clients = new ExchangeClient[connections];
Arrays.setAll(clients, shareClients::get);
return clients;
}
//非共享连接,表示当前服务接口使用单独的连接
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
//初始化新的客户端
clients[i] = initClient(url);
}
return clients;
}
4.2 getSharedClient获取共享客户端连接
如果是共享连接配置,那么调用getSharedClient方法获取共享客户端连接,默认连接数为1。该方法的大概步骤为:
- 首先获取服务提供者ip:port 作为共享连接的key,即共享连接情况下,同一个服务提供者实例下的所有服务接口共享某些连接。
- 从缓存referenceClientMap获取key对应的共享客户端连接。
- 如果存在缓存,并且客户端连接全部可用,那么增加连接技术,然后返回即可。否则,只要有一个客户端不可用,就需要用可用的客户端替换不可用的客户端。
- 如果此前没有该key的客户端连接缓存或者连接不是全部可用,都要走下面的步骤,尝试新创建连接。
- 加synchronized锁,在锁代码中再次双重检测,注意这里还有线程等待唤醒机制。
- 最后判断如果客户端连接为空,那么调用buildReferenceCountExchangeClientList方法构建指定数量的客户端连接。如果连接不为空,那么遍历连接,判断如果该连接不可用,那么新创建一个连接补充进来。
- 最后的处理仍需要加synchronized锁,判断如果最终没建立连接,那么移除无效缓存,否则将最终的客户端连接存入缓存,最后唤醒其他等待的线程。
该方法的核心知识点有两个,一个是buildReferenceCountExchangeClientList方法构建指定数量的客户端连接,另一个就是方法中的synchronized锁以及等待唤醒机制。
为什么需要等待唤醒呢?因为这是共享客户端,那么可能有多个线程都在初始化同一个ip:port的多个客户端,为了避免冲突,需要加锁。
/**
* DubboProtocol的方法
* <p>
* 获取共享客户端连接
*
* @param url 服务提供者url
* @param connectNum 共享连接数量,默认1
*/
@SuppressWarnings("unchecked")
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
//获取 服务提供者ip:port 作为共享连接的key
String key = url.getAddress();
//从缓存获取key对应的共享客户端连接
Object clients = referenceClientMap.get(key);
if (clients instanceof List) {
//转换为ReferenceCountExchangeClient集合,带有引用计数的功能
List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;
//检测客户端连接是否全部可用
//只要有一个客户端不可用,就需要用可用的客户端替换不可用的客户端。
if (checkClientCanUse(typedClients)) {
//如果可用
//增加连接的引用计数,如果我们创建新的调用者共享相同的连接,连接将关闭,没有任何引用
batchClientRefIncr(typedClients);
return typedClients;
}
}
//如果此前没有该key的连接缓存,那么新创建
List<ReferenceCountExchangeClient> typedClients = null;
synchronized (referenceClientMap) {
//死循环
for (; ; ) {
// guarantee just one thread in loading condition. And Other is waiting It had finished.
//双重检测锁
clients = referenceClientMap.get(key);
if (clients instanceof List) {
typedClients = (List<ReferenceCountExchangeClient>) clients;
if (checkClientCanUse(typedClients)) {
batchClientRefIncr(typedClients);
return typedClients;
} else {
//如果共享连接不是全部可用,那么缓存值先设置为为一个object对象,跳出循环
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
}
//如果客户端连接PENDING_OBJECT,那么表示有其他线程正在初始化当前客户端连接,那么当前线程等待直到被通知
else if (clients == PENDING_OBJECT) {
try {
referenceClientMap.wait();
} catch (InterruptedException ignored) {
}
}
//如果没有共享连接,那么缓存值先设置为为一个object对象,跳出循环
else {
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
}
}
try {
//连接数量必须大于等于1
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
//如果客户端连接为空
if (CollectionUtils.isEmpty(typedClients)) {
/*
* 构建客户端连接
*/
typedClients = buildReferenceCountExchangeClientList(url, connectNum);
}
//如果连接不为空
else {
//遍历连接
for (int i = 0; i < typedClients.size(); i++) {
//如果该连接不可用,那么新创建一个连接补充进来
ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
typedClients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
} finally {
synchronized (referenceClientMap) {
//如果最终没建立连接,那么移除无效缓存
if (typedClients == null) {
referenceClientMap.remove(key);
} else {
//将最终的客户端连接存入缓存
referenceClientMap.put(key, typedClients);
}
//唤醒其他线程
referenceClientMap.notifyAll();
}
}
return typedClients;
}
4.3 buildReferenceCountExchangeClientList构建客户端连接
该方法构建指定数量的引用计数交换器客户端,内部循环调用buildReferenceCountExchangeClient方法构建耽单个客户端连接,内部调用initClient方法,初始化交换器客户端,启动一个nettyClient并与服务端建立了连接。
/**
* DubboProtocol的方法
* 构建指定数量的引用计数交换器客户端
*
* @param url 服务提供者url
* @param connectNum 客户端数量
* @return
*/
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
List<ReferenceCountExchangeClient> clients = new ArrayList<>();
//循环调用buildReferenceCountExchangeClient方法
for (int i = 0; i < connectNum; i++) {
clients.add(buildReferenceCountExchangeClient(url));
}
return clients;
}
/**
* 构建一个引用计数交换器客户端
*
* @param url
* @return
*/
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
//初始化交换器客户端,启动一个nettyClient并与服务端建立了连接
ExchangeClient exchangeClient = initClient(url);
//创建ReferenceCountExchangeClient
ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);
// read configs
//获取服务器关闭等待超时时间,默认10000ms
int shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());
client.setShutdownWaitTime(shutdownTimeout);
return client;
}
4.4 initClient建立客户端连接
该方法创建客户端连接,大概步骤为:
- 首先获取客户端底层通信框架类型,应该和服务端的底层通信框统一,默认netty。
- 用ServiceConfigURL替换InstanceAddressURL,协议为dubbo协议。
- 获取lazy参数,判断连接是否懒加载,默认false,即饿加载。如果懒加载,那么只有在第一次调用服务时才会创建与服务端的连接,否则立即调用Exchangers.connect(url, requestHandler)方法与服务端建立底层通信客户端连接。
默认情况下,客户端为饿加载,客户端与服务端的连接,在消费者客户端启动引用服务的时候就已经建立了,即服务提供者url转换为invoker的时候,就已经建立了连接。
/**
* DubboProtocol的方法
* 创建一个新的连接
*
* @param url 服务提供者url
*/
private ExchangeClient initClient(URL url) {
/*
* Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
* which means params are shared among different services. Since client is shared among services this is currently not a problem.
*/
//获取客户端底层通信框架类型,应该和服务端的底层通信框统一,默认netty
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
// BIO is not allowed since it has severe performance issue.
//不允许使用BIO,因为它有严重的性能问题,目前都是使用netty4
if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
try {
// Replace InstanceAddressURL with ServiceConfigURL.
//用ServiceConfigURL替换InstanceAddressURL,协议为dubbo协议
url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
//默认启用心跳
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
//连接是否懒加载,默认false,即饿加载
return url.getParameter(LAZY_CONNECT_KEY, false)
//如果懒加载,那么只有在第一次调用服务时才会创建与服务端的连接
? new LazyConnectExchangeClient(url, requestHandler)
//饿加载,与服务端建立底层通信客户端连接
: Exchangers.connect(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
}
4.5 Exchangers#connect建立连接
该方法和我们此前学习的服务提供者的Exchangers#bind方法类型,只不过bind方法创建服务端,该方法创建客户端。
该方法内部基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#connect方法。
/**
* Exchangers的方法
*
* 客户端建立与服务端的连接
*
* @param url 服务提供者url
* @param handler 请求处理器
* @return 客户端连接
*/
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
//基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#connect方法
return getExchanger(url).connect(url, handler);
}
HeaderExchanger#connect方法中,首先对handler进行包装:DecodeHandler -> HeaderExchangeHandler -> requestHandler。
- DecodeHandler用于负责内部的dubbo协议的请求解码。
- HeaderExchangeHandler用于完成请求响应的映射。
- requestHandler用于nettyHandler真正处理请求。
随后调用Transporters#connect方法启动底层远程网络通信客户端,返回Client。Transporter是Dubbo对网络传输层的抽象接口,Exchanger依赖于Transporter。
最后基于Client构建HeaderExchangeClient返回。
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
//包装handler:DecodeHandler -> HeaderExchangeHandler -> handler
//调用Transporters#connect方法启动底层远程网络通信客户端,返回Client
//基于Client构建HeaderExchangeClient返回
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect方法将会在handler的最外层继续包装一层ChannelHandlerDispatcher,它所有的 ChannelHandler 接口实现都会调用其中每个 ChannelHandler 元素的相应方法。随后基于Dubbo SPI机制获取Transporter的实现,并调用connect方法完成绑定,目前仅NettyTransporter,基于netty4。
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
//继续包装一层ChannelHandlerDispatcher
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//基于Dubbo SPI机制获取Transporter的实现,并调用connect方法完成绑定
return getTransporter(url).connect(url, handler);
}
4.6 NettyTransporter#connect创建NettyClient
该方法很简单,就是根据url和handler创建一个NettyClient实例,在NettyClient的构造器中,会调用doOpen()开启客户端,创建Bootstrap,设置EventLoopGroup,编配ChannelHandlerPipeline,随后调用connect方法连接服务提供者所在服务端。
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
//基于url和handler创建NettyClient
return new NettyClient(url, handler);
}
NettyClient的构造器如下,将会调用父类构造器启动客户端。
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
//可通过CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定义客户端线程池的名称和类型
//继续包装handler: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}
AbstractClient的构造器如下,将会获取绑定的ip和端口以及其他参数,然后调用doOpen方法真正的开启netty客户端,最后调用connect方法连接服务提供者所在服务端。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// set default needReconnect true when channel is not connected
//当通道未连接时设置默认needReconnect为true
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
//初始化执行器,消费者的执行程序是全局共享的,提供者ip不需要是线程名的一部分。
initExecutor(url);
try {
/*
* 创建netty客户端
*/
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
/*
* 连接服务提供者所在服务端
*/
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
// If lazy connect client fails to establish a connection, the client instance will still be created,
// and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +
" connect to the server " + getRemoteAddress() +
" (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +
t.getMessage(), t);
return;
}
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
}
4.7 doOpen初始化NettyClient
该方法用于初始化并启动netty客户端,是非常标准的netty客户端启动代码,如果你们使用过Netty,看过Netty源码,一定就会感到非常熟悉。
创建Bootstrap,设置eventGroup,编配ChannelHandler。至此成功初始化了Bootstrap,但是并没有连接服务端。
/**
* NettyClient的方法
*
* 初始化 bootstrap
*/
@Override
protected void doOpen() throws Throwable {
//创建NettyClientHandler
final NettyClientHandler nettyClientHandler = createNettyClientHandler();
//创建Bootstrap,说明这是一个netty客户端
bootstrap = new Bootstrap();
//初始化NettyClient
initBootstrap(nettyClientHandler);
}
protected NettyClientHandler createNettyClientHandler() {
//创建NettyClientHandler,当前NettyClient对象本身也是一个ChannelHandler实例,其received方法委托给创建实例时传递的内部的handler处理
return new NettyClientHandler(getUrl(), this);
}
protected void initBootstrap(NettyClientHandler nettyClientHandler) {
//配置线程组
bootstrap.group(EVENT_LOOP_GROUP.get())
//设置Socket 参数
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
//IO模型
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
//设置处理器
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", new SslClientTlsHandler(getUrl()));
}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
//自定义客户端消息的业务处理逻辑Handler
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
//解码
.addLast("decoder", adapter.getDecoder())
//编码
.addLast("encoder", adapter.getEncoder())
//心跳检测
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
//最后是此前创建的nettyClientHandler
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);
if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {
int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
4.8 connect连接服务端
在初始化Bootstrap之后,将调用connect方法真正的连接服务提供者所在的服务端,内部调用doConnect方法执行连接,该方法由子类实现。
/**
* AbstractClient的方法
* <p>
* 连接服务提供者所在服务端
*/
protected void connect() throws RemotingException {
//加锁
connectLock.lock();
try {
//如果已连接则返回
if (isConnected()) {
return;
}
//如果已关闭则返回
if (isClosed() || isClosing()) {
logger.warn("No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");
return;
}
/*
* 执行连接
*/
doConnect();
if (!isConnected()) {
throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successfully connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
NettyClient的doConnect方法如下,主要逻辑就是调用bootstrap.connect方法连接服务端:
/**
* NettyClient的方法
* 连接服务端
*/
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
//通过bootstrap连接服务端
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
//等待连接超时事件
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
//如果连接成功
if (ret && future.isSuccess()) {
//获取通道
Channel newChannel = future.channel();
try {
// Close old channel
// copy reference
//关闭旧的Channel
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
Throwable cause = future.cause();
// 6-1 Failed to connect to provider server by other reason.
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);
logger.error("6-1", "network disconnected", "",
"Failed to connect to provider server by other reason.", cause);
throw remotingException;
} else {
// 6-2 Client-side timeout
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
logger.error("6-2", "provider crash", "",
"Client-side timeout.", remotingException);
throw remotingException;
}
} finally {
// just add new valid channel to NettyChannel's cache
if (!isConnected()) {
//future.cancel(true);
}
}
}
5 saveProperties更新本地文件信息
在每次通知内存数据更新之后,更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。
/**
* AbstractRegistry的方法
*
* @param url 服务消费者url
*/
private void saveProperties(URL url) {
//服务缓存文件路径为 {user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cache
if (file == null) {
return;
}
try {
//需要存储的url字符串
StringBuilder buf = new StringBuilder();
//获取该url的不同类别节点到对应url列表的map
Map<String, List<URL>> categoryNotified = notified.get(url);
//遍历所有的节点url
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
//追加空格
buf.append(URL_SEPARATOR);
}
//追加url字符串
buf.append(u.toFullString());
}
}
}
//消费者url key以及对应的节点url字符串存入properties
properties.setProperty(url.getServiceKey(), buf.toString());
//版本自增
long version = lastCacheChanged.incrementAndGet();
//保存properties到本地文件
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.schedule(() -> doSaveProperties(version), DEFAULT_INTERVAL_SAVE_PROPERTIES, TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
本地缓存文件路径为:{user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cache,里面缓存的内容如下,每一个服务接口占据一行,它的所有url字符串都追加在后面,通过空格分隔。
6 总结
本次我们学习了接口级别服务发现订阅refreshInterfaceInvoker方法的具体实现,大概步骤为:
- 第一次调用refreshInterfaceInvoker方法的时候,由于MigrationInvoker内部的真实消费者Invoker为null,那么需要创建一个消费者Invoker。
- 首先创建动态注册心中目录DynamicDirectory,随后调用doCreateInvoker方法创建服务消费者Invoker。
- 首先根据消费者信息转换为消费者注册信息url,内部包括消费者ip、指定引用的protocol(默认consumer协议)、指定引用的服务接口、指定引用的方法以及其他消费者信息。
- 调用registry.register方法将消费者注册信息url注册到注册中心。
- 调用directory.buildRouterChain方法构建服务调用路由链RouterChain,赋给directory的routerChain属性。
- 调用directory.subscribe方法进行服务发现、引入并订阅服务。
- directory本身是一个监听器,directory将会订阅zookeeper对应的服务接口节点下的dubbo/[service name]/providers,服务提供者目录,以及dubbo/[service name]/configurators,即配置目录,以及dubbo/[service name]/routers,即服务路由目录。
- 依靠着zookeeper的watch监听回调机制,当这些节点下的子节点发生变化时会触发回调通知RegistryDirectory执行notify方法,进而完成本地服务列表的动态更新功能。实际上服务提供者也会订阅,只不过只会订阅configurators节点。
- 在执行订阅的时候,将会进行一次providers,configurators,routers节点目录下字节点的获取,这样就获取到了当前的服务提供者url、配置信息url、服务路由url。
- 在subscribe方法的最后,也是最关键的一步,主动调用notify方法通知数据变更。这里实际上会动态更新本地内存和文件中的服务提供者缓存,可能会更新RegistryDirectory 内部的configurators配置信息集合,routerChain路由链以及urlInvokerMap缓存,这里面存放着服务提供者url到对应的Invoker的映射。
- 如果没有在本地缓存中找到某个服务提供者url的缓存,那么会将url转换为对应协议的Invoker,默认DubboInvoker,DubboInvoker的内部还会创建NettyClient客户端,并与服务提供者所在的服务端建立连接。
- 将url转换为Invoker之前,将会进行配置的合并,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置,从这里可以知道消费者的配置优先级大于提供者的配置。
- 调用cluster.join方法传入directory进行集群容错能力包装,最终返回一个ClusterInvoker作为消费者Invoker,即MockClusterInvoker,这是一个包装类,内部包含真正的集群容错Invoker,默认是FailoverClusterInvoker。
到此我们可以知道上面的各种对象的关系(注意MockClusterInvoker上面还有一个MigrationInvoker没画出来):
到此接口级服务引入学习完毕,实际上Dubbo2就是采用的接口级别服务注册和引入。后面我们将继续学习应用级服务引入,实际上这才是Dubbo3升级的一个重点,非常值得学习!