A Deep Dive into Flink (9): JobManager (3): HA Analysis
Core HA classes and interfaces
HighAvailabilityServices
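HighAvailabilityServices is the core interface of the HA service. Its responsibilities are:
1. Leader election and retrieval: defines the leader election and leader retrieval entry points for the highly available components (Dispatcher, ResourceManager, and so on).
2. Persistence of checkpoint metadata: stores checkpoint metadata in persistent storage so that state can be restored after a restart or a failure.
3. Registration of the latest completed checkpoint: records the most recent completed checkpoint so that the job can quickly be restored to it when needed.
4. Persistence of BLOB storage: stores large objects (BLOBs) in persistent storage so that they are not lost when the system restarts.
5. Registry of job status: maintains a registry that records the status of each job (running, finished, failed, and so on) for status management and monitoring.
6. Naming of RPC endpoints: assigns a unique name to each remote procedure call (RPC) endpoint so that components can address each other in the distributed system.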
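In the UML class diagram of HighAvailabilityServices, two abstract implementing classes stand out:
AbstractHaServices
Base class for haService implementations that provide high availability
AbstractNonHaServices
Base class for haService implementations without high availability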
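HA service initialization flow
ClusterEntrypoint#initializeServices
ClusterEntrypoint#initializeServices initializes the services the cluster needs before it starts.
The high-availability services are initialized there as well: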
haServices = createHaServices(configuration, ioExecutor, rpcSystem);
HighAvailabilityServicesUtils.createHighAvailabilityServices
createHaServices in turn calls the utility method
HighAvailabilityServicesUtils.createHighAvailabilityServices,
which creates the high-availability services:
public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution,
        RpcSystemUtils rpcSystemUtils,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    // Read the HA mode. The default is NONE (high availability disabled);
    // the other options are ZOOKEEPER, KUBERNETES, or a custom factory class.
    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
    // Return the haService implementation that matches the HA mode.
    switch (highAvailabilityMode) {
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
            // ... (omitted)
            return new StandaloneHaServices(
                    resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
        case ZOOKEEPER:
            return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
        case KUBERNETES:
            return createCustomHAServices(
                    "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
                    configuration,
                    executor);
        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);
        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}
Steps:
1. Read the high-availability mode from the configuration.
2. Create the haService implementation that corresponds to that mode.
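The two steps above can be sketched without any Flink dependency. The following is a simplified stand-in, not Flink's code: the class HaModeSketch, the use of a plain Map as configuration, and the key name in HA_MODE_KEY are assumptions made for illustration, and the fallback of unrecognized values to FACTORY_CLASS is how this sketch models the "custom" option mentioned in the code comment.

```java
import java.util.Locale;
import java.util.Map;

public final class HaModeSketch {

    /** The four modes handled by the switch statement above. */
    public enum Mode { NONE, ZOOKEEPER, KUBERNETES, FACTORY_CLASS }

    // Assumed key name for this sketch; check the option name used by your Flink version.
    public static final String HA_MODE_KEY = "high-availability.type";

    /** Step 1: derive the mode from the configuration; a missing value means NONE. */
    public static Mode fromConfig(Map<String, String> config) {
        String value = config.get(HA_MODE_KEY);
        if (value == null || value.trim().isEmpty()) {
            return Mode.NONE;
        }
        try {
            return Mode.valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Not a known mode name: this sketch treats it as a custom factory class name.
            return Mode.FACTORY_CLASS;
        }
    }

    /** Step 2: pick an implementation for the mode (represented here by a description only). */
    public static String describeServices(Mode mode) {
        switch (mode) {
            case NONE:
                return "StandaloneHaServices (no high availability)";
            case ZOOKEEPER:
                return "ZooKeeper-based HA services";
            case KUBERNETES:
                return "HA services created by KubernetesHaServicesFactory";
            case FACTORY_CLASS:
                return "HA services created by a user-supplied factory class";
            default:
                throw new IllegalStateException("Unsupported mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(describeServices(fromConfig(Map.of())));
        System.out.println(describeServices(fromConfig(Map.of(HA_MODE_KEY, "zookeeper"))));
        System.out.println(describeServices(fromConfig(Map.of(HA_MODE_KEY, "com.example.MyHaFactory"))));
    }
}
```

Keeping the mode lookup separate from the construction step mirrors the structure of the original method: the switch only has to deal with a closed set of enum values.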
Leader retrieval
LeaderRetrievalListener
public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
    void handleError(Exception exception);
}
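LeaderRetrievalListener is the callback for leader address changes: when the leader address changes, notifyLeaderAddress is invoked (a LeaderRetrievalService holds a reference to the listener instance). Its methods:
1. notifyLeaderAddress: called after a leader address has been elected, both the first time and on every later leader change.
2. handleError: called when an exception occurs.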
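To make the callback contract concrete, here is a minimal sketch of a listener that simply remembers the most recent leader. The interface is redeclared locally (without the @Nullable annotations) so that the example compiles without Flink; RecordingListener and its accessor methods are hypothetical names, and treating a null address or session ID as "no leader at the moment" is an assumption based on both parameters being declared @Nullable.

```java
import java.util.UUID;

public final class LeaderListenerSketch {

    /** Local copy of the two-method interface shown above. */
    public interface LeaderRetrievalListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
        void handleError(Exception exception);
    }

    /** Hypothetical listener that keeps only the latest notification. */
    public static final class RecordingListener implements LeaderRetrievalListener {
        private String leaderAddress;
        private UUID leaderSessionId;
        private Exception lastError;

        @Override
        public synchronized void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            // Overwrite the previous leader; nulls are stored as-is and mean "no leader known".
            this.leaderAddress = leaderAddress;
            this.leaderSessionId = leaderSessionID;
        }

        @Override
        public synchronized void handleError(Exception exception) {
            this.lastError = exception;
        }

        public synchronized boolean hasLeader() {
            return leaderAddress != null && leaderSessionId != null;
        }

        public synchronized String leaderAddress() {
            return leaderAddress;
        }

        public synchronized Exception lastError() {
            return lastError;
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.notifyLeaderAddress("rpc://jobmanager-1", UUID.randomUUID()); // first election
        listener.notifyLeaderAddress("rpc://jobmanager-2", UUID.randomUUID()); // leader change
        System.out.println("current leader: " + listener.leaderAddress());
        listener.notifyLeaderAddress(null, null); // leadership lost, no successor yet
        System.out.println("has leader: " + listener.hasLeader());
    }
}
```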
LeaderRetrievalService
public interface LeaderRetrievalService {
    /**
     * Starts the leader retrieval service with the given listener to listen for new leaders. This
     * method can only be called once.
     *
     * @param listener The leader retrieval listener which will be notified about new leaders.
     * @throws Exception
     */
    void start(LeaderRetrievalListener listener) throws Exception;
    /**
     * Stops the leader retrieval service.
     *
     * @throws Exception
     */
    void stop() throws Exception;
}
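LeaderRetrievalService is an important Flink interface: it is notified when the leader changes and calls back the registered LeaderRetrievalListener. Its methods:
1. start: starts listening for the leader; this method may be called only once.
2. stop: stops listening.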
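The start-once rule and the listener callback can be illustrated with a small in-memory service. This is only a sketch under stated assumptions: both interfaces are redeclared locally so the code compiles without Flink, InMemoryRetrievalService is a hypothetical class, and its publishLeader method stands in for whatever external store (ZooKeeper, Kubernetes) a real implementation would watch.

```java
import java.util.Objects;
import java.util.UUID;

public final class LeaderRetrievalSketch {

    /** Local copies of the interfaces shown above. */
    public interface LeaderRetrievalListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
        void handleError(Exception exception);
    }

    public interface LeaderRetrievalService {
        void start(LeaderRetrievalListener listener) throws Exception;
        void stop() throws Exception;
    }

    /** Hypothetical in-memory implementation used only for illustration. */
    public static final class InMemoryRetrievalService implements LeaderRetrievalService {
        private final Object lock = new Object();
        private LeaderRetrievalListener listener;
        private boolean started;
        private String leaderAddress;
        private UUID leaderSessionId;

        @Override
        public void start(LeaderRetrievalListener listener) {
            Objects.requireNonNull(listener, "listener");
            synchronized (lock) {
                if (started) {
                    throw new IllegalStateException("start() may only be called once");
                }
                started = true;
                this.listener = listener;
                // Replay the current leader, if one is already known.
                if (leaderAddress != null) {
                    listener.notifyLeaderAddress(leaderAddress, leaderSessionId);
                }
            }
        }

        @Override
        public void stop() {
            synchronized (lock) {
                listener = null; // no further callbacks after stop()
            }
        }

        /** Stand-in for a change observed in the external leader store. */
        public void publishLeader(String address, UUID sessionId) {
            synchronized (lock) {
                leaderAddress = address;
                leaderSessionId = sessionId;
                if (listener != null) {
                    listener.notifyLeaderAddress(address, sessionId);
                }
            }
        }
    }

    public static void main(String[] args) {
        InMemoryRetrievalService service = new InMemoryRetrievalService();
        service.start(new LeaderRetrievalListener() {
            @Override
            public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
                System.out.println("new leader: " + leaderAddress);
            }

            @Override
            public void handleError(Exception exception) {
                System.err.println("retrieval failed: " + exception);
            }
        });
        service.publishLeader("rpc://jobmanager-1", UUID.randomUUID()); // delivered
        service.stop();
        service.publishLeader("rpc://jobmanager-2", UUID.randomUUID()); // not delivered
    }
}
```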
Leader election
LeaderElectionService
public interface LeaderElectionService {
    /**
     * Creates a new {@link LeaderElection} instance that is registered to this {@code
     * LeaderElectionService} instance.
     *
     * @param componentId a unique identifier that refers to the stored leader information that the
     *     newly created {@link LeaderElection} manages.
     */
    LeaderElection createLeaderElection(String componentId);
}
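1. This is the election service interface; it allows one leader to be chosen from a group of contenders.
2. createLeaderElection creates a DefaultLeaderElection, and it is the DefaultLeaderElection that carries out the actual election operations.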
LeaderElection
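// Reconstructed from the method list below; signatures follow Flink 1.18
// and may differ in other versions.
public interface LeaderElection extends AutoCloseable {
    /** Registers the given contender with the leader election process. */
    void startLeaderElection(LeaderContender contender) throws Exception;
    /** Confirms that the contender has accepted leadership for the given session. */
    void confirmLeadership(UUID leaderSessionID, String leaderAddress);
    /** Returns true if the contender holds leadership under the given session ID. */
    boolean hasLeadership(UUID leaderSessionId);
    /** Closes this election and releases its resources. */
    void close() throws Exception;
}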
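1. LeaderElection is the proxy between LeaderElectionService and LeaderContender.
2. startLeaderElection starts the election for a contender; the LeaderContender is passed as the argument.
3. confirmLeadership is called once the contender has won the election, to confirm the leadership.
4. hasLeadership checks whether the contender holds leadership under the given session.
5. close stops the election and the related high-availability services.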
LeaderContender
The contender. A component must implement this interface to take part in leader election (for example WebMonitorEndpoint and ResourceManagerServiceImpl).
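How the service, the election, and the contender fit together can be sketched in a single JVM. Everything below is a simplified illustration rather than Flink's implementation: the interfaces are redeclared locally, the grantLeadership and revokeLeadership callbacks on LeaderContender are assumed because the text above does not list the contender's methods, and InMemoryElectionService and DemoContender are hypothetical classes. The sketch also ignores componentId and lets every election created from one service compete for the same leadership, with the first contender to start winning.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;

public final class LeaderElectionSketch {

    /** Assumed callback shape for a contender. */
    public interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    /** The four operations attributed to LeaderElection in the list above. */
    public interface LeaderElection extends AutoCloseable {
        void startLeaderElection(LeaderContender contender);
        void confirmLeadership(UUID leaderSessionId, String leaderAddress);
        boolean hasLeadership(UUID leaderSessionId);
        @Override
        void close();
    }

    /** Hypothetical election service: the first contender to start becomes leader. */
    public static final class InMemoryElectionService {
        private final Deque<Election> waiting = new ArrayDeque<>();
        private Election leader;
        private String confirmedAddress;

        public LeaderElection createLeaderElection(String componentId) {
            return new Election(); // componentId is ignored in this sketch
        }

        public synchronized String confirmedLeaderAddress() {
            return confirmedAddress;
        }

        // Caller must hold the service lock.
        private void electNext() {
            confirmedAddress = null;
            leader = waiting.poll();
            if (leader != null) {
                leader.sessionId = UUID.randomUUID();
                leader.contender.grantLeadership(leader.sessionId);
            }
        }

        private final class Election implements LeaderElection {
            private LeaderContender contender;
            private UUID sessionId;

            @Override
            public void startLeaderElection(LeaderContender contender) {
                synchronized (InMemoryElectionService.this) {
                    this.contender = contender;
                    waiting.add(this);
                    if (leader == null) {
                        electNext();
                    }
                }
            }

            @Override
            public void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
                synchronized (InMemoryElectionService.this) {
                    // Only the current leader may publish its address.
                    if (hasLeadership(leaderSessionId)) {
                        confirmedAddress = leaderAddress;
                    }
                }
            }

            @Override
            public boolean hasLeadership(UUID leaderSessionId) {
                synchronized (InMemoryElectionService.this) {
                    return leader == this && leaderSessionId != null
                            && leaderSessionId.equals(sessionId);
                }
            }

            @Override
            public void close() {
                synchronized (InMemoryElectionService.this) {
                    waiting.remove(this);
                    if (leader == this) {
                        contender.revokeLeadership();
                        electNext(); // hand leadership to the next waiting contender
                    }
                }
            }
        }
    }

    /** Demo contender that confirms leadership as soon as it is granted. */
    public static final class DemoContender implements LeaderContender {
        private final String address;
        private final LeaderElection election;

        public DemoContender(String address, LeaderElection election) {
            this.address = address;
            this.election = election;
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            election.confirmLeadership(leaderSessionId, address);
        }

        @Override
        public void revokeLeadership() {
            System.out.println(address + " lost leadership");
        }
    }

    public static void main(String[] args) {
        InMemoryElectionService service = new InMemoryElectionService();
        LeaderElection first = service.createLeaderElection("dispatcher");
        LeaderElection second = service.createLeaderElection("dispatcher");
        first.startLeaderElection(new DemoContender("rpc://jm-a", first));
        second.startLeaderElection(new DemoContender("rpc://jm-b", second));
        System.out.println(service.confirmedLeaderAddress()); // prints rpc://jm-a
        first.close();
        System.out.println(service.confirmedLeaderAddress()); // prints rpc://jm-b
    }
}
```

The contender never talks to the service directly: it receives a session ID through its callback and hands it back through its LeaderElection, which is the proxy role described in point 1 of the list above.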