etcd入门指南:分布式事务、分布式锁及核心API详解
etcd 是一个高可用、分布式的键值存储系统。主要用作分布式系统中的独立协调服务。旨在保存可完全放入内存中的少量数据。
Raft
etcd 基于 Raft 共识算法,保证了分布式环境下的数据一致性。
Raft是一种分布式一致性算法,用于在多个节点之间达成共识,确保分布式系统中的数据在不同节点间一致。
- Leader Election(领导者选举)
在Raft 中,系统的节点分为三种状态:领导者(Leader)、跟随者(Follower)和候选者(Candidate)。其中,Leader 负责处理客户端请求并将日志条目复制到 Follower 节点。在集群启动时或当前 Leader 失效时,节点之间会进行一次选举,选举出一个新的 Leader。
etcd中的Learner(学习者)是只读成员,它不会参与投票或决策,但可以同步数据,适用于增量扩展或数据迁移过程中。当learner日志赶上leader日志时,可以通过member promote API提升为正式成员,Learner节点拒绝客户端读取和写入。
- Log Replication(日志复制)
Leader Election完成后,所有客户端请求都会通过 Leader 处理,Leader 会将客户端请求以日志条目的形式记录下来,并将其复制到其他 Follower 节点。Follower 节点会接收并应用这些日志条目,确保每个节点的数据状态一致。
- Safety
通过任期、投票、日志条目的提交机制,Raft 保证了系统的一致性,即使部分节点出现故障,也能通过 Leader 和日志复制机制维持系统的整体一致性。
etcd API
etcd proto结构定义
etcd client v3使用gRPC进行远程过程调用,定义了交互过程中protobuf数据结构
ResponseHeader(响应头)
message ResponseHeader {
uint64 cluster_id = 1; //集群ID
uint64 member_id = 2; //成员ID
int64 revision = 3; // etcd 键值存储的版本
uint64 raft_term = 4; //etcd集群Raft协议的任期号
}
KeyValue(键值对)
message KeyValue {
bytes key = 1; //可以不能为空
int64 create_revision = 2; //最后一次创建时的版本
int64 mod_revision = 3; //最后一次修改时的版本
int64 version = 4; //可以的版本,删除key,版本会重置为0,任何修改都会增加版本
bytes value = 5; //
int64 lease = 6; //租约,如果没有租约lease=0
}
Range(返回查询)
message RangeRequest {
enum SortOrder {
NONE = 0; // default, no sorting
ASCEND = 1; // lowest target value first
DESCEND = 2; // highest target value first
}
enum SortTarget {
KEY = 0;
VERSION = 1;
CREATE = 2;
MOD = 3;
VALUE = 4;
}
bytes key = 1; //请求起始key
bytes range_end = 2; //请求结束key 和key组成范围
int64 limit = 3; //限制返回的key数量
int64 revision = 4; //期望的revision
SortOrder sort_order = 5; //指定返回结果排序
SortTarget sort_target = 6; //指定排序目标字段
bool serializable = 7; //设置为 true,则请求线性化的一致性读取,即在读取之前,所有之前的写入都已经被持久化。serizlized可有由任何节点读取,因为数据已经序列化。相反线性一致性读取(Linearized Read)只能从领导者读取
bool keys_only = 8; //设置为 true,则只返回键,不返回值
bool count_only = 9; //设置为 true,则只返回匹配范围内的键值对数量,不返回键或值
int64 min_mod_revision = 10; //指定返回的键值对的最小修改修订号。只有修改修订号大于或等于这个值的键值对会被返回。
int64 max_mod_revision = 11; //定返回的键值对的最大修改修订号。只有修改修订号小于或等于这个值的键值对会被返回。
int64 min_create_revision = 12; //指定返回的键值对的最小创建修订号。只有创建修订号大于或等于这个值的键值对会被返回
int64 max_create_revision = 13; //指定返回的键值对的最大创建修订号。只有创建修订号小于或等于这个值的键值对会被返回。
}
message RangeResponse {
ResponseHeader header = 1;
repeated mvccpb.KeyValue kvs = 2; //与范围请求匹配的键值对列表,如果设置了Count_only,kvs为空
bool more = 3; //如果设置了limit 用于判断是否有更多的键
int64 count = 4; //满足range request的key数量
}
Put(设置操作)
message PutRequest {
bytes key = 1;
bytes value = 2;
int64 lease = 3;
bool prev_kv = 4; //如果设置为true,则返回存储操作之前的键值对
bool ignore_value = 5; //如果设置为true,会更新key,忽略value。如果key不存在返回error
bool ignore_lease = 6; //如果设置为true,会更新key,忽略lease(不会更新租约)。如果key不存在返回error
}
message PutResponse {
ResponseHeader header = 1;
mvccpb.KeyValue prev_kv = 2; //prev_kv设置为true时,返回更新前的key-value
}
Delete Range(删除操作)
message DeleteRangeRequest {
bytes key = 1;
bytes range_end = 2;
bool prev_kv = 3; //设置true时,返回删除的key-value
}
message DeleteRangeResponse {
ResponseHeader header = 1;
int64 deleted = 2; //删除的key数量
repeated mvccpb.KeyValue prev_kvs = 3; //删除的键值对列表
}
v3 API
Cluster
- MemberList 列出当前集群成员
- MemberAdd 将新成员添加到集群中
- MemberAddAsLearner 添加一个新的learner到集群中
- MemberRemove 移除成员
- MemberUpdate 更新成员配置信息
- MemberPromote 将一个 “学习者” 节点提升为正式的 etcd 集群成员
KV
- Put 设置键值对
- Get 默认从 etcd 存储中获取一个键的值,如果有传key范围,可以取key范围的值
- Delete 删除一个key或一个范围的key
- Compact 压缩 etcd 存储的历史版本,以减少存储空间
- Do 通用操作接口,便于扩展和组合操作
- Txn 创建一个事务
Lease
- Grant 创建一个新的租约 ,返回LeaseGrantResponse
- Revoke 撤销租约
- TimeToLive 给定一个LeaseID,查询某个Lease的剩余有效时间及相关信息
- Leases 列出所有租约信息
- KeepAlive 通过定期心跳,保持租约的有效性
- KeepAliveOnce 仅续租一次
- Close 关闭租约相关的连接和资源
Watcher
- Watch 监听某个键或键范围的变化
- RequestProgress 用于请求 Watcher 发送当前进度通知。这个方法允许客户端检查 Watcher 进度是否已经追踪到最新的变化。
- Close 关闭 Watcher,并释放其资源。
Auth
- Authenticate 对用户进行认证,检查提供的用户名和密码是否有效
- AuthEnable 启用认证机制
- AuthDisable 禁用认证
- AuthStatus 获取集群认证机制状态
- UserAdd 添加一个新用户
- UserAddWithOptions 添加一个新用户,并提供一些额外的选项,如用户角色等。
- UserDelete 删除用户
- UserChangePassword 修改密码
- UserGrantRole 给 etcd 集群中的用户授予一个角色
- UserGet 获取用户信息
- UserList 获取用户列表
- UserRevokeRole 撤销 etcd 集群中用户的角色
- RoleAdd 添加角色
- RoleGrantPermission 给角色授权
- RoleGet 获取角色
- RoleList 获取角色列表
- RoleRevokePermission 撤销角色权限
- RoleDelete 删除角色
Maintenance
- AlarmList 列出集群所有的告警
- AlarmDisarm 解除某个节点的告警
- Defragment 对 etcd 节点的存储文件进行碎片整理
- Status 返回特定 etcd 节点的状态信息
- HashKV 计算 etcd 集群中键值存储的历史哈希值,这个命令可以用于检查集群成员之间数据的一致性。
- Snapshot 创建 etcd 集群的快照
- MoveLeader 将 etcd 集群的 leader 角色转移到指定的成员
事务(Transaction)
proto 结构
message RequestOp {
// request is a union of request types accepted by a transaction.
oneof request {
RangeRequest request_range = 1;
PutRequest request_put = 2;
DeleteRangeRequest request_delete_range = 3;
}
}
message TxnRequest {
repeated Compare compare = 1;
repeated RequestOp success = 2;
repeated RequestOp failure = 3;
}
message TxnResponse {
ResponseHeader header = 1;
bool succeeded = 2; // 如果compare结果是true 返回true,否则返回false
repeated ResponseOp responses = 3; // 包含事物中每个操作的响应结果。如果Succeeded=true,那么包含Then所有操作的响应。否则包含Else所有操作的响应
}
message ResponseOp {
oneof response {
RangeResponse response_range = 1;
PutResponse response_put = 2;
DeleteRangeResponse response_delete_range = 3;
}
}
etcd client v3 事务接口
type Txn interface {
//如果所有比较成功,执行Then方法,否则执行Else方法。用于检查 etcd 键值存储中的特定条件是否满足。
If(cs ...Cmp) Txn
// 比较成功时执行
Then(ops ...Op) Txn
// 失败是执行
Else(ops ...Op) Txn
//提交事物
Commit() (*TxnResponse, error)
}
代码示例
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 3 * time.Second,
})
error.FailOnError(err, "Failed to connect to etcd")
defer cli.Close()
// 构建事务
txn := cli.Txn(context.Background()).
If(clientv3.Compare(clientv3.Value("/txn/accountA"), "=", "100")).
Then(clientv3.OpPut("/txn/accountA", "90"), clientv3.OpPut("/txn/accountB", "110")).
Else(clientv3.OpGet("/txn/accountA"))
// 提交事务
txnResp, err := txn.Commit()
error.FailOnError(err, "Txn Commit failed")
// 打印事务结果
if txnResp.Succeeded {
fmt.Printf("Txn succeeded : %v", txnResp.Responses)
} else {
getResp := txnResp.Responses[0].GetResponseRange()
if len(getResp.Kvs) > 0 {
fmt.Printf("Txn failed : %v", string(getResp.Kvs[0].Value))
}
}
}
Mutex 分布式锁
etcd client v3版本内置了分布式锁的实现。
内置方法
- TryLock 尝试获取锁,如果获取失败,立即返回
- Lock 阻塞调用,直到获得锁或取消
- Unlock 释放锁
- IsOwner 检查当前会话是否是锁的持有者
- Key 返回锁的key
- Header 返回最近一次成功获取锁的响应头信息
使用示例
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 3 * time.Second,
})
errorHandle.FailOnError(err, "Failed to connect to etcd")
defer cli.Close()
//创建会话
s, err := concurrency.NewSession(cli)
failOnError(err, "Failed to create session")
//执行结束,撤销lease,释放锁
defer s.Close()
m := concurrency.NewMutex(s, "/mutex/lock")
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := m.Lock(ctx); err != nil {
log.Fatal(err)
}
fmt.Printf("锁获取成功,当前锁的key,%v \n", m.Key())
if err := m.Unlock(ctx); err != nil {
log.Fatal(err)
}
fmt.Println("释放锁")
}
实现原理
etcd的分布式锁主要利用了Revision机制、租约(Lease)机制及事务(txn)、Watch机制等特性。
etcd mutex基于给定的锁前缀加上每个会话的leaseId创建kv,最先创建的kv所在的会话为锁的获得者。其他没有获得锁的会话通过Watch机制监听锁的key的删除事件,来尝试获取锁。
部分源码
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
//如果key revision 为 0,当前会话还没创建key
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
//取锁前缀下 第一个key的创建信息(根据Revision排序)
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
//返回锁信息
return resp, nil
}
基准测试
官方提供Benchmarking etcd v3
reading one single key
key size in bytes | number of clients | read QPS | 90th Percentile Latency (ms) |
---|---|---|---|
256 | 1 | 2716 | 0.4 |
256 | 64 | 16623 | 6.1 |
256 | 256 | 16622 | 21.7 |
The performance is nearly the same as the one with empty server handler.
reading one single key after putting
key size in bytes | number of clients | read QPS | 90th Percentile Latency (ms) |
---|---|---|---|
256 | 1 | 2269 | 0.5 |
256 | 64 | 13582 | 8.6 |
256 | 256 | 13262 | 47.5 |
The performance with empty server handler is not affected by one put. So the performance downgrade should be caused by storage package.