当前位置: 首页 > article >正文

SpringCloud源码:客户端分析(二)- 客户端源码分析

6f3ceaf83f7d086cdaa12f3d37927247.jpeg


背景

我们继续分析EurekaClient的两个自动化配置类:

自动化配置类
功能职责
EurekaClientAutoConfiguration配置EurekaClient确保了Eureka客户端能够正确地:
- 注册到Eureka服务端
- 周期性地发送心跳信息来更新服务租约
- 下线时通知Eureka服务端
- 获取服务实例列表;

更侧重于Eureka客户端的基本配置和功能实现
EurekaDiscoveryClientConfiguration配置EurekaDiscoveryClient创建RefreshScopeRefreshedEvent事件的监听类,用于重启注册;
更多地涉及到服务的自动注册、健康检查以及事件处理等方面

CloudEurekaClient分析

原理

客户端本质就是4个动作:

  1. 获取服务列表

  2. 注册服务实例

  3. 租约续约

  4. 取消租约

源码

让我们继续关注 第一个自动装配类 EurekaClientAutoConfiguration 对CloudEurekaClient 的构造封装,即如下代码块:

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
      search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
      EurekaClientConfig config) {
   return new CloudEurekaClient(manager, config, this.optionalArgs,
         this.context);
}

分析代码:

  • CloudEurekaClient对象,并交给容器管理bean

CloudEurekaClient    

public class CloudEurekaClient extends DiscoveryClient {
    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
          EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
          ApplicationEventPublisher publisher) {
       super(applicationInfoManager, config, args);
       this.applicationInfoManager = applicationInfoManager;
       this.publisher = publisher;
       this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
             "eurekaTransport");
       ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }
}

分析代码:

  • 实际上CloudEurekaClient调用了父类DiscoveryClient的构造器

DiscoveryClient

经历了多个重载构造器的嵌套,我们进入了最终的构造器:

private final ScheduledExecutorService scheduler;
// additional executors for supervised subtasks
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;


@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // .... 一些初始化工作


    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());


    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());


        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff


        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff


        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);


        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }


    // .......


    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }


    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();


    // ...其他初始化工作
}

代码分析:

  • 这里初始化了3个异步线程池:scheduler、heartbeatExecutor、cacheRefreshExecutor

    • scheduler:coreSize=2的周期任务线程池,线程名命名是DiscoveryClient-%s

    • heartbeatExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-HeartbeatExecutor-%d

    • cacheRefreshExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-CacheRefreshExecutor-%d

  • 这三个线程池,是怎么配合工作的呢?不着急,慢慢往下看

initScheduledTasks()的代码如下:
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        // 【1】
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }


    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);


        // Heartbeat timer
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );


        // 【2】
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);


    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

分析代码:

  • 【1】检查是否需要获取注册表信息(配置项eureka.client.fetchRegistry默认=true)

    • 用注入的异步线程池cacheRefreshExecutor,按指定时间间隔registryFetchIntervalSeconds,去执行CacheRefreshThread,即缓存刷新refreshRegistry()任务

    • 缓存刷新任务cacheRefreshTask

    • 使用调度器 scheduler 安排任务

  • 【2】检查是否需要注册入Eureka(配置项eureka.client.registerWithEureka默认=true)

    • 用注入的异步线程池heartbeatExecutor,按指定时间间隔renewalIntervalInSecs,去执行HeartbeatThread,即执行续租renew()任务

    • 心跳续租任务heartbeatTask

    • 使用调度器 scheduler 安排任务

CacheRefreshThread - 缓存刷新
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}


@VisibleForTesting
void refreshRegistry() {
    try {
        //.....


        //【1】刷新本地注册服务列表
        boolean success = fetchRegistry(remoteRegionsModified);
        //.....
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}


private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    try {
        // 【2】获取本地localRegionApps的服务列表
        Applications applications = getApplications();


        // 【3】获取远程数据并更新服务列表
        getAndUpdateDelta(applications);
    }


    // registry was fetched successfully, so return true
    return true;
}


private void getAndUpdateDelta(Applications applications) throws Throwable {
    // .....


    //【4】检查缓存delta的服务注册列表
    Applications delta = null;
    if (delta == null) {
        // 【4.1】如果缓存为空,就再去拉取一次EurekaServer的数据
        getAndStoreFullRegistry();
    } else {
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                //【5】获取EurekaServer最新的服务注册表,并执行delta更新
                getAndStoreFullRegistry()
                updateDelta(delta);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        }    
    }
} 


private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
}


private void updateDelta(Applications delta) {
    int deltaCount = 0;
    //【6】遍历服务注册列表的每个app
    for (Application app : delta.getRegisteredApplications()) {
        //【7】遍历每个服务的所有实例instance
        for (InstanceInfo instance : app.getInstances()) {
            //【8】获取本地cache的服务注册信息
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }


            ++deltaCount;
            //【9】如果实例是新增的类型
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    //【10】执行实例添加
                    applications.addApplication(app);
                }
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } 
            //【11】如果实例是修改的类型
            else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    //【12】没有已有实例,执行添加操作
                    applications.addApplication(app);
                }
                //【13】存在已有实例,则注册新的实例信息
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } 
            //【14】如果实例是删除的类型
            else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    //【15】删除这个服务的实例
                    existingApp.removeInstance(instance);
                    //【16】如果这个服务的实例数量=0,则直接删除服务信息app
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
}

