当前位置: 首页 > article >正文

利用Go语言模拟实现Raft协议

  近来学习到区块链,想要模拟实现 Raft 协议。但是发现网上教程很杂,或者说很多教程并不适合于新手从零开始进行实现。

  本文将从头开始复现个人模拟实现 Raft 的过程,完成后整个模拟后,读者应该学会 Go 语言的基本语法、Rpc 编程的基本概念与用法、简易 Raft 协议的过程。

  系统实现:本地 Raft 节点注册,Raft 节点的投票和选举,心跳监听,超时选举,Http监听,日志复制。

  Tips:本文是按照作者实现过程进行复现,因此如果想要学习可以通篇慢慢读下去,全篇大概1.2W字左右。如果需要直接的代码可以参考以下链接,每个部分都放在不同的文件夹中,理论上可以直接运行。不拒绝白嫖代码,但如果有用请点一个星

Gitee 地址:https://gitee.com/moisten-white/go-raft

GitHub 地址:https://github.com/huang-juanjuan/Go-Raft

文章目录

  • 一、Raft 关键算法
  • 二、创建 Raft 节点组
    • 2.1 Raft 节点的创建
    • 2.2 Rpc 服务注册
    • 2.3 Peers 的构建
    • 2.4 测试主函数
  • 三、投票功能的实现
    • 3.1 阻塞以及心跳检测功能
    • 3.2 判断能否成为 Candidate
    • 3.3 Candidate 进行选举
    • 3.4 节点回应投票
    • 3.5 Candidate 收集投票
    • 3.6 选举定时器
    • 3.7 测试主函数
  • 四、心跳检测
    • 4.1 Leader 发送心跳消息
    • 4.2 Follower 回应心跳消息
    • 4.3 心跳定时器
    • 4.4 测试结果
  • 五、创建日志
    • 5.1 监听 Http
    • 5.2 取消监听
    • 5.3 测试结果
  • 六、日志复制
    • 6.1 Leader 广播新日志
    • 6.2 Leader 广播日志复制命令
    • 6.3 Follower 回应日志复制
    • 6.4 Leader 更改日志复制命令
    • 6.5 Follower 复制日志
    • 6.6 测试结果
  • 七、日志提交
  • 八、小结

一、Raft 关键算法

  本文按照主要 9 个算法模拟实现 Raft 协议。

  这一步很关键,对于第一次尝试的初学者,最开始往往不知道从何处下手,这也是网上很多 Raft 协议模拟的文章的问题。许多文章按照整个 Raft 实现的思路,只有完成所有代码才能知道项目是否完成,这对初学者来说是非常不友好的。

  本文将按照本人实现的过程,分成若干个可以验证的部分进行讲解,读者可以同时完成每个一级标题后进行验证代码的正确性
在这里插入图片描述

二、创建 Raft 节点组

2.1 Raft 节点的创建

  初学者针对 Raft 实现时,应该针对当前需要一步步进行舔砖加瓦。对于 Raft 结构体的元素,一开始包含基本的 ID、Peers、currentRole 等即可,而 log 可以先用 []string 进行代替。log 具体的结构体等内容,后续再进行逐步添加。以下是个人最开始使用的 Raft 结构体,以及如何进行初始化,供参考:

// Raft 结构
type Raft struct {
mu sync.RWMutex // 互斥锁
id string       // 端口号

peers         []string // 所有节点的端口号
currentTerm   int      // 当前 Term 号
currentRole   Role     // 当前角色
currentLeader string   // 当前 Leader 的端口号
Logs          []string // 日志

votedFor     string   // 当前 Term 中给谁投了票
voteReceived []string // 收到的同意投票的端口号

sentLength  map[string]int // 每个节点日志复制的插入点
ackedLength map[string]int // 每个节点已接收的日志长度

electionTimer     *time.Timer  // 选举定时器
heartBeatTimer    *time.Ticker // 心跳计时器
lastHeartBeatTime int64        // 上次收到心跳的时间
timeout           int          // 心跳超时时间
}

// NewRaft 创建并初始化一个新的 Raft 实例
func NewRaft(id string) *Raft {
rf := &Raft{
	id:                id,
	peers:             []string{},
	currentTerm:       0,
	currentRole:       Follower,
	currentLeader:     "null",
	Logs:              []string{},
	votedFor:          "null",
	voteReceived:      []string{},
	sentLength:        make(map[string]int),
	ackedLength:       make(map[string]int),
	electionTimer:     time.NewTimer(time.Duration(rand.Intn(5000)+5000) * time.Millisecond),
	heartBeatTimer:    time.NewTicker(1000 * time.Millisecond),
	lastHeartBeatTime: time.Now().UnixMilli(),
	timeout:           5,
}
return rf
}

  先有了 Raft 的初始化,我们先进行的尝试是创建 Raft 节点,并使若干个节点之间能够相互更新,形成一个群组。在学习过程中,本人了解到的是 Raft 节点有一个上位节点作为中心节点,用于 Term 号的广播和维护 Peers 记录所有节点端口号。

  本人在此的思路是“使用命令行参数,在创建节点的时候指定节点的端口号,并指定中心节点的端口号,新节点通过和指定的中心节点通信创建自身 Peers,中心节点广播通知已有节点添加新节点”。这样便能在中心节点挂掉时,能够继续维护节点组的运行。同时这种方法对于初学者比较友好,省略了构建中心节点的不同属性、方法和接口等。

2.2 Rpc 服务注册

  为了能实现广播等功能,我们需要使用 Rpc 编程,对于初学者来说,这部分也是最难入门的一点。

  首先是如何理解 Rpc 编程,Rpc 全程 Remote Procedure Call,即“远程过程调用”。本人的理解是,把 Raft 节点想象成若干个物理上分散的节点,每个节点就像一个实体类,Rpc 函数就是类的方法,Rpc 就是连接到实体类,通过连接调用实体类的方法

下列代码中 client 就相当于实体类的一个引用,使用 client.call 调用 Raft 节点的 ReplyVote 函数。

client, err := rpc.DialHTTP("tcp", peer)
if err != nil {
	log.Println("Dialing error: ", err)
	rf.mu.Lock()
	rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)
	rf.mu.Unlock()
		return
}
var reply RequestVoteReply
err = client.Call("Raft.ReplyVote", args, &reply)
if err != nil {
	log.Println("RPC error: ", err)
	return
}

  本人在注册 Rpc 时也经历了非常长的时间去理解,此处借鉴了另一个人的用法,同时注册了 Http 请求的监听,便于之后的日志的创建。
借鉴地址:https://github.com/corgi-kx/blockchain_consensus_algorithm/tree/master/raft

// startRaftServer 启动一个基于 HTTP 的 RPC 服务器,用于处理 Raft 节点之间的 RPC 请求。
func startRaftServer(rf *Raft, port string) {
	// 将 Raft 实例 rf 注册为一个 RPC 服务,以便其他节点可以通过 RPC 调用 rf 的方法。
	err := rpc.Register(rf)
	if err != nil {
		log.Panic(err)
	}

	// 将 RPC 服务绑定到 HTTP 协议上,允许通过 HTTP 协议处理 RPC 调用。
	// 这会为 RPC 服务创建一个 "/rpc" 端点,并监听来自 HTTP 请求的 RPC 调用。
	rpc.HandleHTTP()

	// 在指定的端口上启动 TCP 监听器,以便接收来自其他 Raft 节点的连接请求。
	listener, err := net.Listen("tcp", ":"+port)
	if err != nil {
		log.Fatalf("Error starting server: %s", err)
	}

	// 使用 HTTP 服务器处理通过监听器接收到的连接。
	// nil 代表使用默认的 HTTP 请求多路复用器 (http.DefaultServeMux) 来分发请求。
	// 这个操作在一个新的 Go routine 中执行,以便服务器可以并行处理多个请求。
	go http.Serve(listener, nil)
}

  此处在讲解一下 go function() 的用法,本人的理解就是将 go routine 当做一个线程,go function() 就是启动了一个 function() 函数作为线程运行,此时原本调用 go function() 的函数即使退出也不影响 function() 的运行,除非整个程序的终止或满足退出条件。还可以通过信号机制进行管理,但是此项目并未使用。

