当前位置: 首页 > article >正文

【etcd】etcd_go操作与etcd锁实现原理

etcd 是一个分布式的键值存储系统,提供了丰富的 API 用于操作和管理键值对。etcd 的 API 基于 gRPCHTTP/JSON,支持多种编程语言。以下是 etcd API 的详细讲解,包括核心功能和示例。


1. etcd API 版本

etcd 目前有两个主要的 API 版本:

  • v2 API:早期的 API,基于 HTTP/JSON,功能较为简单。
  • v3 API:当前的推荐版本,基于 gRPC,功能更强大,性能更好。

以下内容主要基于 v3 API


2. etcd v3 API 的核心功能

etcd v3 API 提供了以下核心功能:

2.1 键值操作

  • Put:设置键值对。
  • Get:获取键值对。
  • Delete:删除键值对。
  • Txn:事务操作,支持原子性操作。

2.2 Watch 机制

  • 监听键的变化,当键的值发生变化时触发回调。

2.3 租约(Lease)

  • 为键值对设置租约(TTL),到期后自动删除。

2.4 分布式锁

  • 基于 etcd 的原子操作实现分布式锁。

2.5 集群管理

  • 管理 etcd 集群的成员和状态。

3. etcd v3 API 的使用

3.1 安装 etcd 客户端库

以 Go 语言为例,安装 etcd 客户端库:

go get go.etcd.io/etcd/client/v3

3.2 创建 etcd 客户端

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/client/v3"
)

func main() {
	// 创建 etcd 客户端
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"}, // etcd 地址
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	fmt.Println("Connected to etcd!")
}

3.3 键值操作

Put:设置键值对
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "/config/database/host", "localhost")
cancel()
if err != nil {
	log.Fatal(err)
}
fmt.Println("Put key successfully!")
Get:获取键值对
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "/config/database/host")
cancel()
if err != nil {
	log.Fatal(err)
}
for _, ev := range resp.Kvs {
	fmt.Printf("Key: %s, Value: %s\n", ev.Key, ev.Value)
}
Delete:删除键值对
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
_, err = cli.Delete(ctx, "/config/database/host")
cancel()
if err != nil {
	log.Fatal(err)
}
fmt.Println("Deleted key successfully!")

3.4 Watch 机制

监听键的变化:

watchChan := cli.Watch(context.Background(), "/config/database/host")
for resp := range watchChan {
	for _, ev := range resp.Events {
		fmt.Printf("Event: %s, Key: %s, Value: %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
	}
}

3.5 租约(Lease)

创建租约
// 创建租约
leaseResp, err := cli.Grant(context.Background(), 10) // TTL 为 10 秒
if err != nil {
	log.Fatal(err)
}

// 将键绑定到租约
_, err = cli.Put(context.Background(), "/config/database/host", "localhost", clientv3.WithLease(leaseResp.ID))
if err != nil {
	log.Fatal(err)
}
fmt.Println("Key with lease created!")
续约
// 续约
_, err = cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
	log.Fatal(err)
}
fmt.Println("Lease kept alive!")

3.6 事务(Txn)

事务操作:

txnResp, err := cli.Txn(context.Background()).
	If(clientv3.Compare(clientv3.Value("/config/database/host"), "=", "localhost")).
	Then(clientv3.OpPut("/config/database/host", "127.0.0.1")).
	Else(clientv3.OpPut("/config/database/host", "unknown")).
	Commit()
if err != nil {
	log.Fatal(err)
}
fmt.Println("Transaction committed:", txnResp.Succeeded)

3.7 分布式锁

基于 etcd 实现分布式锁:

// 创建锁
lock := clientv3.NewLock(cli, "/mylock")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
_, err = lock.Lock(ctx)
cancel()
if err != nil {
	log.Fatal(err)
}
fmt.Println("Lock acquired!")

// 释放锁
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
err = lock.Unlock(ctx)
cancel()
if err != nil {
	log.Fatal(err)
}
fmt.Println("Lock released!")