代码分析:见下面流程图

34a3a1ce9a8b23a83da835f99ab9195e.png

HeartbeatThread - 心跳续租
private final Counter REREGISTER_COUNTER = Monitors.newCounter(PREFIX
        + "Reregister");


private class HeartbeatThread implements Runnable {


    public void run() {
        //【1】更新操作
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}


boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        //【2】客户端发送心跳包,获取响应
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        //【3】响应码=404,说明服务在EurekaServer不存在
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            long timestamp = instanceInfo.setIsDirtyWithTime();
            //【4】客户端重新发起一次register操作,给EurekaServer
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
             //【5】EurekaServer注册成功,则续约成功
            return success;
        }
        //【6】响应码=200,则在EurekaServer侧续约成功了
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

代码分析:见下面流程图

345ce2121267fe7585d492bf0921565c.png

取消租约
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
      search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
      EurekaClientConfig config) {
   return new CloudEurekaClient(manager, config, this.optionalArgs,
         this.context);
}


@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");


        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }


        cancelScheduledTasks();


        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }


        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }


        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();


        Monitors.unregisterObject(this);


        logger.info("Completed shut down of DiscoveryClient");
    }
}


private void cancelScheduledTasks() {
    if (instanceInfoReplicator != null) {
        instanceInfoReplicator.stop();
    }
    if (heartbeatExecutor != null) {
        heartbeatExecutor.shutdownNow();
    }
    if (cacheRefreshExecutor != null) {
        cacheRefreshExecutor.shutdownNow();
    }
    if (scheduler != null) {
        scheduler.shutdownNow();
    }
    if (cacheRefreshTask != null) {
        cacheRefreshTask.cancel();
    }
    if (heartbeatTask != null) {
        heartbeatTask.cancel();
    }
}

代码分析:见下面流程图

2f689c9147234a3979b9b72c3722c439.png

小结

我们回到开头的原理,知道EurekaClient客户端本质就是4个动作:

  1. 获取服务列表:在CacheRefreshThread里有实现,即CacheRefreshThread的【4.1】步骤的eurekaTransport.queryClient.getApplications

  2. 注册服务实例:在HeartbeatThread里有实现,即HeartbeatThread的【4】步骤的eurekaTransport.registrationClient.register

  3. 租约续约:在HeartbeatThread里有实现,即HeartbeatThread的【2】步骤的eurekaTransport.registrationClient.sendHeartBeat

  4. 取消租约:在定义CloudEurekaClient的@Bean(destroyMethod = "shutdown")注解有生命

但我们还想知道,CacheRefreshThread 和 HeartbeatThread的背后通信,以及在EurekaServer的原理细节。可以,我们放到下一个章节再讲。

其他文章

Kafka消息堆积问题排查

基于SpringMVC的API灰度方案

理解到位:灾备和只读数据库

SQL治理经验谈:索引覆盖

Mybatis链路分析:JDK动态代理和责任链模式的应用

大模型安装部署、测试、接入SpringCloud应用体系

Mybatis插件-租户ID的注入&拦截应用


http://www.kler.cn/news/321718.html

相关文章:

  • ArduSub程序学习(11)--EKF实现逻辑①
  • [AI问答] Auto-sklearn和Auto-Keras对比
  • Ubuntu20.04.6 环境下docker设置proxy
  • SpringBoot-Starter2.7.3自动装配Redisson升级版本运行时的问题
  • 自动驾驶技术:人工智能驾驶的未来
  • tauri程序加载本地图片或者文件在前端页面展示
  • ModStartCMS v8.9.0 图片上传优化,富文本编辑器修复
  • Spring Boot 实战:使用观察者模式实现实时库存管理
  • localectl 命令:系统语言、键盘布局和区域设置
  • CORE 中间件、wwwroot
  • C++11中引入的thread
  • 正向科技|格雷母线定位系统的设备接线安装示范
  • 脚手架是什么?详细版+通俗易懂版!!!!!!
  • DNS与host文件
  • 职业技能大赛-自动化测试笔记(PageObject)分享-4
  • 如何将自定义支付网关与 WooCommerce Checkout 区块集成
  • HarmonyOS---权限和http/Axios网络请求
  • 处理 VA02修改行项目计划行(SCHEDULE LINES )报错:不可能确定一个消耗帐户
  • count(1)、count(*) 与 count(列名) 的区别
  • zabbix“专家坐诊”第257期问答
  • 19、网络安全合规复盘
  • C++ | Leetcode C++题解之第440题字典序的第K小数字
  • 【HDP】zookeeper未授权漏洞修复
  • C语言课程设计题目四:实验设备管理系统设计
  • Flutter鸿蒙化环境配置(windows)
  • 网站设计中安全方面都需要有哪些考虑
  • 【opencv】——为arm平台交叉编译
  • Apache Iceberg 数据类型参考表
  • URL中 / 作为字符串,而不是路径。
  • 19.1 使用k8s的sdk编写一个项目获取pod和node信息