当前位置: 首页 > article >正文

深度了解flink(九) JobManager(3) HA分析

HA核心类、接口

HighAvailabilityServices

HighAvailabilityServices是HA Service的核心接口,具体功能如下:

1.定义了高可用组件(Dispatcher、ResourceManager等)的leader选举接口和leader获取接口

2.检查点元数据的持久:将检查点的元数据存储到持久化存储中,以便在系统重启或故障恢复时能够恢复状态。

3.注册最新的完成检查点:记录最新的完成检查点,以便在需要时能够快速恢复到该检查点的状态。

4.BLOB 存储的持久化:将大对象(BLOB)数据存储到持久化存储中,确保数据不会因系统重启而丢失。

5.标记作业状态的注册表:维护一个注册表,记录每个作业的状态(如运行中、已完成、失败等),以便进行状态管理和监控。

6.RPC 端点的命名:为远程过程调用(RPC)端点分配唯一的名称,以便在分布式系统中进行通信和调用。

HighAvailabilityServices的UML类图如下:

有两个实现类:

AbstractHaServices

不具有高可用服务的haService

AbstractNonHaServices

提供具有高可用的haService

HA Service初始化流程
ClusterEntry#initializeServices

ClusterEntry#initializeServices进行了集群启动前服务初始化工作,其中也初始化了高可用服务

haServices = createHaServices(configuration, ioExecutor, rpcSystem);
HighAvailabilityServicesUtils.createHighAvailabilityServices

调用createHaServices->会调用HighAvailabilityServicesUtils.createHighAvailabilityServices工具类的方法进行高可用服务的创建

public static HighAvailabilityServices createHighAvailabilityServices(
            Configuration configuration,
            Executor executor,
            AddressResolution addressResolution,
            RpcSystemUtils rpcSystemUtils,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        //获取ha模式,默认是NONE,可选项有ZOOKEEPER和KUBERNETES或者自定义,如果为NONE不开启高可用
        HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
        //根据高可用mode返回对应的haService的实现类
        switch (highAvailabilityMode) {
            case NONE:
                final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
                //省略

                return new StandaloneHaServices(
                        resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
            case ZOOKEEPER:
                return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
            case KUBERNETES:
                return createCustomHAServices(
                        "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
                        configuration,
                        executor);

            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);

            default:
                throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
        }
    }

步骤:

1.通过配置获取高可用的模式

2.根据模式创建对应的haService的实现类

Leader获取

LeaderRetrievalListener
public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
    void handleError(Exception exception);
}

LeaderRetrievalListener接口主要服务于当Leader地址修改了,会触发接口中的notifyLeaderAddress(LeaderRetrievalService内部持有该接口的对象实例),方法功能,

1.notifyLeaderAddress Leader地址选举后(第一次,或者Leander变更)会调用该方法

2.handleError 出现异常时会调用该方法

LeaderRetrievalService
public interface LeaderRetrievalService {

    /**
     * Starts the leader retrieval service with the given listener to listen for new leaders. This
     * method can only be called once.
     *
     * @param listener The leader retrieval listener which will be notified about new leaders.
     * @throws Exception
     */
    void start(LeaderRetrievalListener listener) throws Exception;

    /**
     * Stops the leader retrieval service.
     *
     * @throws Exception
     */
    void stop() throws Exception;
}

LeaderRetrievalService是Flink中一个重要的接口,它主要用于在leader变更时收到通知,并回调注册的LeaderRetrievalListener,方法,

1.start:开始监听Leader,该方法之启动一次

2.stop:停止监听

Leader 选举

LeaderElectionService
public interface LeaderElectionService {

    /**
     * Creates a new {@link LeaderElection} instance that is registered to this {@code
     * LeaderElectionService} instance.
     *
     * @param componentId a unique identifier that refers to the stored leader information that the
     *     newly created {@link LeaderElection} manages.
     */
    LeaderElection createLeaderElection(String componentId);
}

1.选举服务接口,它允许在一组参选者中选出一个领导者

2.createLeaderElection 创建DefaultLeaderElection,DefaultLeaderElection才会进行具体的选举操作

LeaderElection
public interface LeaderElectionService {

    /**
     * Creates a new {@link LeaderElection} instance that is registered to this {@code
     * LeaderElectionService} instance.
     *
     * @param componentId a unique identifier that refers to the stored leader information that the
     *     newly created {@link LeaderElection} manages.
     */
    LeaderElection createLeaderElection(String componentId);
}

1.LeaderElection 是LeaderElectionService和LeaderContender之间的代理

2.startLeaderElection启动选举者 将LeaderContender作为参数

3.confirmLeadership选举成功后会调用该方法

4.hasLeadership判断是否拥有指定session下的leadership

5.close停止高可用选举等服务

LeaderContender

参选者,组件必须实现了该接口才能进行Leader选举(WebMonitorEndpoint、ResourceManagerServiceImpl)


http://www.kler.cn/a/373952.html

相关文章:

  • 标题:自动化运维:现代IT运维的革新力量
  • 前端性能优化之卡顿监控与定位
  • Kafka 与传统 MQ 消息系统之间有三个关键区别?
  • 智能财务 | 数据与融合,激发企业财务数智化转型思考
  • centos7.X zabbix监控参数以及邮件报警和钉钉报警
  • 使用Fiddler Classic抓包工具批量下载音频资料
  • 智慧养老/社区养老/家政预约/老年护理 小程序
  • Certimate - 免费开源的 SSL 证书托管、自动续签工具,开发者维护 90 天免费证书的救星
  • 构建校园社团信息管理平台:Spring Boot技术的核心要点
  • Go 语言解析 yaml 文件的方法
  • 【idea】idea更新遇到的坑
  • 多用户商城系统的架构与实现
  • react-路由
  • YOLOv6-4.0部分代码阅读笔记-config.py
  • 【C++】深究类型转换
  • LVGL添加事件和删除事件
  • 一年期免费HTTPS证书:网络安全新选择
  • Docker环境安装MySQL
  • ubuntu交叉编译libffi库给arm平台使用
  • Jenkins Pipeline 部署总结
  • 爬虫笔记22——当当网图书详情页静、动态数据爬取
  • leetcode day7 442
  • 6. 线程池实现
  • 如何安装和使用PowerDesigner
  • TDengine 数据订阅 vs. InfluxDB 数据订阅:谁更胜一筹?
  • ETLCloud遇上MongoDB:灵活数据流,轻松管理