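Flink in Depth (11): The Heartbeat Mechanism Explained
Preface
Among Flink's service components, the ResourceManager, JobMaster and TaskExecutor monitor one another through heartbeats. The ResourceManager actively sends heartbeat requests to check whether the JobMaster and the TaskExecutors are alive, and the JobMaster likewise actively sends heartbeat requests to the TaskExecutors, so that tasks can be restarted or failures handled.
Core components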
HeartbeatMonitor
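An interface that describes the heartbeat state of a monitored component. It manages the heartbeat state of each component that takes part in heartbeat communication.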
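To make "tracking heartbeat state" more concrete, here is a minimal sketch of a monitor for a single target. It is a simplified stand-in and not Flink's HeartbeatMonitor: the class name SimpleHeartbeatMonitor and all of its methods are hypothetical, and the current time is passed in explicitly so the behavior is deterministic.

```java
/** Hypothetical, simplified monitor for one heartbeat target (not Flink's API). */
class SimpleHeartbeatMonitor {
    private final String targetId;
    private final long timeoutMillis;
    private long lastHeartbeatMillis;

    SimpleHeartbeatMonitor(String targetId, long timeoutMillis, long nowMillis) {
        this.targetId = targetId;
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = nowMillis;
    }

    /** Called whenever a heartbeat from the target arrives; restarts the timeout window. */
    void reportHeartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    /** True once more than timeoutMillis have passed since the last heartbeat. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > timeoutMillis;
    }

    String getTargetId() {
        return targetId;
    }
}
```

A manager would keep one such monitor per target and notify its listener when isTimedOut becomes true. A real implementation would more likely use a scheduled timeout task than polling.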
HeartbeatTarget
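Responsibilities
1. Initiating heartbeat requests
2. Receiving heartbeats and responding to them
3. Carrying additional information in the heartbeat payload
4. Maintaining the Map from ResourceID to HeartbeatMonitor (in the implementation, this map lives in HeartbeatManagerImpl)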
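The request/response roles listed above can be sketched with a small interface. This is only an illustration: SimpleHeartbeatTarget and EchoNode are hypothetical names, plain String ids stand in for ResourceID, and the calls are synchronous, whereas Flink sends them over RPC.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified heartbeat target; P is the payload type. */
interface SimpleHeartbeatTarget<P> {
    /** Asks this target to answer with a heartbeat. */
    void requestHeartbeat(String requestOrigin, P payload);

    /** Delivers a heartbeat (the answer to a request) to this target. */
    void receiveHeartbeat(String heartbeatOrigin, P payload);
}

/** A node that answers every request by sending a heartbeat with its own payload. */
class EchoNode implements SimpleHeartbeatTarget<String> {
    final String id;
    final List<String> received = new ArrayList<>();
    SimpleHeartbeatTarget<String> peer; // the node this one replies to

    EchoNode(String id) {
        this.id = id;
    }

    @Override
    public void requestHeartbeat(String requestOrigin, String payload) {
        // Reply to the peer and piggyback extra information in the payload.
        peer.receiveHeartbeat(id, "load-report-from-" + id);
    }

    @Override
    public void receiveHeartbeat(String heartbeatOrigin, String payload) {
        received.add(heartbeatOrigin + ":" + payload);
    }
}
```

Wiring two EchoNode instances as each other's peer and calling requestHeartbeat on one of them makes the other record the reply together with its payload.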
UML
HeartbeatManager
An interface that can start and stop the monitoring of heartbeat targets and report heartbeat timeouts.
HeartbeatManagerImpl
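The implementation of HeartbeatManager. It provides the methods for monitoring components; once a heartbeat target is registered, its monitor is put into a Map.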
HeartbeatManagerSenderImpl
Extends HeartbeatManagerImpl and implements Runnable, so that heartbeat requests can be triggered periodically.
Initialization
HeartbeatManagerSenderImpl(
long heartbeatPeriod,
long heartbeatTimeout,
int failedRpcRequestsUntilUnreachable,
ResourceID ownResourceID,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log,
HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
super(
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
ownResourceID,
heartbeatListener,
mainThreadExecutor,
log,
heartbeatMonitorFactory);
this.heartbeatPeriod = heartbeatPeriod;
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
During initialization, a task with zero delay is scheduled on the main-thread executor, which leads into the run method:
@Override
public void run() {
if (!stopped) {
log.debug("Trigger heartbeat request.");
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
requestHeartbeat(heartbeatMonitor);
}
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
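Responsibilities
1. Checks the stopped flag
2. Iterates over the heartbeat targets
3. Sends a heartbeat request to each target
4. Re-schedules itself, using the heartbeat period as the delay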
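The steps above amount to a task that re-schedules itself at the end of every run, instead of using scheduleAtFixedRate. The following is a minimal JDK-only sketch of that pattern. SelfSchedulingTask is a hypothetical class, a counter stands in for sending the heartbeat requests, and a plain ScheduledExecutorService stands in for Flink's main-thread executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a task that re-schedules itself after each run. */
class SelfSchedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final long periodMillis;
    private final CountDownLatch remainingRuns;
    final AtomicInteger runs = new AtomicInteger();
    private volatile boolean stopped;

    SelfSchedulingTask(ScheduledExecutorService executor, long periodMillis, int expectedRuns) {
        this.executor = executor;
        this.periodMillis = periodMillis;
        this.remainingRuns = new CountDownLatch(expectedRuns);
        // As in the constructor shown earlier: submit the first run without delay.
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        if (!stopped) {
            runs.incrementAndGet(); // stand-in for "request a heartbeat from every target"
            remainingRuns.countDown();
            // The next round is scheduled only after this one has finished.
            executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        stopped = true;
    }

    /** Waits until the expected number of runs has happened; false on timeout or interrupt. */
    boolean awaitRuns(long timeoutMillis) {
        try {
            return remainingRuns.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        SelfSchedulingTask task = new SelfSchedulingTask(executor, 10L, 3);
        boolean reached = task.awaitRuns(5_000L);
        task.stop();
        executor.shutdownNow();
        System.out.println("reached three runs: " + reached);
    }
}
```

One consequence of this design is that a slow round delays the next one rather than overlapping with it, and setting the stopped flag is enough to end the cycle.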
HeartbeatServices
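ClusterEntrypoint is the entry point for starting a cluster. During startup, initializeServices initializes the services, including the heartbeat service HeartbeatServices.
The call stack that initializes HeartbeatServices is: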
ClusterEntrypoint#runCluster->
ClusterEntrypoint#initializeServices->
ClusterEntrypoint#createHeartbeatServices->
HeartbeatServices#fromConfiguration
Role
Responsible for creating HeartbeatManagerSenderImpl objects
@Override
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {
return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
resourceId,
heartbeatListener,
mainThreadExecutor,
log);
}
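Starting the heartbeat services of each component
As analyzed earlier in the discussion of Flink's RPC internals, Flink's components all extend RpcEndpoint. After a component is initialized, its start method is called. That sends a Start control message to the component itself, which leads into the onStart method. A component's heartbeat services are generally started in onStart.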
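The start-then-onStart handoff can be pictured with a small sketch. This is an assumption-laden simplification, not Flink's RpcEndpoint: SketchEndpoint and SketchResourceManager are hypothetical, and a single-threaded executor stands in for the endpoint's mailbox and main thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical endpoint whose start() only enqueues a control message for itself. */
abstract class SketchEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** Does not call onStart() directly; the hook runs later in the endpoint's main thread. */
    final void start() {
        mainThread.execute(this::onStart);
    }

    /** Lifecycle hook, executed by the single main thread. */
    protected abstract void onStart();

    /** Lets queued messages finish, then stops the main thread. */
    final boolean shutdown() {
        mainThread.shutdown();
        try {
            return mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

/** Hypothetical component that starts its heartbeat services from onStart(). */
class SketchResourceManager extends SketchEndpoint {
    final List<String> events = new CopyOnWriteArrayList<>();

    @Override
    protected void onStart() {
        events.add("onStart");
        events.add("startHeartbeatServices"); // heartbeat services are started from the hook
    }
}
```

Because the hook runs on the endpoint's own thread, the heartbeat managers created there can share that thread with all other message handling and need no extra locking.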
ResourceManager
The onStart method
When the component starts, it enters onStart, which initializes the ResourceManager's services:
@Override
public final void onStart() throws Exception {
try {
log.info("Starting the resource manager.");
//initialize the ResourceManager's services
startResourceManagerServices();
startedFuture.complete(null);
} catch (Throwable t) {
final ResourceManagerException exception =
new ResourceManagerException(
String.format("Could not start the ResourceManager %s", getAddress()),
t);
onFatalError(exception);
throw exception;
}
}
startResourceManagerServices starts the heartbeat services
private void startResourceManagerServices() throws Exception {
try {
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
registerMetrics();
startHeartbeatServices();
slotManager.start(
getFencingToken(),
getMainThreadExecutor(),
resourceAllocator,
new ResourceEventListenerImpl(),
blocklistHandler::isBlockedTaskManager);
delegationTokenManager.start(this);
initialize();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
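The ResourceManager is responsible for allocating compute resources. After parsing a job, the JobMaster requests resources from the ResourceManager, which then allocates TaskManager slots for running the tasks. The ResourceManager therefore has to maintain heartbeats with both of these components, and startHeartbeatServices creates the corresponding heartbeat managers: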
private void startHeartbeatServices() {
//heartbeat manager for TaskManagers; targets are stored in a map internally
taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
//heartbeat manager for JobMasters; targets are stored in a map internally
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
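When heartbeatServices creates a HeartbeatManagerSender, the scheduled task is started internally and iterates over the target Map of each of these two managers. Because no component has registered for heartbeats right after the ResourceManager is initialized, these first rounds iterate over empty maps.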
JobMaster
The onStart method
@Override
protected void onStart() throws JobMasterException {
try {
startJobExecution();
} catch (Exception e) {
final JobMasterException jobMasterException =
new JobMasterException("Could not start the JobMaster.", e);
handleJobMasterError(jobMasterException);
throw jobMasterException;
}
}
The heartbeat services are initialized along the following call stack:
startJobExecution->
startJobMasterServices->
this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices);
private void startJobMasterServices() throws Exception {
try {
this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices);
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
//start retrieving the ResourceManager leader
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
} catch (Exception e) {
handleStartJobMasterServicesError(e);
}
}
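This initializes the heartbeat managers for the TaskManagers and for the ResourceManager.
resourceManagerLeaderRetriever.start retrieves the ResourceManager leader; once the leader is known, the listener is called back. The standalone implementation looks like this: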
@Override
public void start(LeaderRetrievalListener listener) {
checkNotNull(listener, "Listener must not be null.");
synchronized (startStopLock) {
checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
started = true;
// directly notify the listener, because we already know the leading JobManager's
// address
listener.notifyLeaderAddress(leaderAddress, leaderId);
}
}
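The listener callback validates the state and then enters the method that opens the RPC connection. Note that the openRpcConnectionTo snippet below registers at a JobManager (it uses JobMasterId and JobManagerRegisteredRpcConnection), so it appears to be the analogous method on the job-leader side rather than the JobMaster's own connection to the ResourceManager. Both paths go through RegisteredRpcConnection#start.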
private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {
Preconditions.checkState(
currentJobMasterId == null && rpcConnection == null,
"Cannot open a new rpc connection if the previous connection has not been closed.");
currentJobMasterId = jobMasterId;
//wrap the RPC connection information
rpcConnection =
new JobManagerRegisteredRpcConnection(
LOG, leaderAddress, jobMasterId, rpcService.getScheduledExecutor());
LOG.info(
"Try to register at job manager {} with leader id {}.",
leaderAddress,
jobMasterId.toUUID());
//start establishing the RPC connection
rpcConnection.start();
}
RegisteredRpcConnection#start begins the registration over the RPC connection:
public void start() {
//validate the connection state
checkState(!closed, "The RPC connection is already closed");
checkState(
!isConnected() && pendingRegistration == null,
"The RPC connection is already started");
//create a new registration object; this step builds the registration
final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();
if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
newRegistration.startRegistration();
} else {
// concurrent start operation
newRegistration.cancel();
}
}
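The compareAndSet call in start() guards against two concurrent starts: only the caller that swaps the field from null to its own registration proceeds, and the loser cancels its attempt. Below is a minimal sketch of that guard using the JDK's AtomicReferenceFieldUpdater. SketchConnection is a hypothetical class, and a String stands in for the registration object.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical sketch of a "first caller wins" guard based on compare-and-set. */
class SketchConnection {
    private static final AtomicReferenceFieldUpdater<SketchConnection, String> PENDING_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(SketchConnection.class, String.class, "pending");

    // AtomicReferenceFieldUpdater requires the target field to be volatile.
    private volatile String pending;

    /** Returns true if this call installed its registration, false if one was already pending. */
    boolean start(String newRegistration) {
        // Succeeds only while the field is still null, so a racing caller sees false.
        return PENDING_UPDATER.compareAndSet(this, null, newRegistration);
    }

    String getPending() {
        return pending;
    }
}
```

Using a static field updater instead of an AtomicReference per instance avoids one extra object per connection, which is presumably why this pattern was chosen. That motivation is an inference, not something the source states.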
The createNewRegistration method is as follows:
private RetryingRegistration<F, G, S, R> createNewRegistration() {
//create the registration
RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());
CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =
newRegistration.getFuture();
//callback that runs once the registration completes
future.whenCompleteAsync(
//code omitted
);
return newRegistration;
}
generateRegistration creates the RetryingRegistration and defines how the registration call is made:
protected RetryingRegistration<
ResourceManagerId,
ResourceManagerGateway,
JobMasterRegistrationSuccess,
RegistrationResponse.Rejection>
generateRegistration() {
return new RetryingRegistration<
ResourceManagerId,
ResourceManagerGateway,
JobMasterRegistrationSuccess,
RegistrationResponse.Rejection>(
log,
getRpcService(),
"ResourceManager",
ResourceManagerGateway.class,
getTargetAddress(),
getTargetLeaderId(),
jobMasterConfiguration.getRetryingRegistrationConfiguration()) {
//the asynchronous registration call
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
ResourceManagerGateway gateway,
ResourceManagerId fencingToken,
long timeoutMillis) {
Time timeout = Time.milliseconds(timeoutMillis);
return gateway.registerJobMaster(
jobMasterId,
jobManagerResourceID,
jobManagerRpcAddress,
jobID,
timeout);
}
};
}
ResourceManagerGateway is a dynamic proxy for the ResourceManager. Calling gateway.registerJobMaster remotely invokes the ResourceManager's registerJobMaster method. ResourceManager#registerJobMaster looks as follows:
public CompletableFuture<RegistrationResponse> registerJobMaster(
final JobMasterId jobMasterId,
final ResourceID jobManagerResourceId,
final String jobManagerAddress,
final JobID jobId,
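final Time timeout) {
//code omitted: validation and obtaining jobMasterIdFuture (the future of the job's leading JobMasterId)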
CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);
CompletableFuture<RegistrationResponse> registrationResponseFuture =
jobMasterGatewayFuture.thenCombineAsync(
jobMasterIdFuture,
(JobMasterGateway jobMasterGateway, JobMasterId leadingJobMasterId) -> {
if (Objects.equals(leadingJobMasterId, jobMasterId)) {
return registerJobMasterInternal(
jobMasterGateway,
jobId,
jobManagerAddress,
jobManagerResourceId);
} else {
final String declineMessage =
String.format(
"The leading JobMaster id %s did not match the received JobMaster id %s. "
+ "This indicates that a JobMaster leader change has happened.",
leadingJobMasterId, jobMasterId);
log.debug(declineMessage);
return new RegistrationResponse.Failure(
new FlinkException(declineMessage));
}
},
getMainThreadExecutor());
//code omitted: exception handling and returning the resulting future
}
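The core of registerJobMaster is combining two futures and accepting the registration only if the leader id matches. The following JDK-only sketch shows that shape under simplifying assumptions: SketchRegistration is hypothetical, Strings stand in for the gateway, the ids and the response, and the default async executor is used where the excerpt passes getMainThreadExecutor().

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: combine two futures and accept only a matching leader id. */
class SketchRegistration {
    static CompletableFuture<String> register(
            CompletableFuture<String> gatewayFuture,
            CompletableFuture<String> leadingIdFuture,
            String receivedId) {
        // The function runs only after BOTH futures have completed successfully.
        return gatewayFuture.thenCombineAsync(
                leadingIdFuture,
                (gateway, leadingId) -> {
                    if (Objects.equals(leadingId, receivedId)) {
                        return "SUCCESS via " + gateway;
                    } else {
                        return "FAILURE: leading id " + leadingId + " != received id " + receivedId;
                    }
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> gateway = CompletableFuture.completedFuture("jobMasterGateway");
        CompletableFuture<String> leader = CompletableFuture.completedFuture("jm-1");
        System.out.println(register(gateway, leader, "jm-1").join());
        System.out.println(register(gateway, leader, "jm-2").join());
    }
}
```

A mismatch is returned as a normal failure value rather than an exception, mirroring the decline branch in the excerpt, so the caller can decide whether to retry.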
When the ids match, execution continues in registerJobMasterInternal:
private RegistrationResponse registerJobMasterInternal(
final JobMasterGateway jobMasterGateway,
JobID jobId,
String jobManagerAddress,
ResourceID jobManagerResourceId) {
//code omitted
// store the JobMaster in the map as a monitored target
jobManagerHeartbeatManager.monitorTarget(
jobManagerResourceId, new JobMasterHeartbeatSender(jobMasterGateway));
return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);
}
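Through jobManagerHeartbeatManager.monitorTarget, the ResourceManager ends up storing the JobMaster as a monitored target in the Map. Subsequent scheduling rounds iterate over the map and send heartbeat requests.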
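Putting registration and scheduling together, the sketch below shows why the first rounds do nothing and why requests start flowing once monitorTarget has been called. All names (SketchHeartbeatSender, Target, triggerRound) are hypothetical, String ids replace ResourceID, and the use of putIfAbsent for duplicate registrations is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a sender that keeps its heartbeat targets in a map. */
class SketchHeartbeatSender {
    /** Minimal stand-in for "something that can be asked for a heartbeat". */
    interface Target {
        void requestHeartbeat(String requestOrigin);
    }

    private final String ownId;
    private final Map<String, Target> targets = new ConcurrentHashMap<>();

    SketchHeartbeatSender(String ownId) {
        this.ownId = ownId;
    }

    /** Registers a target; an existing entry for the same id is kept (sketch assumption). */
    void monitorTarget(String targetId, Target target) {
        targets.putIfAbsent(targetId, target);
    }

    /** One scheduling round: ask every registered target for a heartbeat. */
    int triggerRound() {
        int requests = 0;
        for (Target target : targets.values()) {
            target.requestHeartbeat(ownId);
            requests++;
        }
        return requests;
    }

    public static void main(String[] args) {
        SketchHeartbeatSender sender = new SketchHeartbeatSender("resource-manager");
        System.out.println("requests before registration: " + sender.triggerRound());
        sender.monitorTarget("job-master-1", origin -> System.out.println("asked by " + origin));
        System.out.println("requests after registration: " + sender.triggerRound());
    }
}
```

In the real flow, triggerRound corresponds to one execution of the sender's run method, and the registration happens asynchronously when the JobMaster's RPC registration reaches the ResourceManager.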