当前位置: 首页 > article >正文

etcd入门指南:分布式事务、分布式锁及核心API详解

etcd 是一个高可用、分布式的键值存储系统。主要用作分布式系统中的独立协调服务。旨在保存可完全放入内存中的少量数据。

Raft

etcd 基于 Raft 共识算法,保证了分布式环境下的数据一致性。
Raft是一种分布式一致性算法,用于在多个节点之间达成共识,确保分布式系统中的数据在不同节点间一致。

  • Leader Election(领导者选举)

在Raft 中,系统的节点分为三种状态:领导者(Leader)、跟随者(Follower)和候选者(Candidate)。其中,Leader 负责处理客户端请求并将日志条目复制到 Follower 节点。在集群启动时或当前 Leader 失效时,节点之间会进行一次选举,选举出一个新的 Leader。

etcd中的Learner(学习者)是只读成员,它不会参与投票或决策,但可以同步数据,适用于增量扩展或数据迁移过程中。当learner日志赶上leader日志时,可以通过member promote API提升为正式成员,Learner节点拒绝客户端读取和写入。

  • Log Replication(日志复制)

Leader Election完成后,所有客户端请求都会通过 Leader 处理,Leader 会将客户端请求以日志条目的形式记录下来,并将其复制到其他 Follower 节点。Follower 节点会接收并应用这些日志条目,确保每个节点的数据状态一致。

  • Safety

通过任期、投票、日志条目的提交机制,Raft 保证了系统的一致性,即使部分节点出现故障,也能通过 Leader 和日志复制机制维持系统的整体一致性。

etcd API

etcd proto结构定义

etcd client v3使用gRPC进行远程过程调用,定义了交互过程中protobuf数据结构

ResponseHeader(响应头)

message ResponseHeader {
  uint64 cluster_id = 1; //集群ID
  uint64 member_id = 2; //成员ID
  int64 revision = 3; // etcd 键值存储的版本
  uint64 raft_term = 4; //etcd集群Raft协议的任期号
}

KeyValue(键值对)

message KeyValue {
  bytes key = 1; //可以不能为空
  int64 create_revision = 2; //最后一次创建时的版本
  int64 mod_revision = 3; //最后一次修改时的版本
  int64 version = 4; //可以的版本,删除key,版本会重置为0,任何修改都会增加版本
  bytes value = 5; //
  int64 lease = 6; //租约,如果没有租约lease=0
}

Range(返回查询)

message RangeRequest {
  enum SortOrder {
	NONE = 0; // default, no sorting
	ASCEND = 1; // lowest target value first
	DESCEND = 2; // highest target value first
  }
  enum SortTarget {
	KEY = 0;
	VERSION = 1;
	CREATE = 2;
	MOD = 3;
	VALUE = 4;
  }

  bytes key = 1; //请求起始key
  bytes range_end = 2; //请求结束key 和key组成范围
  int64 limit = 3;  //限制返回的key数量
  int64 revision = 4; //期望的revision
  SortOrder sort_order = 5; //指定返回结果排序
  SortTarget sort_target = 6; //指定排序目标字段
  bool serializable = 7; //设置为 true,则请求线性化的一致性读取,即在读取之前,所有之前的写入都已经被持久化。serizlized可有由任何节点读取,因为数据已经序列化。相反线性一致性读取(Linearized Read)只能从领导者读取
  bool keys_only = 8; //设置为 true,则只返回键,不返回值
  bool count_only = 9; //设置为 true,则只返回匹配范围内的键值对数量,不返回键或值
  int64 min_mod_revision = 10; //指定返回的键值对的最小修改修订号。只有修改修订号大于或等于这个值的键值对会被返回。
  int64 max_mod_revision = 11;  //定返回的键值对的最大修改修订号。只有修改修订号小于或等于这个值的键值对会被返回。
  int64 min_create_revision = 12; //指定返回的键值对的最小创建修订号。只有创建修订号大于或等于这个值的键值对会被返回
  int64 max_create_revision = 13; //指定返回的键值对的最大创建修订号。只有创建修订号小于或等于这个值的键值对会被返回。
}

message RangeResponse {
  ResponseHeader header = 1;
  repeated mvccpb.KeyValue kvs = 2; //与范围请求匹配的键值对列表,如果设置了Count_only,kvs为空
  bool more = 3; //如果设置了limit 用于判断是否有更多的键
  int64 count = 4; //满足range request的key数量
}

Put(设置操作)

message PutRequest {
  bytes key = 1; 
  bytes value = 2;
  int64 lease = 3;
  bool prev_kv = 4; //如果设置为true,则返回存储操作之前的键值对
  bool ignore_value = 5; //如果设置为true,会更新key,忽略value。如果key不存在返回error
  bool ignore_lease = 6; //如果设置为true,会更新key,忽略lease(不会更新租约)。如果key不存在返回error
}