etcd 的分布式锁基于 租约(Lease)+ 临时键(Key)+ 事务(Txn) 来实现,保证了锁的互斥性自动过期能力。

1. 核心原理

  1. 创建租约(Lease)

    • 申请一个带有效期的租约,比如 10 秒。
    • 这个租约到期后,除非主动续约,否则 etcd 会自动删除绑定的 Key。
  2. 加锁(创建唯一 Key)

    • 通过事务(Txn)检查锁是否存在:
      • 不存在:创建一个 Key 并绑定租约,锁获取成功。
      • 已存在:锁被占用,获取失败。
  3. 保持锁(定期续约)

    • 客户端定期给租约续期,防止锁过期失效。
    • 如果客户端崩溃或超时未续约,etcd 会自动删除 Key,锁被释放。
  4. 释放锁(删除 Key)

    • 任务完成后,主动删除 Key,释放锁。

2. 代码示例

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/client/v3"
	"time"
)

func main() {
	cli, _ := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	defer cli.Close()

	// 1. 创建租约(10 秒)
	lease := clientv3.NewLease(cli)
	leaseResp, _ := lease.Grant(context.TODO(), 10)
	leaseID := leaseResp.ID

	// 2. 通过事务尝试加锁
	key := "/my-lock"
	txn := cli.Txn(context.TODO())
	txnResp, _ := txn.If(
		clientv3.Compare(clientv3.CreateRevision(key), "=", 0), // 确保 Key 不存在
	).Then(
		clientv3.OpPut(key, "locked", clientv3.WithLease(leaseID)), // 绑定租约
	).Commit()

	if !txnResp.Succeeded {
		fmt.Println("锁已被占用")
		return
	}
	fmt.Println("成功获取锁")

	// 3. 续约保持锁
	keepAlive, _ := lease.KeepAlive(context.TODO(), leaseID)
	go func() {
		for ka := range keepAlive {
			fmt.Println("续约成功:", ka.TTL)
		}
	}()

	// 4. 业务逻辑
	time.Sleep(5 * time.Second)

	// 5. 释放锁
	cli.Delete(context.TODO(), key)
	fmt.Println("锁已释放")
}

3. etcd 分布式锁的优缺点

强一致性:基于 Raft,保证锁的唯一性和数据安全。
自动过期:客户端崩溃时,锁会自动释放,避免死锁。
高可用性:etcd 可集群部署,保证系统稳定。

性能相对较低:etcd 适合小规模锁,大量并发加锁可能影响性能。
网络依赖性强:网络分区可能导致锁失效。


https://github.com/0voice


http://www.kler.cn/a/553545.html

相关文章:

  • 通过例子学 rust 个人精简版 3-1
  • 【系统架构】分布式事务模型详解
  • 配置终端代理
  • 精准医疗的“柔性”助力:FPC在医疗机器人中的应用实例【新立电子】
  • 启元世界(Inspir.ai)技术浅析(七):AI Beings 平台
  • AI照片管理利器实战:自动化分类+智能搜索+远程访问一站式解决方案
  • 基于Springboot的公寓报修管理系统【附源码】
  • 用DeepSeek零基础预测《哪吒之魔童闹海》票房——从数据爬取到模型实战
  • 【探索PHP的无限可能:构建高效、动态的Web应用】
  • 机器学习:十大算法实现汇总
  • EasyExcel实现excel导入(模版上传)
  • 处理 Markdown 转换过程中损坏的 PDF 文件
  • 机器学习(1)安装Pytorch
  • 如何排查服务器日志中出现的可疑行为
  • [实现Rpc] 消息类型的测试 | dynamic_pointer_cast | gdb使用
  • 如何安装Hadoop
  • Unity 与 Mosquitto MQTT Broker 通信教程
  • TypeScript 中的 type 和 interface:你真的了解它们的不同吗?
  • React之旅-02 创建项目
  • 使用Druid连接池优化Spring Boot应用中的数据库连接