2.3 Peers 的构建

  这一小节是这部分最关键的内容,即对于一个已有的节点组,新节点如何加入这个群组。按照上文提到的方法,关键在于设置一个中心节点,让新节点通过中心节点得到已有节点,再通过中心节点让所有已有节点能更新新节点。

  为此我们需要两个函数,一个用于已有节点进行更新,一个用于新节点的注册。

  首先是已有节点进行更新,即添加一个新节点的端口号,在本人的 Raft 结构体中可以看到 Peers 是一个 string 数组用于记录端口号,添加新节点的过程其实类似于数组的添加,但是需要注意的是避免重复添加的过程,因此实际上可以利用 map 进行记录

以下为添加节点的 Rpc 函数,对于 Rpc 函数我们需要注意的是,函数名前面的 (rf *Raft),表示这是一个 Raft 的函数。Rpc 函数的返回值也必须为 error,其参数也必须为两个指针类型,一个用于传参,一个可以用于记录返回值,因此通常将两个参数用结构体进行包装

// 定义请求和响应结构
type AddPeerArgs struct {
// 新节点的端口号
	NewPeer string
}

type AddPeerReply struct {
// 更新后的新群组 peers
	Peers []string
}

// Rpc 函数用于广播通知已有节点添加新节点
func (rf *Raft) AddPeer(args *AddPeerArgs, reply *AddPeerReply) error {
// 先对 rf 上锁,避免出现广播中 peers 改变导致出现重复添加的情况
	rf.mu.Lock()
// defer 表示函数退出时再执行解锁操作
	defer rf.mu.Unlock()

	// 遍历 peers 检查是否已经存在该 peer
	for _, peer := range rf.peers {
		if peer == args.NewPeer {
			fmt.Printf("Node %s: Peer %s already exists\n", rf.id, args.NewPeer)
			reply.Peers = rf.peers
			return nil
		}
	}

	// 如果不存在则添加
	rf.peers = append(rf.peers, args.NewPeer)
	reply.Peers = rf.peers

	// 打印更新后的 peers 列表,此处可以作为验证性打印,检查是否正确更新节点
	fmt.Printf("Node %s updated peers: %v\n", rf.id, rf.peers)
	return nil
}

  以下为新节点的注册函数,新节点将端口号包装后,调用中心节点的该函数进行注册,通过 reply 将已有的所有节点告知新节点进行注册,再将添加了新节点的端口号广播给已有节点进行更新。

// 该方法会将新节点添加到当前节点的 peers 列表中,并将新节点信息广播给其他节点。
func (rf *Raft) RegisterNode(args *AddPeerArgs, reply *AddPeerReply) error {
	rf.mu.Lock()  
	defer rf.mu.Unlock()  

	// 检查是否已经存在该 peer
	for _, peer := range rf.peers {
		if peer == args.NewPeer {
			fmt.Printf("Node %s: Peer %s already exists\n", rf.id, args.NewPeer)
			reply.Peers = rf.peers
			return nil
		}
	}
	// 如果不存在则添加
	rf.peers = append(rf.peers, args.NewPeer)
	// 打印当前节点的 peers 列表,用作调试
	fmt.Printf("Node %s updated peers: %v\n", rf.id, rf.peers)


	// 将新节点信息广播给当前节点的其他 peers
	for _, peer := range rf.peers {
		// 跳过新节点本身和当前节点
		if peer == args.NewPeer || peer == "localhost:"+rf.id {
			continue
		}
  
		// 尝试通过 RPC 连接到其他节点
		client, err := rpc.DialHTTP("tcp", peer)
		if err != nil {
			log.Println("Dialing:", err)  // 如果连接失败,记录错误并继续下一个节点
			continue
		}
		// 准备将新节点添加到其他节点的 peers 列表中
		addPeerArgs := &AddPeerArgs{NewPeer: args.NewPeer}
		var addPeerReply AddPeerReply
		// 调用其他节点的 AddPeer 方法,将新节点信息发送过去
		err = client.Call("Raft.AddPeer", addPeerArgs, &addPeerReply)
		if err != nil {
			log.Println("RPC error:", err)
		}
	}

	return nil  // 方法执行成功,返回 nil 表示没有错误
}

2.4 测试主函数

  在完成上面三个小节之后,我们的系统应该能基本做到构建一个 Raft 节点组,能实现新节点的添加,这部分可以通过如下的主函数进行测试。需要注意的是,在广播新节点端口号时,在端口号前添加了 “localhost:”,使本地节点上可以相互进行连接通信。

func main() {
	// 定义命令行参数:端口号和中心节点的地址
// flag 使用 import 导入
	portPtr := flag.String("port", "0", "端口号")
	centralNodePtr := flag.String("central", "localhost:8040", "中心节点(默认 localhost:8040 )")
	flag.Parse()  // 解析命令行参数

	// 检查是否提供了有效的端口号,如果端口号为 "0",程序退出
	if *portPtr == "0" {
		fmt.Println("请确认端口号")
		os.Exit(1)
	}

	// 创建一个新的 Raft 实例,使用提供的端口号
	rf := NewRaft(*portPtr)

	// 启动 Raft 服务器,监听 RPC 请求
	go startRaftServer(rf, *portPtr)

	// 尝试通过 RPC 连接到中心节点
	client, err := rpc.DialHTTP("tcp", *centralNodePtr)
	if err != nil {
		log.Fatalf("Error connecting to central node: %s", err)  // 如果连接失败,打印错误信息并退出程序
	}

	// 创建请求参数,将当前节点的信息传递给中心节点
	args := &AddPeerArgs{NewPeer: "localhost:" + *portPtr}
	var reply AddPeerReply

	// 调用中心节点的 RegisterNode 方法,将当前节点注册到集群中
	err = client.Call("Raft.RegisterNode", args, &reply)
	if err != nil {
		log.Fatalf("RPC error: %s", err)  // 如果调用失败,打印错误信息并退出程序
	}

	// 更新当前节点的 peers 列表,使用从中心节点获取的 peers 列表
	rf.mu.Lock()
	rf.peers = reply.Peers
	fmt.Printf("Node %s initial peers: %v\n", rf.id, rf.peers)
	rf.mu.Unlock()

	// 阻塞 main 函数,防止程序退出
	select {}
}

  在文件目录下使用 go build -o test.exe 后生成可执行文件,调用验证 Peers 能否正确更新,命令行参数的使用,以及是否会出现重复添加等情况。

在这里插入图片描述

三、投票功能的实现

  在这部分,我们将实现基本的投票功能。当然,由于心跳检测功能还不够完善,所以还不能算是一次完整的选举过程,但为了尽量每一部分都可以验证,本节还是非常有必要的。

3.1 阻塞以及心跳检测功能

  在投票功能中,一个 Follower 首先需要通过变成 Candidate 进行收集投票才能进一步变为 Leader,因此我们需要考虑什么情况下变成 Candidate。