message PutResponse {
  ResponseHeader header = 1;
  mvccpb.KeyValue prev_kv = 2; //prev_kv设置为true时,返回更新前的key-value
}

Delete Range(删除操作)

message DeleteRangeRequest {
  bytes key = 1;
  bytes range_end = 2;
  bool prev_kv = 3; //设置true时,返回删除的key-value
}

message DeleteRangeResponse {
  ResponseHeader header = 1;
  int64 deleted = 2; //删除的key数量
  repeated mvccpb.KeyValue prev_kvs = 3; //删除的键值对列表
}

v3 API

Cluster

  • MemberList 列出当前集群成员
  • MemberAdd 将新成员添加到集群中
  • MemberAddAsLearner 添加一个新的learner到集群中
  • MemberRemove 移除成员
  • MemberUpdate 更新成员配置信息
  • MemberPromote 将一个 “学习者” 节点提升为正式的 etcd 集群成员

KV

  • Put 设置键值对
  • Get 默认从 etcd 存储中获取一个键的值,如果有传key范围,可以取key范围的值
  • Delete 删除一个key或一个范围的key
  • Compact 压缩 etcd 存储的历史版本,以减少存储空间
  • Do 通用操作接口,便于扩展和组合操作
  • Txn 创建一个事务

Lease

  • Grant 创建一个新的租约 ,返回LeaseGrantResponse
  • Revoke 撤销租约
  • TimeToLive 给定一个LeaseID,查询某个Lease的剩余有效时间及相关信息
  • Leases 列出所有租约信息
  • KeepAlive 通过定期心跳,保持租约的有效性
  • KeepAliveOnce 仅续租一次
  • Close 关闭租约相关的连接和资源

Watcher

  • Watch 监听某个键或键范围的变化
  • RequestProgress 用于请求 Watcher 发送当前进度通知。这个方法允许客户端检查 Watcher 进度是否已经追踪到最新的变化。
  • Close 关闭 Watcher,并释放其资源。

Auth

  • Authenticate 对用户进行认证,检查提供的用户名和密码是否有效
  • AuthEnable 启用认证机制
  • AuthDisable 禁用认证
  • AuthStatus 获取集群认证机制状态
  • UserAdd 添加一个新用户
  • UserAddWithOptions 添加一个新用户,并提供一些额外的选项,如用户角色等。
  • UserDelete 删除用户
  • UserChangePassword 修改密码
  • UserGrantRole 给 etcd 集群中的用户授予一个角色
  • UserGet 获取用户信息
  • UserList 获取用户列表
  • UserRevokeRole 撤销 etcd 集群中用户的角色
  • RoleAdd 添加角色
  • RoleGrantPermission 给角色授权
  • RoleGet 获取角色
  • RoleList 获取角色列表
  • RoleRevokePermission 撤销角色权限
  • RoleDelete 删除角色

Maintenance

  • AlarmList 列出集群所有的告警
  • AlarmDisarm 解除某个节点的告警
  • Defragment 对 etcd 节点的存储文件进行碎片整理
  • Status 返回特定 etcd 节点的状态信息
  • HashKV 计算 etcd 集群中键值存储的历史哈希值,这个命令可以用于检查集群成员之间数据的一致性。
  • Snapshot 创建 etcd 集群的快照
  • MoveLeader 将 etcd 集群的 leader 角色转移到指定的成员

事务(Transaction)

proto 结构

message RequestOp {
  // request is a union of request types accepted by a transaction.
  oneof request {
    RangeRequest request_range = 1;
    PutRequest request_put = 2;
    DeleteRangeRequest request_delete_range = 3;
  }
}

message TxnRequest {
  repeated Compare compare = 1;
  repeated RequestOp success = 2;
  repeated RequestOp failure = 3;
}

message TxnResponse {
  ResponseHeader header = 1;
  bool succeeded = 2;  // 如果compare结果是true 返回true,否则返回false 
  repeated ResponseOp responses = 3; //  包含事物中每个操作的响应结果。如果Succeeded=true,那么包含Then所有操作的响应。否则包含Else所有操作的响应
}

message ResponseOp {
  oneof response {
    RangeResponse response_range = 1;
    PutResponse response_put = 2;
    DeleteRangeResponse response_delete_range = 3;
  }
}


etcd client v3 事务接口

type Txn interface {
	//如果所有比较成功,执行Then方法,否则执行Else方法。用于检查 etcd 键值存储中的特定条件是否满足。
	If(cs ...Cmp) Txn
	// 比较成功时执行
	Then(ops ...Op) Txn
	// 失败是执行
	Else(ops ...Op) Txn
	//提交事物
	Commit() (*TxnResponse, error)
}

