【etcd】etcd_go操作与etcd锁实现原理
etcd 是一个分布式的键值存储系统,提供了丰富的 API 用于操作和管理键值对。etcd 的 API 基于 gRPC 和 HTTP/JSON,支持多种编程语言。以下是 etcd API 的详细讲解,包括核心功能和示例。
1. etcd API 版本
etcd 目前有两个主要的 API 版本:
- v2 API:早期的 API,基于 HTTP/JSON,功能较为简单。
- v3 API:当前的推荐版本,基于 gRPC,功能更强大,性能更好。
以下内容主要基于 v3 API。
2. etcd v3 API 的核心功能
etcd v3 API 提供了以下核心功能:
2.1 键值操作
- Put:设置键值对。
- Get:获取键值对。
- Delete:删除键值对。
- Txn:事务操作,支持原子性操作。
2.2 Watch 机制
- 监听键的变化,当键的值发生变化时触发回调。
2.3 租约(Lease)
- 为键值对设置租约(TTL),到期后自动删除。
2.4 分布式锁
- 基于 etcd 的原子操作实现分布式锁。
2.5 集群管理
- 管理 etcd 集群的成员和状态。
3. etcd v3 API 的使用
3.1 安装 etcd 客户端库
以 Go 语言为例,安装 etcd 客户端库:
go get go.etcd.io/etcd/client/v3
3.2 创建 etcd 客户端
package main
import (
"context"
"fmt"
"log"
"time"
"go.etcd.io/etcd/client/v3"
)
func main() {
// 创建 etcd 客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://127.0.0.1:2379"}, // etcd 地址
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
fmt.Println("Connected to etcd!")
}
3.3 键值操作
Put:设置键值对
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "/config/database/host", "localhost")
cancel()
if err != nil {
log.Fatal(err)
}
fmt.Println("Put key successfully!")
Get:获取键值对
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "/config/database/host")
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("Key: %s, Value: %s\n", ev.Key, ev.Value)
}
Delete:删除键值对
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
_, err = cli.Delete(ctx, "/config/database/host")
cancel()
if err != nil {
log.Fatal(err)
}
fmt.Println("Deleted key successfully!")
3.4 Watch 机制
监听键的变化:
watchChan := cli.Watch(context.Background(), "/config/database/host")
for resp := range watchChan {
for _, ev := range resp.Events {
fmt.Printf("Event: %s, Key: %s, Value: %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
3.5 租约(Lease)
创建租约
// 创建租约
leaseResp, err := cli.Grant(context.Background(), 10) // TTL 为 10 秒
if err != nil {
log.Fatal(err)
}
// 将键绑定到租约
_, err = cli.Put(context.Background(), "/config/database/host", "localhost", clientv3.WithLease(leaseResp.ID))
if err != nil {
log.Fatal(err)
}
fmt.Println("Key with lease created!")
续约
// 续约
_, err = cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
log.Fatal(err)
}
fmt.Println("Lease kept alive!")
3.6 事务(Txn)
事务操作:
txnResp, err := cli.Txn(context.Background()).
If(clientv3.Compare(clientv3.Value("/config/database/host"), "=", "localhost")).
Then(clientv3.OpPut("/config/database/host", "127.0.0.1")).
Else(clientv3.OpPut("/config/database/host", "unknown")).
Commit()
if err != nil {
log.Fatal(err)
}
fmt.Println("Transaction committed:", txnResp.Succeeded)
3.7 分布式锁
基于 etcd 实现分布式锁:
// 创建锁
lock := clientv3.NewLock(cli, "/mylock")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
_, err = lock.Lock(ctx)
cancel()
if err != nil {
log.Fatal(err)
}
fmt.Println("Lock acquired!")
// 释放锁
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
err = lock.Unlock(ctx)
cancel()
if err != nil {
log.Fatal(err)
}
fmt.Println("Lock released!")
etcd 的分布式锁基于 租约(Lease)+ 临时键(Key)+ 事务(Txn) 来实现,保证了锁的互斥性和自动过期能力。
1. 核心原理
-
创建租约(Lease)
- 申请一个带有效期的租约,比如 10 秒。
- 这个租约到期后,除非主动续约,否则 etcd 会自动删除绑定的 Key。
-
加锁(创建唯一 Key)
- 通过事务(Txn)检查锁是否存在:
- 不存在:创建一个 Key 并绑定租约,锁获取成功。
- 已存在:锁被占用,获取失败。
- 通过事务(Txn)检查锁是否存在:
-
保持锁(定期续约)
- 客户端定期给租约续期,防止锁过期失效。
- 如果客户端崩溃或超时未续约,etcd 会自动删除 Key,锁被释放。
-
释放锁(删除 Key)
- 任务完成后,主动删除 Key,释放锁。
2. 代码示例
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/client/v3"
"time"
)
func main() {
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
// 1. 创建租约(10 秒)
lease := clientv3.NewLease(cli)
leaseResp, _ := lease.Grant(context.TODO(), 10)
leaseID := leaseResp.ID
// 2. 通过事务尝试加锁
key := "/my-lock"
txn := cli.Txn(context.TODO())
txnResp, _ := txn.If(
clientv3.Compare(clientv3.CreateRevision(key), "=", 0), // 确保 Key 不存在
).Then(
clientv3.OpPut(key, "locked", clientv3.WithLease(leaseID)), // 绑定租约
).Commit()
if !txnResp.Succeeded {
fmt.Println("锁已被占用")
return
}
fmt.Println("成功获取锁")
// 3. 续约保持锁
keepAlive, _ := lease.KeepAlive(context.TODO(), leaseID)
go func() {
for ka := range keepAlive {
fmt.Println("续约成功:", ka.TTL)
}
}()
// 4. 业务逻辑
time.Sleep(5 * time.Second)
// 5. 释放锁
cli.Delete(context.TODO(), key)
fmt.Println("锁已释放")
}
3. etcd 分布式锁的优缺点
✅ 强一致性:基于 Raft,保证锁的唯一性和数据安全。
✅ 自动过期:客户端崩溃时,锁会自动释放,避免死锁。
✅ 高可用性:etcd 可集群部署,保证系统稳定。
❌ 性能相对较低:etcd 适合小规模锁,大量并发加锁可能影响性能。
❌ 网络依赖性强:网络分区可能导致锁失效。
https://github.com/0voice