在这里插入图片描述

  因此我们需要修改主函数,让 Raft 节点在创立后循环判断能否成为 Candidate;如果不能,是否收到心跳消息,这部分的伪代码如下:

// 循环的标签
Circle:
	for {
  if rf 能否变为 Candidate {
			// 成为候选人节点后,向其他节点请求选票进行选举
			rf 开始进行选举

			rf.mu.Lock()
			if rf 选举成功 {
				fmt.Printf("Node %s has become the leader\n", rf.id)
				rf.mu.Unlock()
				break
			} else {
				// 选举过程中失败,此时可能是已有更新的 Term 或是选举超时
	            // 如果出现更新的 Term,则下一次判断能否成为 Candidate 时退出循环
          // 如果选举超时或群组只有当前节点一个节点,则重复进行选举
				rf.mu.Unlock()
			}
		} else {
			// 变为 Candidate 失败
			break
		}
	}

	// 进行心跳检测
	for {
		// 5秒检测一次
		time.Sleep(5 second)
		rf.mu.Lock()

		if rf 不是领导者 && rf 收到过心跳消息 && 上一次心跳消息距离现在已超时 {
			fmt.Printf("心跳检测超时,已超过%d秒\n", rf.timeout)
			fmt.Println("即将重新开启选举")
			// 重新初始化 Raft 节点信息,准备变为 Candidate
   	goto Circle
		} else {
			rf.mu.Unlock()
		}
	}

3.2 判断能否成为 Candidate

  在上一小节中我们提到如何判断能否成为 Candidate,为此我们需要一个专门的函数进行处理。在这一步,我们需要使用随机退避原则处理一种会发生活锁现象的情况

  假设现有两个节点A、B,A 启动后变为 Candidate,随后 B 启动后变为 Candidate。由于开始收集投票时,节点首先会为自己投票,因此 A.VoteFor = A,B.VoteFor = B。对于A 发起的选举,B 已给自己投过票,因此拒绝投票,同理 A 也因此拒绝给 B 投票。于是 A、B 在成为 Candidate 的过程中都失败了,相同时间的心跳检测后,上述过程循环进行,也就产生了活锁。

  为解决矛盾,应该设置选举前,随机休眠一段时间然后重新开始选举,避免term号不停更新。此处需要注意的是休眠随机时间的位置,不可放在上锁之后。原因在于之后收集投票,为了保证安全也是要对数据上锁保护的,如果此时在上锁后进行休眠,则之后收集投票会被阻塞,导致收集投票时,节点已经成为 Candidate 了,休眠因此是无意义的。

// 修改节点为候选人状态,返回值不是 error,说明函数可以类比为 Raft 类的成员函数
// 可以使用rf.becomCandidate() 进行调用,可以访问 rf 结构体的所有成员
func (rf *Raft) becomeCandidate() bool {
	//休眠随机时间后,再开始成为候选人
	r := rand.Int63n(3000) + 1000
	time.Sleep(time.Duration(r) * time.Millisecond)

	rf.mu.Lock()
	defer rf.mu.Unlock()

// 只能从 Follwer 变为 Candidate,并且当前 Term 中没有 Leader 且节点未投过票
	if rf.currentRole == Follower && rf.currentLeader == "null" && rf.votedFor == "null" {
		rf.currentRole = Candidate
		rf.votedFor = rf.id
		rf.currentTerm += 1
		rf.voteReceived = append(rf.voteReceived, "localhost:"+rf.id)
		fmt.Println("本节点已变更为候选人状态")
		return true
	} else {
		return false
	}
}

3.3 Candidate 进行选举

  当节点从 Follower 变为 Candidate 后,即将开始进行选举。这个过程要对当前 Peers 中的所有节点收集投票,每收集一个节点的结果,就判断是否需要进行退出选举过程。可以看到

在这里我们需要使用一个锁的组进行控制,确保先完成选举投票过程,再进行下一步的过程。

// 收集投票信息结构体
type RequestVoteArgs struct {
	NodeId       string // 候选人的ID
	Term         int    // 发起请求节点的 Term
	LastLogIndex int    // 最新的日志索引
	LastLogTerm  int    // 最新日志的 Term
}

type RequestVoteReply struct {
	NodeId      string // Follower 的 ID
	Term        int    // 响应节点的 Term
	VoteGranted bool   // 是否同意投票
}

func (rf *Raft) startElection(args *RequestVoteArgs) {
	// 开始选举时,同时开始选举定时器
	rf.resetElectionTimer()
	fmt.Print("开始选举Leader\n")

	var wg sync.WaitGroup
	var mu sync.Mutex

	// 决定选举是否中断
	cancelled := false

	for i := 0; i < len(rf.peers); i++ {
		peer := rf.peers[i]
		if peer == "localhost:"+rf.id {
			continue
		}
  // 添加一把锁
		wg.Add(1)
		go func(peer string, index int) {
			// 退出时解开一把锁
      defer wg.Done()
			
      // 遇到更新的 Term 时,节点会从 Candidate 退出,此时结束选举
			if rf.currentRole != Candidate {
				return
			}
      // 调试信息打印
			fmt.Printf("节点 %s 向 %s 发起投票请求\n", rf.id, peer)

      // 连接到远程 Raft 节点
			client, err := rpc.DialHTTP("tcp", peer)
			if err != nil {
				log.Println("Dialing error: ", err)
				rf.mu.Lock()
          // 如果连接失败则从 Peers 中删除对应 peer
				rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)
				rf.mu.Unlock()
				return
			}
			var reply RequestVoteReply
      // 远程调用节点回应是否投票
			err = client.Call("Raft.ReplyVote", args, &reply)
			if err != nil {
				log.Println("RPC error: ", err)
				return
			}

      // 收到回应后进行收集投票的判断
			if rf.CollectVotes(&reply) {
				mu.Lock()
          // 当收集到过半票数时返回为 true,此时修改 cancelled 为 true 进行退出
				cancelled = true
				mu.Unlock()
				return
			} else {
				rf.mu.Lock()
          // 如果节点变为 Follower 说明已有更新的 Term,此时退出选举
				if rf.currentRole == Follower {
					fmt.Println("结束收集投票")
					rf.mu.Unlock()
					return
				} else {
              // 继续收集投票,此时不一定收集完了投票,所以不需要退出
					rf.mu.Unlock()
				}
			}
		}(peer, i)
	}

// 等待所有的锁都解开,即收集了所有的投票
// 收集所有投票也是为了避免收集了部分投票,但是余下的节点中存在更新的 Term
	wg.Wait()

	rf.mu.Lock()
	defer rf.mu.Unlock()
	// 无论是否选举成功都结束选举定时器
	rf.electionTimer.Stop()

	if cancelled {
		fmt.Println("选举成功")
		for _, follower := range rf.peers {
			if follower == "localhost:"+rf.id {
				continue
			}
			rf.sentLength[follower] = len(rf.Logs)
			rf.ackedLength[follower] = 0
			fmt.Println("replicatedLogs", follower)
		}
	} else {
		fmt.Println("选举失败")
		
  rf.currentRole = Follower
  // 如果选举中发现更新的 Term,则会立刻修改 currentLeader 和 votedFor
  // 这种情况下是不需要重新初始化当前节点的,否则会有更新的 Term,但是当前节点拒绝投票
  // 而后当前节点在下一轮又重新进入选举,重复上述过程,导致始终无法出现 Leader,出现活锁
		if rf.currentLeader == rf.id {
			rf.currentLeader = "null"
		}
		if rf.votedFor == rf.id {
			rf.votedFor = "null"
		}
		rf.voteReceived = []string{}
	}
}

