当前位置: 首页 > article >正文

Eureka学习笔记-客户端

Eureka学习笔记

客户端

模块设计

在这里插入图片描述

  • Applications :保存了从 Eureka Server 获取到的服务信息,相当于 Eureka Client 端的服务列表缓存。
  • InstanceInfo :维护了自身服务实例的信息,注册和心跳时需要用到。
  • QueryClient :负责从 Eureka Server 获取已注册的服务,并且更新Applications
  • RegistrationClient :负责在服务启动时发送注册请求,然后定期发送心跳,最后在服务下线之前取消注册。
  • ClusterResolverQueryClientRegistrationClient 在发送请求前需要先知道 Eureka Server 的地址,ClusterResolver 可以根据不同的策略和实现返回 Eureka Server 地址列表以供选择。
  • JerseyApplicationClient :是真正发送网络请求的 Http client,QueryClientRegistrationClient 获取到 Eureka Server 地址后会创建一个 JerseyApplicationClient 和该 Eureka Server 通讯。
EurekaHttpClient

EurekaHttpClient的设计使用了装饰器模式,类图结构如下

在这里插入图片描述

层次关系如下图
在这里插入图片描述

  • JerseyApplicationClient: 使用 Jersey 实现的基础 HTTP 客户端,负责与 Eureka 服务端进行核心通信操作。
  • MetricsCollectingEurekaHttpClient: 统计和收集 HTTP 请求的性能指标(如延迟、成功率),用于监控和分析。
  • RedirectingEurekaHttpClient: 处理 Eureka 服务端返回的重定向响应,并将请求转发到正确的目标地址。
  • RetryableEurekaHttpClient: 为 HTTP 请求增加重试机制,确保在网络波动或短暂故障情况下提高请求的可靠性。
  • SessionedEurekaHttpClient: 维护与 Eureka 服务端的会话状态,优化频繁请求中的连接管理和资源利用。
RetryableEurekaHttpClient

见名知意,这个client的作用是使得请求具有自动重试的功能

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }

            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            // 判断请求是否成功
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
        }

        // Connection error or 5xx from the server that must be retried on another server
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    throw new TransportException("Retry limit reached; giving up on completing the request");
}

ServerStatusEvaluators#LEGACY_EVALUATOR

private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
    public boolean accept(int statusCode, EurekaHttpClientDecorator.RequestType requestType) {
        if ((statusCode < 200 || statusCode >= 300) && statusCode != 302) {
            if (requestType == RequestType.Register && statusCode == 404) {
                return true;
            } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) {
                return true;
            } else if (requestType == RequestType.Cancel) {
                return true;
            } else {
                return requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404);
            }
        } else {
            return true;
        }
    }
};

这里根据请求类型以及响应码来判断是否**“成功”**,但这里的在非2xx返回的成功实际上意味着真正的成功,只表明无需进行重试,在调用上游还是会根据响应码判断操作本身是否成功,如

// 注册判断是否成功
boolean register() throws Throwable {
		...
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
// 心跳判断是否成功
boolean renew() {
	 // 返回404重新发起注册
     if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
         long timestamp = instanceInfo.setIsDirtyWithTime();
     }
    ...
	return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
// 下线实际不太关心结果
void unregister() {
    ...
		EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), 
}

发生失败时将会记录这次失败的Service到隔离集合,而重试的需要请求的Service将会重新选择,选择的过程将会优先排除掉之前失败的Servie,但在隔离结合的数量超过阈值后将会清空隔离集合

private List<EurekaEndpoint> getHostCandidates() {
    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
    quarantineSet.retainAll(candidateHosts);
		...
    // 操作阈值清空隔离集合
  	else if (quarantineSet.size() >= threshold) {
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
    } else {
        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (EurekaEndpoint endpoint : candidateHosts) {
            // 排除隔离集合中的Service
            if (!quarantineSet.contains(endpoint)) {
                remainingHosts.add(endpoint);
            }
        }
        candidateHosts = remainingHosts;
    }

    return candidateHosts;
}
SessionedEurekaHttpClient

它的主要作用是实现一定程度的EurekaService负载均衡,在一定时间间隔后强制重新获取一个新的client

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    long now = System.currentTimeMillis();
    long delay = now - lastReconnectTimeStamp;
		// 超过时间关闭当前client
    if (delay >= currentSessionDurationMs) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = now;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }
		// 重新构建
    EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
    if (eurekaHttpClient == null) {
        eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
    }
    return requestExecutor.execute(eurekaHttpClient);
}
Applications的维护

应用/应用实例的上实例的变更并不是一个频繁的操作,所以应用列表会在DiscoverClient本地缓存,DiscoverClient会定时刷新(默认30s)本地缓存
而刷新通常都是增量更新,即从Service获取最近的增量数据,然后根据增量类型(注册、下线、修改)来更新本地缓存。由于增量数据本身是会过期的(默认3分钟),如果client这这个过程中出现网络问题,就有可能丢失部分实例数据。

DiscoverClient#fetchRegistry()

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
				...
        Applications applications = getApplications();
				// 触发全量更新的条件,第一次/强制更新/本地没有数据等
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
						// 触发增量更新
            getAndUpdateDelta(applications);
        }
   			...
}

