在part A和part B当中,基本的将raft的东西都完成了。part C主要做的就是持久化的内容,也就是要将raft的一些状态信息保存下来,另外part C的测试脚本会模拟极端混乱的网络情况,并且会运行1000次,所以要对前面的代码做一个优化,不然无法通过part C。总的来说,part C要增加的代码不多,主要是持久化raft状态,以及一些优化的地方。
比较懒,这里有些优化的代码在part D中,主要是用条件变量来实现数据的apply。不过我的part D有些bug,做起来十分痛苦,所以暂时就没做完,后面用了别人的raft,去做kv store了。
持久化
raft论文中,给出了哪些东西要被持久化:分别是currentTerm
,votedFor
,log[]
。 调用persist()
方法来进行持久化,所以在persist()内部的实现也自然十分直观,使用实验中提供的labgob
将数据序列化然后保存起来就行,比如说写入到文件中。
func (rf *Raft) persist() {
rf.persister.SaveRaftState(rf.encodeState())
}
// 根据论文,只需要将currentTerm,votedFor,log[] 保存下来即可
func (rf *Raft) encodeState() []byte {
buf := new(bytes.Buffer)
encoder := labgob.NewEncoder(buf)
_ =encoder.Encode(rf.currentTerm)
_ = encoder.Encode(rf.votedFor)
_ = encoder.Encode(rf.log)
return buf.Bytes()
}
读取持久化的数据
当一个节点挂了以后,重启之后要读取持久化的数据,来恢复到之前的状态。这里就是使用labgob
来将持久化的数据反序列化出来。
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
buf := bytes.NewBuffer(data)
decoder := labgob.NewDecoder(buf)
var currentTerm int
var votedFor int
var log []Entry
if decoder.Decode(¤tTerm) != nil ||
decoder.Decode(&votedFor) != nil ||
decoder.Decode(&log) != nil {
DPrintf("{Node %v} restores persisted state failed", rf.me)
}
rf.currentTerm = currentTerm
rf.votedFor = votedFor
rf.log = log
rf.commitIndex = rf.log[0].Index
}
重新恢复的节点的commitIndex的设置是一个问题,我直接就设置为0。这不会有什么问题,因为将来的日志追加以后,通过日志的一致性检查还是可以正常的追加日志,然后在leader的心跳中附带的leaderCommit进行同步。
何时去持久化数据
记住一句话就行,哪里对log
,votedFor
,currtentTerm
进行了修改,那么哪里就需要持久化数据。所以,Start()
之后,选举失败的时候,接收到选举RPC的时候都需要持久化数据。具体的话就不贴代码了,可以看我的代码实现。
优化解决日志冲突
在我的part B实现中,对于日志冲突,我只是递减nextIndex。虽然可以过测试点,不过这显然是不够高效的。接下来按照Raft论文中的建议,来对这一部分进行优化。
If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry. In practice, we doubt this optimization is necessary, since failures happen infrequently and it is unlikely that there will be many inconsistent entries.
意思是说,如果发现当前节点的rf.[prevLogIndex]
和prevLogTerm
不匹配。那么就找到和这个prevLogTerm
冲突的Term的第一个日志,然后将这个Index返回Leader。然后Leader递减这个Index,那么就一次性越过了所有冲突的日志。
看下图来帮助理解:
往leader 的Index = 5追加数据,就发现了冲突。所以follower找到第一个term == 2的log entry的index,也就是这里的 3。然后返回给leader。在leader中重新设置nextIndex,rf.nextIndex[target] = ConflictIndex
。在下一次发送日志的时候,prevLogIndex = rf.nextIndex[target] - 1
,就恰好越过了所有冲突的term = 2 的日志。
不过,这里还需要考虑,如果发生冲突是因为follower在prevLogIndex处根本没有日志。那么ConflictIndex 就是最后一个日志的index + 1,也就是len(rf.log) = 1。
以极端情况来说,如果一个节点没有任何日志,那么它只有一个index = 0的空日志(raft的日志是从index = 1开始的)。那么返回的返回的ConflictIndex = 1才行,那么在Leaer中在修改rf.nextIndex[target] = 1
,然后prevLogIndex = rf.nextIndex[target] - 1 = 0
,那么就通顺了。
代码如下:
if args.PrevLogIndex > len(rf.log) - 1 { // 如果follower日志太短
reply.ConflictIndex = len(rf.log) //曾经在这里写成了len(rf.log) - 1错了很久
} else {
// 找到冲突Term的第一个log entry的index
for i := args.PrevLogIndex; i > 0; i-- {
if rf.log[i].Term != args.PrevLogTerm {
reply.ConflictIndex = i + 1
}
}
}
// 在leader中,如下处理
// handleAppendRPC中
rf.nextIndex[target] = reply.ConflictIndex
优化死循环
在之前的PartB中,是以死循环的方式来发送报文的。问题就是会导致RPC调用次数非常非常多,接下来使用条件变量来稍微优化一下,每一个raft节点都为它的Peers开一个线程,专门用于日志的同步。
有一个问题就是,条件变量阻塞到什么时候才可以发送日志。只有leader的最后一个log entry的Index大于其他节点的matchIndex的时候,就是表示要同步数据了。
如果目前不需要同步日志,那么就会调用wait()
来阻塞该线程。Raft论文中提到持续地调用AppendEntries RPC,直到日志正确同步为止。所以,我们循环地去判断for !rf.needReplicating(target
是否需要同步日志。另外,只有leader才可以发送日志,所以在我们还要让rf.state == Leader
条件成立。
func (rf *Raft) replicator(target int) {
rf.replicateCond[target].L.Lock()
defer rf.replicateCond[target].L.Unlock()
for rf.killed() == false {
for !rf.needReplicating(target) {
rf.replicateCond[target].Wait()
}
rf.broadcastLogEntry(target)
time.Sleep(time.Millisecond * 50)
}
}
// 判断是否要同步数据
func (rf *Raft) needReplicating(peer int) bool {
rf.mu.Lock()
defer rf.mu.Unlock()
return rf.state == Leader && rf.matchIndex[peer] < rf.getLastEntry().Index
}
在broadcastHeartbeats
中,如果需要同步日志,那么我们调用条件变量的Signal()
来让线程进行日志同步。
优化Apply
同样的,我们可以为任何一个raft节点都开一个线程,同样地也是用条件变量来优化apply的过程。在我现在的2C代码中我没做,2D中就这样做了。lastApplied + 1第一个已经commit但是未apply的log entry的index,commitIndex + 1是第一个未commit的log entry的log index。注意slice的区间是左闭右开的。
func (rf *Raft) applier() {
for rf.killed() == false {
rf.mu.Lock()
// 如果lastApplied >= ,说明暂时没有log entry需要apply
for rf.lastApplied >= rf.commitIndex {
rf.applyCond.Wait()
}
// 获得目前已经commit但是还没有apply的日志
entries := make([]Entry, commitIndex-lastApplied)
var start = lastApplied + 1
var end = commitIndex + 1
copy(entries, rf.log[start : end])
for _,entry := range entries {
applyMsg := ApplyMsg{
CommandValid: true,
Command: entry.Command,
CommandIndex: entry.Index,
CommandTerm: entry.Term,
}
rf.applyCh <- applyMsg
}
rf.lastApplied = rf.commmitIndex
rf.mu.Unlock()
}
}
在follower和leader中,如果发现此时有log entry可以apply了,就使用signal来唤醒这个线程,比如以下的代码:
func (rf *Raft) incFollowerCommitIndex(leaderCommit int) {
newCommitIndex := Min(leaderCommit, rf.getLastEntry().Index)
if newCommitIndex > rf.commitIndex {
rf.commitIndex = newCommitIndex
rf.applyCond.Signal() // 唤醒applier来apply日志
}
}
Summary
2c的话应该要做的东西不是特别多,优化为条件变量,然后一定要实现冲突处理的加速过程,不然过不了测试点。其他的话对于代码的修改不是特别多。