3.4 节点回应投票

  在上一小节我们远程调用了 Peers 中节点的 ReplyVote 函数,相当于 Raft 算法中的 Raft-2,从这里我们开始将用到 log 结构体。

// log 结构体
type LogEntry struct {
	Term    int // 日志生成时的 Term号
	Index   int // 日志的序号
	Command string // 日志的内容
}

// Raf-2
func (rf *Raft) ReplyVote(args *RequestVoteArgs, reply *RequestVoteReply) error {
	rf.mu.Lock()  
	defer rf.mu.Unlock() 

	// 打印消息,表示收到来自其他节点的投票请求。
	fmt.Printf("节点 %s 收到来自 %s 的投票请求\n", rf.id, args.NodeId)

	// 确定当前节点最后一条日志的任期号。
	myLogTerm := 0
	if len(rf.Logs) > 0 {
		myLogTerm = rf.Logs[len(rf.Logs)-1].Term
	}

	// 检查候选节点的日志是否比当前节点的日志更新。
	// 这是为了确保候选节点的日志至少与当前节点的日志一样完整。
	logOK := args.LastLogTerm > myLogTerm || (args.LastLogTerm == myLogTerm && args.LastLogIndex >= len(rf.Logs))

	// 检查候选节点的任期是否大于当前节点的任期
// 或者如果它们相等,但当前节点没有投票给其他候选人或已经投票给该候选节点。
	termOK := args.Term > rf.currentTerm || (args.Term == rf.currentTerm && (rf.votedFor == "null" || rf.votedFor == args.NodeId))

	// 如果日志和任期条件都满足,则授予投票。
	if termOK && logOK {
		rf.currentTerm = args.Term  // 将当前节点的任期更新为候选节点的任期。
		rf.currentRole = Follower  // 将当前节点的角色更改为跟随者。
		rf.votedFor = args.NodeId  // 记录候选节点的 ID 作为当前节点投票的对象。
		fmt.Print("接受投票\n")
		reply.VoteGranted = true  // 在回复中设置投票授予标志为 true。
	} else {
		fmt.Print("拒绝投票\n")
		reply.VoteGranted = false  // 在回复中设置投票授予标志为 false。
	}

	// 将回复的 NodeId 和 Term 设置为当前节点的 ID 和任期。
	reply.NodeId = rf.id
	reply.Term = rf.currentTerm

	return nil
}

3.5 Candidate 收集投票

  这部分和 3.3 节不同,3.3 节管理的是选举的过程,本小节则关注于收到 3.4 节的回应后,Candidate 进行统计处理的过程。在 Raft 关键算法中则对应于 Raft-3,需要注意的是在下列函数中,Candidate 有可能会变为 Follower,并修改了 votedFor。这也就是 3.3 节中的 startElection() 函数中,为什么要在选举失败后添加两个 if 语句的原因。

// Raft-3
func (rf *Raft) CollectVotes(reply *RequestVoteReply) bool {
	rf.mu.Lock()  
	defer rf.mu.Unlock() 

	// 检查当前节点是否仍是候选者,并且收到的投票回复的任期与当前节点的任期一致且授予了投票。
	if rf.currentRole == Candidate && reply.Term == rf.currentTerm && reply.VoteGranted {
		// 将投票给当前节点的节点ID添加到已收到的投票列表中。
		rf.voteReceived = append(rf.voteReceived, reply.NodeId)
		
		// 如果收到的投票数量超过总节点数的一半,当前节点将成为领导者。
		if len(rf.voteReceived) > len(rf.peers)/2 {
			rf.currentRole = Leader  // 将当前节点的角色设置为领导者。
			rf.currentLeader = rf.id  // 设置当前节点为领导者。
			fmt.Println("已获得超过二分之一票数")
			return true  // 返回 true 表示当前节点已成为领导者。
		}
	} else if reply.Term > rf.currentTerm {
		// 如果收到的投票回复包含比当前节点更高的任期,则当前节点放弃竞选,变为跟随者。
		fmt.Println("存在更新的 Term")
		rf.currentTerm = reply.Term  // 更新当前节点的任期。
		rf.currentRole = Follower  // 将当前节点的角色更改为跟随者。
		rf.votedFor = "null"  // 清除当前节点的投票记录。

		return false // 这里的返回是因为选举失败导致
	}
// 这里的返回是因为选举的票数未达到
// 或是在另一个 goroutine 中变为了 Follower
	return false 
}

3.6 选举定时器

  首先说明,在本项目中,这部分难以验证,原因在于无法模拟网络拥塞等情况,即连接上一个 Raft 节点后,选举时间非常快,也无法阻塞一个节点,如果取消一个节点的运行,那么 Candidate 在无法连接上远程节点后,就会从本地 Peers 中删掉对应的 peer。因此本小节仅供参考 Raft-1 算法。

// Raft-1
func (rf *Raft) initRaft() {
	rf.mu.Lock() 
	defer rf.mu.Unlock()

	// 增加当前的任期号,表示进入新的选举周期。
	rf.currentTerm += 1
	// 将当前节点的角色设置为候选者,以便发起选举。
	rf.currentRole = Candidate
	// 将当前节点的 ID 设置为已投票的对象,表示为自己投票。
	rf.votedFor = rf.id
	// 重置收到的选票列表,仅包含当前节点自己的一票。
	rf.voteReceived = []string{rf.id}
}

// 启动选举超时计时器,并在选举超时的情况下重新初始化 Raft 状态。
func (rf *Raft) electionTimerStart() {
	// 监听选举计时器的触发信号,每当计时器触发时进入循环。
	for range rf.electionTimer.C {
		rf.mu.Lock()
		// 检查当前节点的角色是否不是候选者,如果不是,解锁互斥锁并跳过此次循环。
		if rf.currentRole != Candidate {
			rf.mu.Unlock()  
			continue  // 跳过此次循环,等待下一次计时器触发。
		}

		// 如果当前节点是候选者且计时器触发,表示选举超时。
		fmt.Println("选举超时")
		rf.mu.Unlock() 
		// 重新初始化 Raft 状态,以准备再次发起选举。
		rf.initRaft()
	}
}

// 重置选举超时计时器,使其在随机时间后再次触发。
func (rf *Raft) resetElectionTimer() {
	rf.mu.Lock()  // 锁定互斥锁,以确保对共享状态的线程安全访问。
	defer rf.mu.Unlock()  // 在函数结束时自动解锁互斥锁。
	// 生成一个随机的超时时间,范围在 5 到 10 秒之间。
	timeout := time.Duration(rand.Intn(5000)+5000) * time.Millisecond
	// 停止当前的选举计时器。
	rf.electionTimer.Stop()
	// 使用新的超时时间重置选举计时器。
	rf.electionTimer.Reset(timeout)
}

3.7 测试主函数

  完成上面的内容后,我们将 3.1 的伪代码进行填充,得到如下主函数,此时启动程序,我们可以基本实现投票选举的过程,对于心跳检测我们将在下一节中进行实现。

