本文主要解释6.824中 lab2 Raft的part A和Part B部分。源代码链接:2A的代码,2B的代码
在这里,我不想讲述论文中的细节问题,只关注于如何实现代码,以及在编码的过程中可能需要考虑到的一些情况。
Leader Selection
part A就是解决最简单的选举问题。所以对于论文中的figure 2中很多变量都是暂时不需要的。下面是2A需要的一些成员变量。
type Raft struct {
... 其他自带的略
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
state NodeState
//2A
currentTerm int //当前节点的term
votedFor int // 给谁投票
electionTimer *time.Timer // 选举定时器
heartbeatTimer *time.Timer //心跳定时器,只有leader才能发送心跳
}
此外,2A不涉及到日志的操作,所以在RPC请求的参数签名也比较简单,这里就暂时不贴了,直接我的github源码就行。
节点之间的转换关系图如下:
初始状态下,所有的节点都是follower状态。然后等待各自的选举定时器超时,发起选举,状态切换到candidate的状态。接着重置选举定时器,如果在这个定时器超时之前都没能成为leader,就发起新一轮的选举。
我们将黑体字用代码来表述:
rf.changeState(Candiate)
rf.currentTerm++
rf.startElection()
rf.electionTimer.Reset(electionTimeout()) //注意成为candidate也要重置选举定时器
成为candiate之后,如果收到了大部分节点的投票,这里的大部分表示的意思是:集群中的一半以上的节点,如5个节点,那么只要收到3个节点的投票就可以成为leader。
在选举过程中,无论是candiate状态,还是后来成为了leader,只要发现比自己的term大,那么就成为follower。这里可能发生的情况是,集群中一个节点掉线了。也因此一直无法收到来自leader的心跳,它可能反反复复的发起选举,不过都没人应答,从而导致它的term很大。当重新连接到网络的时候,它的term已经明显比其他节点都大了,包括leader,所以此时的leader就会成为follower。
开始选举
选举其实挺简单的,没什么好说的。需要提及的一点就是,发送选举RPC也是并发进行的,所以对于节点的状态修改也要加锁。另外一个要考虑的就是,如何处理延迟到达的rpc应答。
TA这里提到,对于延迟到达的RPC应答,我们应该直接丢弃。这里考虑的情况是: 假设一个节点发起选举请求后,可能没有收到足够的选票,然后又开始下一轮的选举,那么收到第一轮选举的RPC应答应该直接丢弃。
另外,如果当前节点已经成为了其他节点的follower,那么对于选举RPC应答我们应该也直接不采取任何操作。
所以,在接收选举RPC应答中,我们要判断一下,只有当前节点状态是candidate,且自身的term和发出去的选举RPC的term相等。
if rf.currentTerm == request.Term && rf.state == Candidate {
if response.VoteGranted {
grantedVotes += 1
if grantedVotes >= majority {
rf.ChangeState(Leader)
rf.BroadcastHeartbeat() // 成为leader,那么就发送心跳
}
} else if response.Term > rf.currentTerm {
rf.ChangeState(Follower)
rf.currentTerm, rf.votedFor = response.Term, -1
}
}
另外,如果发现term比自己大,那么说明当前节点不再是一个合适的candidate,那么就要将voteFor
设置为-1,为将来响应其他节点的选举RPC做准备。
处理选举RPC
当选举定时器超时后,节点开始发起选举。这里用的RequestVote
来向其他节点发送选举RPC。在2A中代码还是比较简单的,如下:
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term < rf.currentTerm ||
(args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.CandidateId) {
reply.Term, reply.VoteGranted = rf.currentTerm, false
return
}
if args.Term > rf.currentTerm {
rf.ChangeState(Follower)
rf.currentTerm = args.Term
rf.votedFor = args.CandidateId
rf.electionTimer.Reset(electionTimeout())
reply.Term, reply.VoteGranted = rf.currentTerm, true
}
}
一个节点可能同时接收到多个节点的选举rpc,因此对于它本身的一些状态的改变要用锁来保证互斥。在这里主要考虑的失败的情形就是两种:1. candidate的term太小 2. 已经给其他人投过票。
args.Term < rf.currentTerm
就是为了处理candidate term太小的情况,很好理解。
后面一个条件比较长。这里主要考虑如下这种情况:
两个节点同时发起选举,初始状态下他们的term都是0。发起选举后term 都变成了1,假设一个节点已经给其中一个节点投过票,那么自然是不能再给另外一个节点投的。rf.votedFor != args.CandidateId
就是说明了收到的选举rpc和它voteFor的不是一个节点。
剩下的代码就很好懂了,只有candidate的term比自己的term大才可以投票,重置选举定时器即可。
发送心跳
发送心跳比较简单,只要处理一种情况即可(就目前而言),如果发现一个节点要大,那么就说明不再适合当leader,转为follower。核心代码如下:
rf.mu.Lock()
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.ChangeState(Follower)
}
rf.mu.Unlock()
虽然说,心跳RPC的应答也会存在延迟的情况,因此也要判断下是否为延迟RPC。不过在2A中,看起来不这样做也没关系,在后续的部分中,我还是这样做了。
处理心跳RPC
当一个节点成为leader之后,马上要给其他节点发送心跳。这里要考虑的情形十分简单,只要进行term的简单判断。如果心跳收到的term太小了,说明此时的leader不适合。
这里有一种情况要考虑的就是,如果leader掉了,其他节点重新选举后,老的leader重新加入到网络中以后,它的term会很小。它此时的状态还是leader,所以它应该将自己的状态转为follower,然后将term追上此时的节点。这种情况就是测试用例ReElection
要考虑的情况。最后,如果成功接收到了心跳,就应该重置选举定时器。
如果收到的心跳是和当前节点相同term的且当前节点是candidate状态,那么也直接转为follower状态。所以下面的代码中,我们只考虑了args.Term < rf.currentTerm
和args.Term > rf.currentTerm
,相等的心跳都是合法的。
func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
// 如果发现来自leader的rpc中的term比当前peer要小,那么说明它不适合当leader
if args.Term < rf.currentTerm {
reply.Term, reply.Success = rf.currentTerm, false
return
}
// 一般来讲,在vote的时候已经将currentTerm和leader同步
// 不过,有些peer暂时的掉线或者其他一些情况重连以后,会发现term和leader
// 不一样,所以收到大于自己的term的rpc也是第一时间同步.而且要将votefor重新设置为-1
// 等待将来选举
if args.Term > rf.currentTerm {
rf.currentTerm, rf.votedFor = args.Term, -1
}
rf.ChangeState(Follower)
rf.electionTimer.Reset(electionTimeout())
reply.Term, reply.Success = rf.currentTerm, true
}
2A中注意的地方
随机数的种子
我目前给的代码没有改,后续的lab中已经改了。就是
rand.Intn()
这里获得到的随机数都是固定的,你运行多少次都一样。所以每次在重置超时定时器的时候,需要重置种子。只有leader才可以发送心跳
对于普通节点,他们的心跳定时器应该都是停止的。并发下的情况比较复杂,所以最简单的办法就是在发送心跳的时候判断下,当前节点的状态是不是leader。
调试的技巧
对于AppendEntryArgs,AppendEntryReply这些参数,可以重写他们的
String()
,在调试的时候比较方便。在需要的地方多大Dprintf,就可以观察日志了(记得把util.go中的Debug=true)打开。