当前位置: 首页 > article >正文

6.5840 Lab 3: Raft

论文很重要 raft-zh_cn/raft-zh_cn.md at master · maemual/raft-zh_cn · GitHub

Part 3A: leader election (moderate)

十次test都过了

实现 Raft 的领导者选举和心跳机制(AppendEntries RPC,无日志条目)。第 3A 部分的目标是实现以下功能:

  1. 集群中能够成功选举出一个领导者。
  2. 如果没有节点故障,当前领导者能够保持其领导地位。
  3. 如果当前领导者发生故障,或者其与其他节点之间的通信中断,集群能够选举出新的领导者接替其位置。

首先定义raft结构体字段并初始化,论文中全部给出了

type LogEntry struct {
	Term         int         // 用于区分不同的Leader任期
	CommandValid bool        // 当前指令是否有效。如果无效,follower 可以拒绝复制
	Command      interface{} // 表示可以存储任意类型的指令。
}

type Role int

const (
	Leader Role = iota
	Follower
	Candidate
)

// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (3A, 3B, 3C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	log []LogEntry

	currentTerm     int
	votedFor        int
	role            Role
	electionStart   time.Time
	electionTimeout time.Duration

	commitIndex int //已知已提交的最高的日志条目的索引(初始值为0,单调递增)
	lastApplied int //已知已应用到状态机的日志条目的索引(初始值为0,单调递增)

	nextIndex  []int //对于每一台服务器,发送到该服务器的下一个日志条目的索引(初始值为领导人最后的日志条目的索引+1)
	matchIndex []int //对于每一台服务器,已知的已经复制到该服务器的最高日志条目的索引(初始值为0,单调递增)
}

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
	rf.role = Follower
	rf.electionStart = time.Now()
	rf.votedFor = -1
	rf.currentTerm = 0
	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.log = make([]LogEntry, 1)
	rand.Seed(time.Now().UnixNano())
	rf.electionTimeout = time.Duration(450+rand.Intn(150)) * time.Millisecond
	rf.nextIndex = make([]int, len(rf.peers))
	rf.matchIndex = make([]int, len(rf.peers))
	// Your initialization code here (3A, 3B, 3C).

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	go rf.ticker()

	return rf
}

在ticker()中,题目提到"不要使用Go的 time.Timer 或 time.Ticker ,它们很难正确使用",所以用原本代码框架中的time.Sleep()来实现定时操作,sleep的时间也是leader心跳的间隔时间,对于节点选举超时的定时器,用time.Since(rf.electionStart) >= rf.electionTimeout实现。

对不同的role的节点,执行不同操作,leader是心跳,其他则是开始选举,这里使用一把大锁保平安。

func (rf *Raft) ticker() {
	for rf.killed() == false {

		// Your code here (3A)
		// Check if a leader election should be started.
		rf.mu.Lock()
		// 如果是 Follower 或 Candidate,检查是否超时
		if rf.role == Follower {
			if time.Since(rf.electionStart) >= rf.electionTimeout {
				// 超时,开始选举
				rf.BecomeCandidate()
				rf.startElection()
			}
		}
		if rf.role == Candidate {
			if time.Since(rf.electionStart) >= rf.electionTimeout {
				rf.electionStart = time.Now()
				rf.startElection()
			}
		}
		// 如果是 Leader,定期发送心跳
		if rf.role == Leader {
			rf.sendHeartbeat()
		}

		rf.mu.Unlock()

		// pause for a random amount of time between 50 and 350
		// milliseconds.
		ms := 40 + (rand.Int63() % 100)
		time.Sleep(time.Duration(ms) * time.Millisecond)
	}
}

实现追加条目(AppendEntries)RPC以及请求投票(RequestVote)RPC,参数和返回值的字段以及方法的逻辑在论文中均有记录,这里要注意的是什么时候要进行选举超时定时器的重置rf.electionStart = time.Now(),对每个节点发送心跳和请求投票都需要用groutine,不同发送操作是异步的。若成为Leader,则之前必须是候选人。

在节点处理投票请求时注意对方和自己都是候选人的情况

请求投票(RequestVote)RPC

type RequestVoteArgs struct {
	Term         int // 候选人的任期号
	CandidateID  int // 请求选票的候选人的 ID
	LastLogIndex int // 候选人的最后日志条目的索引值
	LastLogTerm  int // 候选人的最后日志条目的任期值
}

type RequestVoteReply struct {
	Term        int  // 当前任期号,以便于候选人去更新自己的任期号
	VoteGranted bool // 候选人赢得了此张选票时为真
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (3A, 3B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		return
	}

	if args.Term > rf.currentTerm {
		rf.BecomeFollower(args.Term, -1)
	}

	if rf.votedFor == -1 || rf.votedFor == args.CandidateID {
		DPrintf("candidate %d get vote from %d", args.CandidateID, rf.me)
		rf.votedFor = args.CandidateID
		rf.electionStart = time.Now()
		reply.VoteGranted = true
	} else {
		//处理两个节点同为候选人的情况
		reply.VoteGranted = false
	}
}

func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}

 追加条目(AppendEntries)

type AppendEntriesArgs struct {
	Term         int        // 领导人的任期号
	LeaderID     int        // 领导人的 ID
	PrevLogIndex int        // 紧邻新日志条目之前的那个日志条目的索引值
	PrevLogTerm  int        // 紧邻新日志条目之前的那个日志条目的任期值
	Entries      []LogEntry // 准备存储的日志条目(表示心跳时为空;一次性发送多个是为了提高效率)
	LeaderCommit int        // 领导人已经提交的日志的索引值
}