func main() {
	portPtr := flag.String("port", "0", "端口号")
	centralNodePtr := flag.String("central", "localhost:8040", "中心节点(默认 localhost:8040 )")
	flag.Parse()

	if *portPtr == "0" {
		fmt.Println("请确认端口号")
		os.Exit(1)
	}

	rf := NewRaft(*portPtr)
	go startRaftServer(rf, *portPtr)

	client, err := rpc.DialHTTP("tcp", *centralNodePtr)
	if err != nil {
		log.Fatalf("Error connecting to central node: %s", err)
	}
	args := &AddPeerArgs{NewPeer: "localhost:" + *portPtr}
	var reply AddPeerReply
	err = client.Call("Raft.RegisterNode", args, &reply)
	if err != nil {
		log.Fatalf("RPC error: %s", err)
	}

	rf.mu.Lock()
	rf.peers = reply.Peers
	fmt.Printf("Node %s initial peers: %v\n", rf.id, rf.peers)
	rf.mu.Unlock()

	go rf.electionTimerStart()
	rf.electionTimer.Stop()

Circle:
	for {
		if rf.becomeCandidate() {
			// 成为候选人节点后,向其他节点请求选票进行选举
			rf.mu.Lock()
			args := &RequestVoteArgs{
				Term:         rf.currentTerm,
				NodeId:       rf.id,
				LastLogIndex: len(rf.Logs),
				LastLogTerm:  0,
			}
			if len(rf.Logs) > 0 {
				args.LastLogTerm = rf.Logs[len(rf.Logs)-1].Term
			}
			rf.mu.Unlock()

			rf.startElection(args)

			rf.mu.Lock()
			if rf.currentRole == Leader {
				fmt.Printf("Node %s has become the leader\n", rf.id)
				rf.mu.Unlock()
				break
			} else {
				rf.mu.Unlock()
			}
		} else {
			break
		}
	}

	// 进行心跳检测
	for {
		// 5秒检测一次
		time.Sleep(time.Millisecond * 5000)
		rf.mu.Lock()

		if rf.currentRole != Leader && rf.lastHeartBeatTime != 0 && (time.Now().UnixMilli()-rf.lastHeartBeatTime) > int64(rf.timeout*1000) {
			fmt.Printf("心跳检测超时,已超过%d秒\n", rf.timeout)
			fmt.Println("即将重新开启选举")
			rf.currentRole = Follower
			rf.currentLeader = "null"
			rf.votedFor = "null"
			rf.voteReceived = []string{}
			rf.lastHeartBeatTime = 0
			rf.mu.Unlock()
			goto Circle
		} else {
			rf.mu.Unlock()
		}
	}
}

在这里插入图片描述

四、心跳检测

  在第三节最后的测试中我们很明显发现,即时一个节点选举成功了,其他节点由于接收不到心跳消息,所以过一段时间后也会开始重新进行选举。在本节,我们将晚上心跳信息的发送和检测,实现完整的选举成功过程。

4.1 Leader 发送心跳消息

  心跳消息的目的在于 Leader 告知 Follower 自身的存在,并更新 Term,同时监听是否出现了更新 Term 的 Leader,保证系统中只存在一个 Leader。显然,告知其他节点并收到回应是需要 Rpc 调用的,故设计函数如下。

// Leader 发送的心跳消息结构体
type HeartBeatArgs struct {
	Term     int    // 领导者的任期
	LeaderId string // 领导者的ID
}

// Follower 回应心跳消息的结构体
type HeartBeatReply struct {
	Term    int  // 跟随者的当前任期
	Success bool // 心跳回应
}

func (rf *Raft) SendHeartBeat() {
 rf.mu.Lock()

 fmt.Println(rf.id + " start heartBeat")
 // 如果当前节点不是领导者,则退出心跳发送
 // 例如收到了 A1 节点的消息,发现有更新的 Term,于是在给 A2 节点发送消息前变为了 Follower
 if rf.currentRole != Leader {
     // 设置心跳时间标志不为0,如果节点经历过主函数中的心跳监听超时,则 lastHeartBeatTime = 0
     // 而当前 Leader 变为 Follower 之后,可能在未收到心跳消息前,其他的 Leader 就挂掉了
     // 主函数中限制了 lastHeartBeatTime != 0 才能重新选举,导致本节点始终处于监听心跳状态
     rf.lastHeartBeatTime = 1 
     rf.mu.Unlock()            // 解锁
     return
 }

 // 由于 Leader 在无法连接到指定 peer 时,会将其从本地 Peers 删除
 // 因此如果当前节点是集群中唯一的节点,降级为跟随者
 if len(rf.peers) == 1 {
     rf.lastHeartBeatTime = 1 // 更新心跳时间标志
     rf.currentLeader = "null" // 清空当前领导者
     rf.votedFor = "null"      // 清空投票记录
     rf.currentRole = Follower // 降级为跟随者角色
     rf.mu.Unlock()            // 解锁
     return
 }

 rf.mu.Unlock() // 解锁,准备并行处理心跳发送

 // 构造心跳请求参数
 args := &HeartBeatArgs{
     Term:     rf.currentTerm, // 当前任期
     LeaderId: rf.id,          // 领导者ID
 }

 // 遍历所有的节点,向它们发送心跳
 for i := 0; i < len(rf.peers); i++ {
     peer := rf.peers[i]
     if peer == "localhost:"+rf.id {
         // 跳过当前节点本身
         continue
     }

     // 验证 Leader 给谁发送了心跳消息
     fmt.Printf("%s send heartbeat to %s\n", rf.id, peer)

     // 使用并行的goroutine来发送心跳,以提高效率
     go func(peer string, index int) {
         client, err := rpc.DialHTTP("tcp", peer) // 建立与目标节点的RPC连接
         if err != nil {
             log.Println("Dialing error:", err) // 连接错误处理
             rf.mu.Lock()
             // 如果连接失败,从节点列表中移除该节点
             rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)
             rf.mu.Unlock()
             return
         }

         var reply HeartBeatReply
         // 发送心跳消息,并等待回复
         err = client.Call("Raft.ReceiveHeartBeat", args, &reply)
         if err != nil {
             log.Println("RPC error:", err) 
             return
         }

         rf.mu.Lock()
         defer rf.mu.Unlock()

         // 如果收到的回复任期大于当前节点的任期,降级为跟随者
         if reply.Term > rf.currentTerm {
             rf.currentTerm = reply.Term   // 更新当前任期
             rf.currentRole = Follower     // 降级为跟随者角色
             rf.votedFor = "null"          // 清空投票记录
             rf.currentLeader = "null"     // 清空当前领导者
         }
     }(peer, i)
 }
}

4.2 Follower 回应心跳消息

Follower 的回应则相对简单,只需要更新相应信息并把相关信息回应即可。

func (rf *Raft) ReceiveHeartBeat(args *HeartBeatArgs, reply *HeartBeatReply) error {
 rf.mu.Lock()
 defer rf.mu.Unlock()

 // 如果接收到的心跳消息中的任期小于当前节点的任期,拒绝心跳
 if args.Term < rf.currentTerm {
     reply.Term = rf.currentTerm // 返回当前节点的任期
     reply.Success = false       // 标记心跳处理失败
     return nil                  // 直接返回,不进行进一步处理
 }

 // 打印接收到心跳消息的日志信息
 fmt.Printf("节点 %s 收到来自 %s 的心跳消息\n", rf.id, args.LeaderId)

 // 更新节点的状态为跟随者,并同步心跳消息中的任期和领导者信息
 rf.currentTerm = args.Term          // 更新当前节点的任期为心跳消息中的任期
 rf.currentLeader = args.LeaderId    // 更新当前领导者为心跳消息中的领导者ID
 rf.currentRole = Follower           // 将当前节点角色设置为跟随者
 rf.lastHeartBeatTime = time.Now().UnixMilli() // 记录接收到心跳的时间戳

 // 设置回复消息的内容,表示心跳消息处理成功
 reply.Term = rf.currentTerm // 返回当前节点的任期
 reply.Success = true        // 标记心跳处理成功

 return nil
}

