Eureka学习笔记-客户端
Eureka学习笔记
客户端
模块设计
Applications
:保存了从 Eureka Server 获取到的服务信息,相当于 Eureka Client 端的服务列表缓存。InstanceInfo
:维护了自身服务实例的信息,注册和心跳时需要用到。QueryClient
:负责从 Eureka Server 获取已注册的服务,并且更新Applications
。RegistrationClient
:负责在服务启动时发送注册请求,然后定期发送心跳,最后在服务下线之前取消注册。ClusterResolver
:QueryClient
和RegistrationClient
在发送请求前需要先知道 Eureka Server 的地址,ClusterResolver
可以根据不同的策略和实现返回 Eureka Server 地址列表以供选择。JerseyApplicationClient
:是真正发送网络请求的 Http client,QueryClient
和RegistrationClient
获取到 Eureka Server 地址后会创建一个JerseyApplicationClient
和该 Eureka Server 通讯。
EurekaHttpClient
EurekaHttpClient的设计使用了装饰器模式,类图结构如下
层次关系如下图
JerseyApplicationClient
: 使用 Jersey 实现的基础 HTTP 客户端,负责与 Eureka 服务端进行核心通信操作。MetricsCollectingEurekaHttpClient
: 统计和收集 HTTP 请求的性能指标(如延迟、成功率),用于监控和分析。RedirectingEurekaHttpClient
: 处理 Eureka 服务端返回的重定向响应,并将请求转发到正确的目标地址。RetryableEurekaHttpClient
: 为 HTTP 请求增加重试机制,确保在网络波动或短暂故障情况下提高请求的可靠性。SessionedEurekaHttpClient
: 维护与 Eureka 服务端的会话状态,优化频繁请求中的连接管理和资源利用。
RetryableEurekaHttpClient
见名知意,这个client的作用是使得请求具有自动重试的功能
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
// 判断请求是否成功
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
ServerStatusEvaluators#LEGACY_EVALUATOR
private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
public boolean accept(int statusCode, EurekaHttpClientDecorator.RequestType requestType) {
if ((statusCode < 200 || statusCode >= 300) && statusCode != 302) {
if (requestType == RequestType.Register && statusCode == 404) {
return true;
} else if (requestType == RequestType.SendHeartBeat && statusCode == 404) {
return true;
} else if (requestType == RequestType.Cancel) {
return true;
} else {
return requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404);
}
} else {
return true;
}
}
};
这里根据请求类型以及响应码来判断是否**“成功”**,但这里的在非2xx返回的成功实际上意味着真正的成功,只表明无需进行重试,在调用上游还是会根据响应码判断操作本身是否成功,如
// 注册判断是否成功
boolean register() throws Throwable {
...
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
// 心跳判断是否成功
boolean renew() {
// 返回404重新发起注册
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
long timestamp = instanceInfo.setIsDirtyWithTime();
}
...
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
// 下线实际不太关心结果
void unregister() {
...
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(),
}
发生失败时将会记录这次失败的Service到隔离集合,而重试的需要请求的Service将会重新选择,选择的过程将会优先排除掉之前失败的Servie,但在隔离结合的数量超过阈值后将会清空隔离集合
private List<EurekaEndpoint> getHostCandidates() {
List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
quarantineSet.retainAll(candidateHosts);
...
// 操作阈值清空隔离集合
else if (quarantineSet.size() >= threshold) {
logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
quarantineSet.clear();
} else {
List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
for (EurekaEndpoint endpoint : candidateHosts) {
// 排除隔离集合中的Service
if (!quarantineSet.contains(endpoint)) {
remainingHosts.add(endpoint);
}
}
candidateHosts = remainingHosts;
}
return candidateHosts;
}
SessionedEurekaHttpClient
它的主要作用是实现一定程度的EurekaService负载均衡,在一定时间间隔后强制重新获取一个新的client
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
long now = System.currentTimeMillis();
long delay = now - lastReconnectTimeStamp;
// 超过时间关闭当前client
if (delay >= currentSessionDurationMs) {
logger.debug("Ending a session and starting anew");
lastReconnectTimeStamp = now;
currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
}
// 重新构建
EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
if (eurekaHttpClient == null) {
eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
}
return requestExecutor.execute(eurekaHttpClient);
}
Applications的维护
应用/应用实例的上实例的变更并不是一个频繁的操作,所以应用列表会在DiscoverClient本地缓存,DiscoverClient会定时刷新(默认30s)本地缓存
而刷新通常都是增量更新,即从Service获取最近的增量数据,然后根据增量类型(注册、下线、修改)来更新本地缓存。由于增量数据本身是会过期的(默认3分钟),如果client这这个过程中出现网络问题,就有可能丢失部分实例数据。
DiscoverClient#fetchRegistry()
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
...
Applications applications = getApplications();
// 触发全量更新的条件,第一次/强制更新/本地没有数据等
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
getAndStoreFullRegistry();
} else {
// 触发增量更新
getAndUpdateDelta(applications);
}
...
}
DiscoverClient#getAndUpdateDelta()
这里如果没有获取到增量数据还是进行全量更新,否则会进行一个原子更新,防止多线程并发更新
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
...
if (delta == null) {
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
updateDelta(delta);
...
} else {
logger.warn("Not updating application delta as another thread is updating it already");
}
}
DiscoverClient#updateDelta()
这里会更新不同的增量类型更新本地缓存
- 对于注册:如果应用不存在则增加应用,然后使用增量实例数据覆盖本地数据
- 对于修改:如果应用不存在则增加应用,然后使用增量实例数据覆盖本地数据
- 对于下线:首先移除实例,如果应用的实例列表为空则移除应用
private void updateDelta(Applications delta) {
int deltaCount = 0;
for (Application app : delta.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
++deltaCount;
// 注册
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
}
// 修改
else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
}
// 删除
else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp != null) {
existingApp.removeInstance(instance);
if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
applications.removeApplication(existingApp);
}
}
}
}
}
// 设置application版本、随机化应用实例
getApplications().setVersion(delta.getVersion());
getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
...
}
ClusterResolver
ClusterResolver和EurekaHttpClient具有层层委托的特性,主要有如下最基础用到的有如下几个Resolver
- ConfigClusterResolver :从配置文件中解析树说所有的EurekaService地址
- ZoneAffinityClusterResolver:根据client所在zone对ServiceUrl分组排序,将处于相同Zone的ServiceUrl放在list的前面,并且做随机化处理,保证client尽可能的均匀的选择EurekaService。
- AsyncResolver :动态拉取服务实例列表并在本地缓存,以提供更高效、非阻塞的服务解析能力。
ZoneAffinityClusterResolver
ZoneAffinityClusterResolver#getClusterEndpoints()
public List<AwsEndpoint> getClusterEndpoints() {
// 根据client的所在zone将所有Endpoint划为相同区和保留区
List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
List<AwsEndpoint> myZoneEndpoints = parts[0];
List<AwsEndpoint> remainingEndpoints = parts[1];
// 对相同区和保留区做随机化处理,并且把相同区放在list的前面
List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
if (!zoneAffinity) {
Collections.reverse(randomizedList);
}
logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);
return randomizedList;
}
...
private List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {
...
List<AwsEndpoint> mergedList = randomizer.randomize(myZoneEndpoints);
mergedList.addAll(randomizer.randomize(remainingEndpoints));
return mergedList;
}
AsyncResolver
AsyncResolver#getClusterEndpoints()
public List<T> getClusterEndpoints() {
long delay = refreshIntervalMs;
// 第一次获取做一个预热
if (warmedUp.compareAndSet(false, true)) {
if (!doWarmUp()) {
delay = 0;
}
}
// 定时刷新
if (scheduled.compareAndSet(false, true)) {
scheduleTask(delay);
}
return resultsRef.get();
}
private final Runnable updateTask = new Runnable() {
@Override
public void run() {
try {
...
// 从委托者中获取EndPoints
List<T> newList = delegate.getClusterEndpoints();
if (newList != null) {
resultsRef.getAndSet(newList);
}
}
};
ps:这里的数据源头是静态的配置,AsyncResolver没有什么太实际的意义,但如果底层使用如:DNSClusterResolver这种动态的数据源就能发挥其作用了