type AppendEntriesReply struct {
	Term    int  // 当前的任期号,用于领导人更新自己
	Success bool // 如果 Follower 包含了匹配上 `PrevLogIndex` 和 `PrevLogTerm` 的日志条目时为 true
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}
	rf.votedFor = args.LeaderID
	rf.electionStart = time.Now()
	reply.Success = true
	return

}



func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	return rf.peers[server].Call("Raft.AppendEntries", args, reply)
}

开始发送心跳和开始选举

func (rf *Raft) BecomeFollower(term, votedFor int) {
	rf.role = Follower
	rf.currentTerm = term
	rf.votedFor = votedFor
	rf.electionStart = time.Now()
	DPrintf("Pod %d become follower by %d when term %d ", rf.me, votedFor, rf.currentTerm)
}

func (rf *Raft) BecomeCandidate() {
	rf.role = Candidate
	rf.currentTerm++
	rf.votedFor = rf.me
	rf.electionStart = time.Now()
	DPrintf("Pod %d become candidate when term %d ", rf.me, rf.currentTerm)
}

func (rf *Raft) BecomeLeader() {
	rf.role = Leader
	for i := range rf.peers {
		rf.nextIndex[i] = len(rf.log) // 领导者的 nextIndex 为日志最后一条之后
		rf.matchIndex[i] = 0          // 初始 matchIndex 为 0
	}
	DPrintf("Pod %d become leader when term %d ", rf.me, rf.currentTerm)
}

func (rf *Raft) sendHeartbeat() {
	term := rf.currentTerm
	leaderCommit := rf.commitIndex

	// 遍历所有 Follower,发送 AppendEntries RPC

	for i := range rf.peers {
		if i != rf.me {
			go func(server int) {
				prevLogIndex := rf.nextIndex[server] - 1
				prevLogTerm := 0
				if prevLogIndex >= 0 {
					prevLogTerm = rf.log[prevLogIndex].Term
				}

				args := &AppendEntriesArgs{
					Term:         term,
					LeaderID:     rf.me,
					PrevLogIndex: prevLogIndex,
					PrevLogTerm:  prevLogTerm,
					Entries:      nil, // 心跳中无日志条目
					LeaderCommit: leaderCommit,
				}

				reply := &AppendEntriesReply{}
				if rf.sendAppendEntries(server, args, reply) {
					if !reply.Success && reply.Term > rf.currentTerm {
						rf.BecomeFollower(reply.Term, -1)
					}
				}
			}(i)

		}
	}
	rf.electionStart = time.Now()
}

func (rf *Raft) startElection() {
	// 统计自己的票数
	votes := 1
	term := rf.currentTerm

	// 获取当前日志信息
	lastLogIndex := len(rf.log) - 1
	lastLogTerm := 0
	if lastLogIndex >= 0 {
		lastLogTerm = rf.log[lastLogIndex].Term
	}
	// 遍历所有其他节点,发送 RequestVote RPC
	for i := range rf.peers {
		if i != rf.me {
			go func(server int) {
				args := &RequestVoteArgs{
					Term:         term,
					CandidateID:  rf.me,
					LastLogIndex: lastLogIndex,
					LastLogTerm:  lastLogTerm,
				}
				reply := &RequestVoteReply{}

				if rf.sendRequestVote(server, args, reply) {

					if reply.Term > rf.currentTerm || !reply.VoteGranted {
						rf.BecomeFollower(reply.Term, -1)
						return
					}

					if reply.VoteGranted {
						votes++
						if votes > len(rf.peers)/2 && rf.role == Candidate {
							rf.BecomeLeader()
							rf.sendHeartbeat()
						}
					}
				}
			}(i)
		}
	}
}

Part 3B: log (hard)


抱歉!

由于本人实习的原因,这个轮子项目搁浅了,估计不会再更新


http://www.kler.cn/a/595832.html

相关文章:

  • react 常用插件
  • 暂存合并分支
  • 【机器学习-模型评估】
  • 老旧中控系统智能化改造方案:基于巨控OPC561Q-C模块实现多通道实时报警
  • 【css酷炫效果】纯CSS实现全屏万花筒效果
  • 八股文MYSQL
  • Centos7部署学之思考试系统
  • 新书速览|云原生Kubernetes自动化运维实践
  • 解决 uniapp 开发中权限申请同步告知目的问题| 华为应用商店上架审核问题解决
  • 初始EBP和ESP的设置
  • Android Compose 图像修饰深度解析(八)
  • 使用Python轻松拆分PDF,每页独立成文件
  • (一)丶Windows安装RabbitMQ可能会遇到的问题
  • JavaScript性能优化实战:深入探讨性能瓶颈与优化技巧
  • STM32 SPI总线驱动CH376T实现U盘/TF卡读写全解析—CH376数据手册分析(中上) | 零基础入门STM32第七十三步
  • Event driven agentic document workflows 笔记 - 3
  • 【Javascrip】Javascript练习01 REST API using Express.js.
  • NFS 安装与测试
  • MySQL数据库入门到大蛇尚硅谷宋红康老师笔记 高级篇 part11
  • C++修炼:内存管理