4.3 心跳定时器

  对于 Leader 而言,需要定时发送心跳消息,为此设置一个定时器,每间隔一段时间,检测该节点是否为 Leader,如果是则开始发送心跳消息,需要注意的是此处的判断间隔需要小于之前在主函数中设置的时间,完成函数后,在主函数创建节点后,将心跳定时器启动为 goroutine 即可。

// 心跳定时器
func (rf *Raft) heartBeatTimerStart() {
 // 通过循环不断地监听心跳定时器
 for range rf.heartBeatTimer.C {
     rf.mu.Lock() 

     // 如果当前节点的角色不是领导者,则跳过本次心跳发送
     if rf.currentRole != Leader {
         rf.mu.Unlock() // 解锁
         continue        // 跳过当前循环,继续监听下一次定时器触发
     }

     rf.mu.Unlock() // 如果是领导者,解锁并继续进行心跳发送
     rf.SendHeartBeat() // 调用发送心跳消息的函数
 }
}

4.4 测试结果

在主函数中 go rf.heartBeatTimerStart() 放在 go rf.electionTimerStart() 之上即可,此外主函数无其他改动,在此给出测试结果图。

在这里插入图片描述

五、创建日志

5.1 监听 Http

  在 2.2 节中,我们提到在注册服务时,我们还注册了 Http 的监听。因此我们在 Raft 结构体中再添加一个 http.serve 元素,于是对于日志消息,我们可以通过 Http 发送一个字符串模拟日志的命令。同样需要注意的是,Http 的监听只能注册一次,因此设置一个标记符号用于标记,避免重复注册

// getRequest 处理 HTTP 请求的回调函数
func (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {
 // 解析请求的表单数据
 request.ParseForm()

 // 检查请求的 URL 是否包含 "message" 参数,并且当前有领导者节点
 // 例如,http://localhost:8080/req?message=ohmygod
 if len(request.Form["message"]) > 0 && rf.currentLeader != "null" {
     message := request.Form["message"][0] // 提取消息内容
     m := LogEntry{
         Command: message, // 创建一个日志条目
     }
     // 添加到本地日志
		rf.Logs = append(rf.Logs, m)

     fmt.Println("http监听到了消息")
     writer.Write([]byte("ok!!!")) // 向客户端返回确认消息
 }
}

// HandleFuncFlag 确保 http.HandleFunc 只被调用一次的标志变量
var HandleFuncFlag = true

// httpListen 启动 HTTP 服务器监听指定端口
func (rf *Raft) httpListen() {
 // 创建一个 http.Server 实例,监听端口 8080
 rf.server = &http.Server{
     Addr:    ":8080", // 服务器监听的地址和端口
     Handler: nil,     // 使用默认的 http.DefaultServeMux 处理请求
 }

 // 如果 HandleFuncFlag 为 true,设置处理函数
 if HandleFuncFlag {
     HandleFuncFlag = false // 设置标志,确保此代码块只执行一次
     http.HandleFunc("/req", rf.getRequest) // 注册路径 "/req" 的处理函数
 }

 // 启动 HTTP 服务器,使用 goroutine 以非阻塞方式运行
 go func() {
     fmt.Println("监听8080")
     if err := rf.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
         fmt.Println("Server error:", err) // 如果服务器出错且不是正常关闭,记录错误
     }
 }()
}

5.2 取消监听

  我们知道日志是由 Leader 进行广播复制的,因此日志只能由 Leader 产生,所以我们针对上面的 http 监听,要选在节点成为 Leader 后开始,并且当节点从 Leader 退出后,取消监听。需要注意的是,取消监听后服务依旧存在,因此上一小节中需要设置一个标志符号避免重复注册服务

// stopListening 停止 HTTP 服务器的监听
func (rf *Raft) stopListening() {
 // 创建一个带有 1 秒超时的上下文,用于控制服务器的关闭操作,避免被阻塞
 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
 defer cancel() // 在函数退出时自动取消上下文

 // 尝试关闭服务器,传入上下文控制关闭的超时时间
 if err := rf.server.Shutdown(ctx); err != nil {
     // 如果关闭过程中发生错误,输出错误信息
     fmt.Println("Shutdown error:", err)
 } else {
     // 如果关闭成功,输出提示信息
     fmt.Println("监听已停止")
 }
}

在这里插入图片描述

5.3 测试结果

  我们提到 http 监听,要选在节点成为 Leader 后开始,并且当节点从 Leader 退出后,取消监听。所以我们启动监听应该设置在 starElection() 函数中选举成功后。而一个节点如果要从 Leader 状态退出,那么只能在收到心跳消息的回应时发现存在更新的 Term 才会退出,所以我们要在此设置取消监听。

  完成上述内容后,启动程序,当我们在浏览器输入“http://localhost:8080/req?message=第一条日志”时,可以看到当前 Leader 的日志多出一条 “第一条日志”,但是 Follower 并未进行监听。结束进程后,新的 Leader 将重新开始监听,结果如下。

图6

六、日志复制

  这部分内容,我们将实现完整的日志复制,包括 Leader 收到新消息后进行广播,新节点加入后如何复制日志等,这部分的难点在于 Raft-5、Raft-6、Raft-8 之间的循环调用

6.1 Leader 广播新日志

  在上一大节中,我们已经完成了 Leader 监听 http 获取新日志,但是只保存到了本地,因此我们需要使用 Rpc 调用其他 Follower 也能及时更新本地日志,这里需要将原本 getRequest() 函数中本地日志添加的代码换成以 goroutine 形式调用下列函数。

// Raft-4
func (rf *Raft) Boradcast(newLog LogEntry) {
	rf.mu.Lock()

	// 如果当前节点是领导者
	if rf.currentRole == Leader {
		// 设置日志条目的任期和索引,并将其添加到日志中
		newLog.Term = rf.currentTerm
		newLog.Index = len(rf.Logs) + 1
		rf.Logs = append(rf.Logs, newLog)
		rf.mu.Unlock()

		// 异步地将日志复制给其他所有节点
		for i := 0; i < len(rf.peers); i++ {
			peer := rf.peers[i]
			// 跳过自己节点的复制
			if peer == "localhost:"+rf.id {
				continue
			}
			// 启动协程复制日志到其他节点
			go rf.Replicating(peer)
		}
	} else {
		rf.mu.Unlock()
	}
}

6.2 Leader 广播日志复制命令

  这一小节为 Raft-5,是由 Leader 广播日志日志的命令,通过询问 Follower,逐步确定每个 Follower 需要复制的日志,这部分内容关键点在于修改 sentLength 来确定日志复制的节点。同时为了后续日志复制完成后的提交,在 Raft 结构体中再添加一个 CommitLength 元素。

// Leader 发起日志复制命令
type LogRequestArgs struct {
	LeaderId     string     // Leader 的 Id
	CommitLength int        // Leader 已经提交的日志
	Term         int        // Leader 当前 Term 号
	LogLength    int        // 日志长度
	LogTerm      int        // 日志复制点的 Term
	Entries      []LogEntry // 日志列表
}

// Follower 回应命令复制
type LogReplyArgs struct {
	NodeId      string // Follower 的 Id
	CureentTerm int    // Foller 当前的 Term
	Ack         int    // 接收复制后的日志长度
	Flag        bool   // 是否接收复制
}