DiscoverClient#getAndUpdateDelta()

这里如果没有获取到增量数据还是进行全量更新,否则会进行一个原子更新,防止多线程并发更新

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
  		...
    if (delta == null) {
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
    		 updateDelta(delta);
      	 ...
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
    }
}

DiscoverClient#updateDelta()

这里会更新不同的增量类型更新本地缓存

  • 对于注册:如果应用不存在则增加应用,然后使用增量实例数据覆盖本地数据
  • 对于修改:如果应用不存在则增加应用,然后使用增量实例数据覆盖本地数据
  • 对于下线:首先移除实例,如果应用的实例列表为空则移除应用
private void updateDelta(Applications delta) {
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) {
        for (InstanceInfo instance : app.getInstances()) {
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            ++deltaCount;
						// 注册
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } 
          	// 修改
          	else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

            }
          	// 删除
          	else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    existingApp.removeInstance(instance);
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
		// 设置application版本、随机化应用实例
    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
  	...
}
ClusterResolver

ClusterResolver和EurekaHttpClient具有层层委托的特性,主要有如下最基础用到的有如下几个Resolver

在这里插入图片描述

  • ConfigClusterResolver :从配置文件中解析树说所有的EurekaService地址
  • ZoneAffinityClusterResolver:根据client所在zone对ServiceUrl分组排序,将处于相同Zone的ServiceUrl放在list的前面,并且做随机化处理,保证client尽可能的均匀的选择EurekaService。
  • AsyncResolver :动态拉取服务实例列表并在本地缓存,以提供更高效、非阻塞的服务解析能力。
ZoneAffinityClusterResolver

ZoneAffinityClusterResolver#getClusterEndpoints()

public List<AwsEndpoint> getClusterEndpoints() {
    // 根据client的所在zone将所有Endpoint划为相同区和保留区
    List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
    List<AwsEndpoint> myZoneEndpoints = parts[0];
    List<AwsEndpoint> remainingEndpoints = parts[1];
	// 对相同区和保留区做随机化处理,并且把相同区放在list的前面
    List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
    if (!zoneAffinity) {
        Collections.reverse(randomizedList);
    }

    logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);

    return randomizedList;
}
...
private List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {
    ...
    List<AwsEndpoint> mergedList = randomizer.randomize(myZoneEndpoints);
    mergedList.addAll(randomizer.randomize(remainingEndpoints));
    return mergedList;
}
AsyncResolver

AsyncResolver#getClusterEndpoints()

public List<T> getClusterEndpoints() {
    long delay = refreshIntervalMs;
    // 第一次获取做一个预热
    if (warmedUp.compareAndSet(false, true)) {
        if (!doWarmUp()) {
            delay = 0;
        }
    }
    // 定时刷新
    if (scheduled.compareAndSet(false, true)) {
        scheduleTask(delay);
    }
    return resultsRef.get();
}
private final Runnable updateTask = new Runnable() {
    @Override
    public void run() {
    	try {
            ...
            // 从委托者中获取EndPoints
            List<T> newList = delegate.getClusterEndpoints();
            if (newList != null) {
               	resultsRef.getAndSet(newList);
             }
        }
};

ps:这里的数据源头是静态的配置,AsyncResolver没有什么太实际的意义,但如果底层使用如:DNSClusterResolver这种动态的数据源就能发挥其作用了


http://www.kler.cn/a/450505.html

相关文章:

  • leetcode-80.删除有序数组的重复项II-day12
  • Cherno C++学习笔记 P46 箭头运算符
  • 工控触摸屏用winForms来构建框架,效果还是很不错的
  • tcp 的重传,流量控制,拥塞控制
  • 公交车信息管理系统:实现交通数据的智能化处理
  • 深入解析 Spring Bean 配置与装配:从基础到进阶的实用指南
  • 5G CPE 主控CPU(上位机) 数据转发性能评估--基于5G模组和NEO3开发板评估
  • Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布
  • Java 网络编程 ②-TCP Socket
  • Electron -- Electron应用主要核心(二)
  • ABAP开发-权限控制
  • Android 蓝牙开发-传输数据
  • AIDD - 探索语言模型在药物分子生成方面的应用
  • iptables交叉编译(Hisiav300平台)
  • [cisco 模拟器] ftp服务器配置
  • 一个简单封装的的nodejs缓存对象
  • 828考研资料汇总
  • C++ QT chip layout tool开发浅思
  • 【python】银行客户流失预测预处理部分,独热编码·标签编码·数据离散化处理·数据筛选·数据分割
  • PTA数据结构题目:链表操作集合
  • 近实时”(NRT)搜索、倒排索引
  • Unity3d 基于UGUI和VideoPlayer 实现一个多功能视频播放器功能(含源码)
  • GitLab 停止为中国区用户提供 GitLab.com 账号服务
  • kong网关使用pre-function插件,改写接口的返回数据
  • 隧道可视化技术开拓智能建设新航道
  • 基于Spring Boot的摄影器材租赁回收系统