代码示例

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	error.FailOnError(err, "Failed to connect to etcd")
	defer cli.Close()

	// 构建事务
	txn := cli.Txn(context.Background()).
		If(clientv3.Compare(clientv3.Value("/txn/accountA"), "=", "100")).
		Then(clientv3.OpPut("/txn/accountA", "90"), clientv3.OpPut("/txn/accountB", "110")).
		Else(clientv3.OpGet("/txn/accountA"))

	// 提交事务
	txnResp, err := txn.Commit()
	error.FailOnError(err, "Txn Commit failed")

	// 打印事务结果
	if txnResp.Succeeded {
		fmt.Printf("Txn succeeded : %v", txnResp.Responses)
	} else {
		getResp := txnResp.Responses[0].GetResponseRange()
		if len(getResp.Kvs) > 0 {
			fmt.Printf("Txn failed : %v", string(getResp.Kvs[0].Value))
		}
	}

}

Mutex 分布式锁

etcd client v3版本内置了分布式锁的实现。

内置方法

  • TryLock 尝试获取锁,如果获取失败,立即返回
  • Lock 阻塞调用,直到获得锁或取消
  • Unlock 释放锁
  • IsOwner 检查当前会话是否是锁的持有者
  • Key 返回锁的key
  • Header 返回最近一次成功获取锁的响应头信息

使用示例

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	errorHandle.FailOnError(err, "Failed to connect to etcd")
	defer cli.Close()

	//创建会话
	s, err := concurrency.NewSession(cli)
	failOnError(err, "Failed to create session")
        //执行结束,撤销lease,释放锁
	defer s.Close()

	m := concurrency.NewMutex(s, "/mutex/lock")

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	if err := m.Lock(ctx); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("锁获取成功,当前锁的key,%v \n", m.Key())

	if err := m.Unlock(ctx); err != nil {
		log.Fatal(err)
	}
	fmt.Println("释放锁")
	
}

实现原理

etcd的分布式锁主要利用了Revision机制、租约(Lease)机制及事务(txn)、Watch机制等特性。

etcd mutex基于给定的锁前缀加上每个会话的leaseId创建kv,最先创建的kv所在的会话为锁的获得者。其他没有获得锁的会话通过Watch机制监听锁的key的删除事件,来尝试获取锁。

部分源码

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()

	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	//如果key revision 为 0,当前会话还没创建key
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	//取锁前缀下 第一个key的创建信息(根据Revision排序)
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	//返回锁信息
	return resp, nil
}

基准测试

官方提供Benchmarking etcd v3

reading one single key

key size in bytesnumber of clientsread QPS90th Percentile Latency (ms)
256127160.4
25664166236.1
2562561662221.7

The performance is nearly the same as the one with empty server handler.

reading one single key after putting

key size in bytesnumber of clientsread QPS90th Percentile Latency (ms)
256122690.5
25664135828.6
2562561326247.5

The performance with empty server handler is not affected by one put. So the performance downgrade should be caused by storage package.


http://www.kler.cn/a/308870.html

相关文章:

  • datastage在升级版本到11.7之后,部分在11.3上正常执行的SP报错SQLSTATE = 22007: 本机错误代码 = -180
  • 微澜:用 OceanBase 搭建基于知识图谱的实时资讯流的应用实践
  • 〔 MySQL 〕数据类型
  • 【Java语言】String类
  • 软件测试面试大全(含答案+文档)
  • WLAN消失或者已连接但是访问不了互联网
  • 企业开发时,会使用sqlalchedmy来构建数据库 结构吗? 还是说直接写SQL 语句比较多?
  • 断电重启之后服务器都有哪些服务需要重启
  • 828华为云征文|docker部署kafka及ui搭建
  • VRRP 笔记
  • 认知小文3《打破桎梏,编程与人生的基本法则》
  • 抓机遇,创发展︱2025 第十二届广州国际汽车零部件加工技术及汽车模具展览会,零部件国产浪潮不可阻挡
  • Pillow:Python图像处理库详解
  • 计算机网络(网络层)
  • 系统架构设计师:系统质量属性与架构评估
  • 固态硬盘:量产、开卡、ROM短接是指什么?
  • 34.贪心算法1
  • 2024最新股票系统源码 附教程
  • Track 08:AIML
  • CTFHub技能树-信息泄露-HG泄漏
  • 医学数据分析实训 项目二 数据预处理作业
  • 在 React 中掌握 useImperativeHandle(使用 TypeScript)
  • visual prompt tuning和visual instruction tuning
  • 白话:大型语言模型中的幻觉(Hallucinations)
  • react hooks--useState
  • Spring Boot基础