// Raft-5
func (rf *Raft) Replicating(peer string) error {
	// 获取已发送日志的索引
	i := rf.sentLength[peer]
	// 获取当前日志的末尾索引
	ei := len(rf.Logs) - 1
	prevLogTerm := 0

	// 选择需要复制的日志条目
	var entries []LogEntry
	if ei >= i {
		entries = rf.Logs[i : ei+1]
	} else {
		entries = []LogEntry{}
	}

	// 如果不是第一条日志,则获取前一条日志的任期
	if i > 0 {
		prevLogTerm = rf.Logs[i-1].Term
	}

	// 与指定节点建立 RPC 连接
	client, err := rpc.DialHTTP("tcp", peer)
	if err != nil {
		log.Println("Dialing error: ", err)
		return nil
	}

	// 准备 RPC 请求的参数
	var reply LogReplyArgs
	args := &LogRequestArgs{
		LeaderId:     "localhost:" + rf.id,
		CommitLength: rf.CommitLength,
		Term:         rf.currentTerm,
		LogLength:    i,
		LogTerm:      prevLogTerm,
		Entries:      entries,
	}

	// 通过 RPC 发送日志条目给指定节点
	err = client.Call("Raft.Replying", args, &reply)
	if err != nil {
		log.Println("RPC error: ", err)
		return nil
	}
	return nil
}

6.3 Follower 回应日志复制

  在日志复制中,关键点在于如何找到日志复制的切入点,通过在 Follower 从后向前逐个检测中,找到最合适的日志切入点开始复制。如果找到了切入点,则同时开启 Follower 的日志复制,即 AppendEntries() 函数。

// Raft-6
func (rf *Raft) Replying(args *LogRequestArgs, reply *LogReplyArgs) error {
	// 锁定 Raft 实例以防止并发修改
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// 如果请求中的任期比当前节点的任期更高,更新任期并将当前角色设为 Follower
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = "null"
		rf.currentRole = Follower
		rf.currentLeader = args.LeaderId
	}

	// 如果任期相同且当前节点角色为 Candidate,更新角色为 Follower
	if args.Term == rf.currentTerm && rf.currentRole == Candidate {
		rf.currentRole = Follower
		rf.currentLeader = args.LeaderId
	}

	// 验证日志条目
	logOk := (len(rf.Logs) >= args.LogLength) && (args.LogLength == 0 || args.LogTerm == rf.Logs[args.LogLength-1].Term)

	// 准备响应
	reply.NodeId = "localhost:" + rf.id
	reply.CureentTerm = rf.currentTerm // 注意:此处 `CureentTerm` 可能有拼写错误,建议修正为 `CurrentTerm`
	reply.Ack = 0
	reply.Flag = false

	// 如果任期相同且日志条目有效,追加日志并设置确认号和标志
	if args.Term == rf.currentTerm && logOk {
		rf.AppendEntries(args.LogLength, args.CommitLength, &args.Entries)
		ack := args.LogLength + len(args.Entries)
		reply.Ack = ack
		reply.Flag = true
	}

// 远程调用 Leader 节点的 Raft-8 对回应进行处理
	client, err := rpc.DialHTTP("tcp", args.LeaderId)
	if err != nil {
		log.Println("Dialing error: ", err)
		return nil
	}
	flag := false // 并无实际作用,仅作为参数占位
	err = client.Call("Raft.ReceivingAck", &reply, &flag)
	if err != nil {
		log.Println("RPC error: ", err)
		return nil
	}
	return nil
}

6.4 Leader 更改日志复制命令

  在 Leader 收到 Follower 的回应后,要根据回应对日志复制的命令做出一定修改,逐步找到每个 Follower 日志复制的切入点。在这一步中,如果 Follower 没有开始日志复制,Leader 则将日志复制的切入点提前一位,然后再次调用 Replicating 尝试进行复制;反之,则将 Follower 节点的复制切入点记录,并开始提交日志进程。

// Raft 8
func (rf *Raft) ReceivingAck(reply *LogReplyArgs, flag *bool) error {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// 检查响应的任期是否与当前节点的任期匹配,并且节点角色是 Leader
	if reply.CureentTerm == rf.currentTerm && rf.currentRole == Leader {
		// 如果响应有效且确认号大于或等于当前确认长度,则更新发送和确认长度
		if reply.Flag && reply.Ack >= rf.ackedLength[reply.NodeId] {
			rf.sentLength[reply.NodeId] = reply.Ack
			rf.ackedLength[reply.NodeId] = reply.Ack
			fmt.Println("开始提交日志")
         // go rf.CommitEntries() // 提交日志的操作可以在这里被调用
		} else if rf.sentLength[reply.NodeId] > 0 {
			// 如果响应无效或确认号不足,减少发送长度并重新尝试复制日志
			rf.sentLength[reply.NodeId] = rf.sentLength[reply.NodeId] - 1
			go rf.Replicating(reply.NodeId)
		}
	} else if reply.CureentTerm > rf.currentTerm {
		// 如果响应中的任期大于当前任期,更新节点状态为 Follower
		rf.currentTerm = reply.CureentTerm
		rf.currentRole = Follower
		rf.votedFor = "null"
	}
	return nil
}

6.5 Follower 复制日志

  在 6.3 节中,如果节点接受了日志复制命令,则将会从切入点后,丢弃之前的无效日志,并复制当前 Leader 的日志条目,这一步也对应于 Raft-7 算法。

// Raft-7
func (rf *Raft) AppendEntries(logLength int, leaderCommit int, entries *[]LogEntry) {
	// 检查是否需要截断现有日志条目
	if len(*entries) > 0 && len(rf.Logs) > logLength {
		// 如果现有日志的条目与新条目的条目不匹配,则截断现有日志
		if rf.Logs[logLength].Term != (*entries)[0].Term {
			rf.Logs = rf.Logs[:logLength]
		}
	}

	// 将新的日志条目追加到现有日志中
	if logLength+len(*entries) > len(rf.Logs) {
		startIndex := logLength
		rf.Logs = append(rf.Logs, (*entries)[startIndex:]...)
	}

	// 更新提交日志的长度
	if leaderCommit > rf.CommitLength {
		// 打印正在提交的日志条目索引
		for i := rf.CommitLength; i < leaderCommit; i++ {
			fmt.Println("Follower Commit Log ", i)
		}
		// 更新提交长度
		rf.CommitLength = leaderCommit
	}
}

6.6 测试结果

  在这里我们已经完成了通常情况下的日志复制,但是对于新加入群组的节点我们还没有进行处理,因此新节点还无法获取到日志。为此我们需要在 2.3 节的 RegisterNode 添加判断,如果新节点选择的中心节点为 Leader,则在 RegisterNode 函数中对新节点进行日志复制;反之,则由中心节点广播到已有节点中进行查询,即在 AddPeer 函数中检测进行添加 Peer 操作的节点是否是 Leader,若是则对新节点进行日志复制。

  为此我们需要在 2.3 节的 RegisterNode 添加判断,如果新节点选择的中心节点为 Leader,则在 RegisterNode 函数中对新节点进行日志复制。反之,则由中心节点广播到已有节点中进行查询,即在 AddPeer 函数中检测进行添加 Peer 操作的节点是否是 Leader,若是,则对新节点进行日志复制。日志复制的过程则是直接调用 Replicating 函数。

  在这个过程中我们可以发现,之前 RegisterNode 中,检测到新节点存在就直接返回的操作是不可行的。举例来说,假设现在有 A、B、C 三个节点,其中 A 是 Leader,由 A 向其他两个节点发送心跳消息。此时 A 挂掉,B 和 C 监听到没有心跳消息之后开始重新选举,并且 B 成为了新的 Leader,此时 B 发现无法连接上 A 节点,于是将其从本地 peers 中删除,但是 C 节点并对 A 节点连接,因此 C 的 peers 中还保留了 A 的端口号。之后 A 节点选择 C 节点作为中心节点,C 节点发现 A 节点存在后,在 RegisterNode 函数直接返回,因此 B 节点无法得知 A 节点的信息,A 节点也无法加入群组。为了解决这个问题,在 RegisterNode 函数中,应该修改为本地 peers 中如果存在新节点的端口号,则本地不添加,但是仍然进行广播告知其他节点。

  此外,假设现有 A、B 节点正在进行选举阶段,并且在此之前两个节点中有日志消息存在。此时 C 节点尝试加入群组,并且成功了,但是由于选举过程中没有 Leader,导致 C 节点并未收到日志复制。所以为了解决此问题,需要在每个节点成功当选 Leader 之后,对 peers 中的每个节点进行日志复制,一方面可以更新日志,丢弃无效日志,另一方面可以避免选举过程中节点加入未进行日志复制的情况。

  完成上述内容后,系统已经基本完整,对于投票和日志复制功能基本完善,部分测试结果如下。

