【ETCD】【源码阅读】深入分析 applierV3backend.Apply`方法源码
applierV3backend的Apply主要负责将 Raft 请求 (pb.InternalRaftRequest
) 应用到 Etcd 的后端存储中。它处理各种不同类型的请求,并且根据请求的具体内容调用相应的处理逻辑。
版本【release
文章目录
- 一、完整源码
- 二、方法详解
- 1. 定义和初始化
- 2. 记录操作开始时间并设置延迟处理
- 3. 处理不同的 Raft 请求
- 3.1 处理 `ClusterVersionSet` 请求
- 3.2 处理 `ClusterMemberAttrSet` 请求
- 3.3 处理 `DowngradeInfoSet` 请求
- 3.4 处理常规操作
- 4. 处理具体的操作请求(Range, Put, DeleteRange, Txn, etc.)
- 4.5 处理与 Lease 相关的请求
- 4.6 处理与身份验证相关的请求
- **4.7 处理用户和角色相关的请求**
- 5. 未实现的操作
- 6. 返回结果
- 三、 总结
一、完整源码
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
op := "unknown"
ar := &applyResult{}
defer func(start time.Time) {
success := ar.err == nil || ar.err == mvcc.ErrCompacted
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if !success {
warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
}(time.Now())
switch {
case r.ClusterVersionSet != nil: // Implemented in 3.5.x
op = "ClusterVersionSet"
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return nil
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
return nil
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet" // Implemented in 3.5.x
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
return nil
}
if !shouldApplyV3 {
return nil
}
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
case r.Range != nil:
op = "Range"
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
case r.Put != nil:
op = "Put"
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:
op = "Txn"
ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
case r.Compaction != nil:
op = "Compaction"
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
case r.LeaseGrant != nil:
op = "LeaseGrant"
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
op = "LeaseRevoke"
ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
case r.Alarm != nil:
op = "Alarm"
ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
case r.Authenticate != nil:
op = "Authenticate"
ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
case r.AuthEnable != nil:
op = "AuthEnable"
ar.resp, ar.err = a.s.applyV3.AuthEnable()
case r.AuthDisable != nil:
op = "AuthDisable"
ar.resp, ar.err = a.s.applyV3.AuthDisable()
case r.AuthStatus != nil:
ar.resp, ar.err = a.s.applyV3.AuthStatus()
case r.AuthUserAdd != nil:
op = "AuthUserAdd"
ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
case r.AuthUserDelete != nil:
op = "AuthUserDelete"
ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
case r.AuthUserChangePassword != nil:
op = "AuthUserChangePassword"
ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
case r.AuthUserGrantRole != nil:
op = "AuthUserGrantRole"
ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
case r.AuthUserGet != nil:
op = "AuthUserGet"
ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
case r.AuthUserRevokeRole != nil:
op = "AuthUserRevokeRole"
ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
case r.AuthRoleAdd != nil:
op = "AuthRoleAdd"
ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
case r.AuthRoleGrantPermission != nil:
op = "AuthRoleGrantPermission"
ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
case r.AuthRoleGet != nil:
op = "AuthRoleGet"
ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
case r.AuthRoleRevokePermission != nil:
op = "AuthRoleRevokePermission"
ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
case r.AuthRoleDelete != nil:
op = "AuthRoleDelete"
ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
case r.AuthUserList != nil:
op = "AuthUserList"
ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
case r.AuthRoleList != nil:
op = "AuthRoleList"
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
default:
a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
}
return ar
}
二、方法详解
1. 定义和初始化
op := "unknown"
ar := &applyResult{}
op
用于标识当前操作的类型,初始值为"unknown"
,在后续处理中会根据具体操作类型更新。ar
是一个applyResult
结构体的实例,用于存储操作结果,包括响应 (resp
)、错误 (err
) 等信息。
2. 记录操作开始时间并设置延迟处理
defer func(start time.Time) {
success := ar.err == nil || ar.err == mvcc.ErrCompacted
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if !success {
warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
}(time.Now())
- 延迟执行的回调函数:它在
Apply
方法结束时执行,记录应用操作的成功与否,并且基于操作的时长监控是否出现了性能问题或失败。 applySec.WithLabelValues
用来记录操作的耗时并标记成功与否。- 如果操作耗时过长,会触发警告 (
warnOfExpensiveRequest
)。 - 如果操作失败,会记录失败的警告 (
warnOfFailedRequest
)。
3. 处理不同的 Raft 请求
接下来,代码根据 pb.InternalRaftRequest
中不同字段的值来判断请求类型并进行相应处理。以下是一些具体的分支和操作:
3.1 处理 ClusterVersionSet
请求
case r.ClusterVersionSet != nil:
op = "ClusterVersionSet"
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return nil
- 如果
ClusterVersionSet
字段不为空,表示请求是关于设置集群版本的。 - 调用
ClusterVersionSet
方法处理该请求。
3.2 处理 ClusterMemberAttrSet
请求
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet"
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
return nil
- 如果
ClusterMemberAttrSet
字段不为空,表示请求是关于设置集群成员属性的。 - 调用
ClusterMemberAttrSet
方法处理该请求。
3.3 处理 DowngradeInfoSet
请求
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet"
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
return nil
- 如果
DowngradeInfoSet
字段不为空,表示请求是关于设置降级信息的。 - 调用
DowngradeInfoSet
方法处理该请求。
3.4 处理常规操作
if !shouldApplyV3 {
return nil
}
- 如果
shouldApplyV3
为false
,则直接返回nil
,跳过后续的请求处理。
4. 处理具体的操作请求(Range, Put, DeleteRange, Txn, etc.)
接下来的部分是针对 Raft 请求中的不同字段进行处理。这些字段通常是 Etcd 后端存储操作的具体请求,如 Range
, Put
, DeleteRange
, Txn
, Compaction
等。
switch {
case r.Range != nil:
op = "Range"
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
case r.Put != nil:
op = "Put"
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:
op = "Txn"
ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
case r.Compaction != nil:
op = "Compaction"
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
- 每个
case
分支对应不同的请求类型:Range
:执行范围查询操作。Put
:执行数据写入操作。DeleteRange
:执行范围删除操作。Txn
:执行事务操作。Compaction
:执行存储压缩操作。
对于每种操作,都会调用相应的处理方法,并把结果存储到 ar.resp
和 ar.err
中。
4.5 处理与 Lease 相关的请求
case r.LeaseGrant != nil:
op = "LeaseGrant"
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
op = "LeaseRevoke"
ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
- Lease 操作:这些请求处理与 Etcd 租约(lease)相关的操作,包括租约的授予、撤销和检查点等。
4.6 处理与身份验证相关的请求
case r.Authenticate != nil:
op = "Authenticate"
ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
- 身份验证操作:处理身份验证请求,通常用于授权等功能。
4.7 处理用户和角色相关的请求
case r.AuthUserAdd != nil:
op = "AuthUserAdd"
ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
- 这些请求处理与 Etcd 中的用户和角色相关的操作,包括添加用户、删除用户、修改密码、赋予角色等。
5. 未实现的操作
default:
a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
- 如果 Raft 请求的类型未在上述处理分支中出现,程序会记录
Panic
日志,标明该操作尚未实现。
6. 返回结果
return ar
- 最后,返回
applyResult
结构体实例ar
,其中包含操作的结果。
三、 总结
- 操作类型判断:根据 Raft 请求的不同类型,选择合适的处理方法。
- 性能监控:对每个操作的处理时间进行监控,记录成功与否,并在超时或失败时进行警告。
- 支持多种操作:包括数据存取、事务、压缩、租约管理、身份验证和权限管理等。
- 错误处理:对不同类型的错误进行处理并返回响应结果。