在这里插入图片描述

七、日志提交

  日志的提交,本项目通过打印参数来进行代替,仅供参考。

  在 6.4 节的 ReceivingAck 函数中,当 Leader 收到的回应是接受日志复制时,我们将开始进行日志的提交判定。当一个日志被超过半数 Follower 复制的时候,即可提交。

  值得一提的是,在我根据所学内容探究时,发现 Follower 是在 Leader 监听到新日志时,检测 Leader 的日志提交数量,但是 Leader 是在 Follower 接受日志复制后才进行提交,这意味着 Leader 的 CommitLength 会比 Follower 的大一个。但是当有新节点加入时,由于此时对新节点的日志复制不是在监听到新日志后,所以 Leader 的 CommitLength 不会发生改变,此时新节点和 Leader 的 CommitLength 就是相同的。

// Acks 计算并返回已经确认了指定日志长度 (lengths) 的节点数量。
// 如果一个节点的日志长度等于或超过指定的 lengths,或者它已经确认了指定的日志长度,则计入返回值。
func (rf *Raft) Acks(lengths int) int {
	rf.mu.Lock() // 加锁以确保并发安全
	defer rf.mu.Unlock() // 函数退出时自动解锁

	ret := 0 // 记录满足条件的节点数量
	for _, peer := range rf.peers { // 遍历所有节点
		// 如果是当前节点且日志长度大于或等于指定长度,或其他节点已经确认了该长度
		if (peer == "localhost:"+rf.id && len(rf.Logs) >= lengths) || rf.ackedLength[peer] >= lengths {
			ret += 1 // 满足条件的节点数量加1
		}
	}
	return ret // 返回满足条件的节点数量
}

// Raft-9
func (rf *Raft) CommitEntries() {
	minAcks := len(rf.peers)/2 + 1 // 多数节点数量,即需要多少节点确认日志才能提交
	ready := []int{} // 用于记录可以提交的日志索引

	// 检查从第1条到当前最后一条日志,判断哪些日志已经被大多数节点确认
	for i := 1; i <= len(rf.Logs); i++ {
		if rf.Acks(i) >= minAcks { // 如果确认的节点数量大于或等于多数节点
			ready = append(ready, i) // 将该日志索引添加到 ready 列表中
		}
	}

	// 找到最大可提交的日志索引
	maxReady := max(ready)

	// 如果有可提交的日志,并且它的索引大于当前的提交索引,并且该日志是在当前任期产生的
	if len(ready) > 0 && maxReady > rf.CommitLength && rf.Logs[maxReady-1].Term == rf.currentTerm {
		for i := rf.CommitLength; i < maxReady; i++ { // 提交所有可提交的日志项
			fmt.Println("Leader Commit Log ", i) // 打印日志提交的索引
		}
		rf.CommitLength = maxReady // 更新提交的日志索引
		fmt.Println(rf.id, rf.CommitLength) // 打印当前节点 ID 和新的提交索引
	}
}

// max 返回一个整数数组中的最大值。
// 如果数组为空,则返回 0。
func max(arr []int) int {
	if len(arr) == 0 {
		return 0 // 如果数组为空,返回 0
	}
	maxVal := arr[0] // 假定第一个元素为最大值
	for _, val := range arr { // 遍历数组
		if val > maxVal { // 如果发现更大的值
			maxVal = val // 更新最大值
		}
	}
	return maxVal // 返回数组中的最大值
}

部分测试结果如下。

在这里插入图片描述

八、小结

  ​在本次实现 Go-Raft 协议过程中,本人其实收获了很多东西。

  ​首先是不能盲目在网上找资料,资料都是参差不齐的,特别是学习到现在一些协议的实现,能真正有效借鉴的文章已经比较难找到了。个人在前期从 GitHub 上到处翻找有小半个月有余,但是最后自己从头实现起来也就一周不到的时间。个人认为,如果已经知道了想完成的代码的原理,不妨一点点尝试钻研,遇到不会的去搜索如何实现某个功能,这样反而能更快的完成项目。

  此外,对于项目的复盘和测试是非常重要的,在写这篇博客前,本人已经完成了一遍实现,当时粗略测试下来没有什么问题,但是实际写博客过程中,为了截图我重头实现了一遍,这下就发现了很多问题。特别是对于一些类似于选举期间无 Leader 的情况,或是 Leader 短时间挂掉再恢复的情况,或多或少都出现了一些 Bug,甚至有一些功能在第一、二、三、四部分都完成得不错,但是到了第五部分才忽然发现问题。但是为了避免整体逻辑的问题,并未做出修改,不过在 GitHub 和 Gitee 上对每个大节的测试代码均提交到了不同的文件夹,可以自行按照需求提取。

​   如果在拉取代码后发现了部分运行错误,也许可以尝试一下用下个部分的代码,也欢迎各位提问,如若看到必定尽快回答。


http://www.kler.cn/a/292389.html

相关文章:

  • Elastic Agent:可灵活地在任何地方发送和处理任何数据
  • vue 项目使用 nginx 部署
  • 【C++】深入理解自定义 list 容器中的 list_iterator:迭代器实现详解
  • 算力100问☞第5问:算力如何衡量?
  • 数据库范式、MySQL 架构、算法与树的深入解析
  • 基于STM32设计的森林火灾监测系统(华为云IOT)_263
  • ElasticSearch-Ingest Pipeline Painless Script
  • 前端代码注释风格 - CSS篇
  • 【Kubernetes知识点问答题】Pod
  • 2024跨境电商卖家寻增量,1688寻源通接口 也想做“主角”
  • 树莓派3B驱动ST7735(内核)(TODO)
  • C语言——插入排序
  • 文本匹配任务(下)
  • 红队攻防 | 利用GitLab nday实现帐户接管
  • 【2024数模国赛题目解析丨免费分享】
  • CompleteableFuture异步编程框架
  • [linux基础知识]创建新用户并使用该用户
  • 【2024数学建模国赛赛题解析已出】原创免费分享
  • 神经网络中激活函数介绍、优缺点分析
  • 何为图像处理,有哪些处理方法
  • AGV行业遇冷,叉车AGV逆风崛起:180家企业掀起血战
  • linux中vim常用命令大全详细讲解
  • 几乎每一位面试官都会关注的能力,你做到了吗?
  • Linux 磁盘管理-终于把fdisk命令创建分区挂讲明白了
  • 优化芋道后台菜单管理卡顿问题element-plus版本
  • WPS Office for Linux 12 个人版上线deepin 23商店:UI 视觉重构,新增多项 AI 功能