MIT 6.824 Lab2 Raft (2)

Log Replication

基本的leader选举还是比较简单的,对于日志备份相比来说就是困难很多了。涉及到一些选举的优化,日志丢失的处理,以及一些现实中会出现的情况,比如说很多log entry丢失,导致log entry不能commit,leader掉线的日志的处理等内容。

我的2B代码中只是提供了最简单的实现,还有一些优化的空间,在2B中没有做,这是为了简单以及表述能够更加直观,后续的实验中优化了一些。

先说一下我踩的最开始的一个坑,是因为我自己没有仔细看代码的注释。对于log entry commit的时候,要将这个log entry写入到applyCh中

在2B中,我们首先要实现的一个函数就是Start(),上层应用程序调用Start()来同集群中同步日志,也就是广播log entry。

首先先来看下Start()要做的事情,根据Start()中的注释,可以知道一些Start的细节。

func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    index := -1
    term := -1
    isLeader := true

    // Your code here (2B).
    if rf.state != Leader {
        return -1,-1,false
    }
    entry := rf.appendLogEntry(command)
    rf.broadcastHeartbeats(false)
    index = entry.Index
    term = entry.Term
    isLeader = rf.state == Leader
    return index, term, isLeader
}

在上面代码中,最开始需要加锁是考虑到:存在并发的调用Start()的情况,那么对于leader状态的更新是不能被打断的。

在Start()的逻辑中,上层应用传过来的command要封装为log entry,追加到leader的日志中。 调用rf.appendLogEntry(command)中完成了这一工作,其中还涉及到对于Leader的一些状态信息的改变,比如说nextIndex,matchIndex。接着调用broadcastHeartbeats来向所有节点同步新追加的log entry。

在Raft中,通过AppendEntries RPC来和follower之间交互,无论是心跳还是同步日志,这里我原封不动的按照论文的实现,所以broadcastHeartbeats有两个状态,来表示是同步日志还是发送心跳。

The leader keeps track of the highest index it knows to be committed, and it includes that index in future AppendEntries RPCs (including heartbeats) so that the other servers eventually find out. --------- Raft page 7

同步日志

在日志同步的过程中,我们使用为每一个follower都开辟一个go routine来发送日志,所以多线程编程中又涉及到了加锁的情况。日志同步,最主要的是要考虑到一种情况:当前的leader随时会发生状态的变化,比如说其他发现一个新的节点term比它大,那么就转为follower状态,因此要停止所有日志同步的线程,因为Raft的论文提到,AppendEntries RPC会一直发送,直到RPC被正确接收

If followers crash or run slowly, or if network packets are lost, the leader retries Append-Entries RPCs indefinitely (even after it has responded tothe client) until all followers eventually store all log en-tries. -------- Raft page 6

为了避免这个问题,我们在持续地发送报文到时候,都要判断当前节点的状态是不是Leaer

大致的代码如下,因为当前节点的状态在多线程编程中极有可能随时发生改变,因此需要加锁:

for !done { //死循环发送报文,直到发送成功
    rf.mu.Lock()
    if rf.state != Leader {
        // 多个线程并发的发送报文,可能会对leader的state产生修改
        // 所以在发送报文之前先检查状态是不是leader
        rf.mu.Unlock()
        break
    }
    rf.mu.Unlock()
    var reply = new (AppendEntryReply)
    args := rf.newAppendEntriesArgs(target)
    ok := rf.sendAppendEntries(target,args,reply)
    if ok {
        // handleAppendRPC中判断本次RPC发送成功了没有
        done = rf.handleAppendRPC(target,args,reply)
    }
    // 为了避免rpc发送的太多,睡一会再发
    time.Sleep(time.Millisecond * 50)
}

Log entry的封装

在日志的封装中,注意并不是每次只发送一个log entry,这里要考虑到的情况是,如果leader和follower之间的日志差的很多,那么要一次发送多个报文,封装为一个log entry数组。

func (rf *Raft) newAppendEntriesArgs(target int) *AppendEntryArgs {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    var args = new(AppendEntryArgs)
    args.Term = rf.currentTerm
    args.LeaderId = rf.me
    args.LeaderCommit = rf.commitIndex
    args.PrevLogIndex = rf.nextIndex[target] - 1
    args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
    args.Entries = rf.log[rf.nextIndex[target]:]
    return args
}

在这里,我想再对AppendEntries RPC的参数多一些描述。LeaderCommit 是leader当前已经commit的log entry的下标,将来通过心跳或者是在日志追加的时候发给folower,follower也因此来递增自己的commitIndex。leader中的nextIndex存放的是每一个follower的下一个可以存放log entry的下标PrevLogIndex 是follower最后一个log entry的Index,PrevLogTerm follower最后一个log entry的Term。我们将来通过这两个参数来实现Raft的日志一致性校验(日志一致性的内容在Raft论文的第七页提到),后面我还会重新说到这个问题。

记住,nextIndex存放的是follower的下一个log entry开始追加的index,所以,我们从nextIndex[target:]开始复制log entry。

Follower接收日志

Raft将AppendEntries当作是心跳,也当做是日志同步。所以在AppendEntries的处理函数中,我们要判断此时的AppendEntriesArgs是否包含着日志,此外,还要进行Raft中的日志一致性校验等工作。

在日志校验中,follower要做的是:

When sending an AppendEntries RPC, the leader includes the index and term of the entry in its log that immediately precedes the new entries. If the follower does not find an entry in its log with the same index and term, then it refuses the new entries

这里也就是说,我们要根据AppendEntriesArgs中的prevLogIndexprevLogTerm来判断日志是否一致,如果不一致,那么直接拒绝这个RPC。对于Leader在日志一致性校验中的工作,待会再说。如果万事俱备,那么就可以开始追加日志,大致代码如下:

if !rf.matchLog(args.PrevLogIndex,args.PrevLogTerm) {
    // 日志不匹配,那么直接不响应RPC
    reply.Term = rf.currentTerm
    reply.Success = false
    return
}
// 我们接收term一样的leader的日志,过期的直接不要
if args.Term == rf.currentTerm && len(args.Entries) > 0{
    DPrintf("{Node %v voteFor:%d} receive logs from {Node %v}",rf.me,rf.votedFor,args.LeaderId)
    rf.mergeLog(args.PrevLogIndex + 1,args.Entries)
}

日志匹配当中要考虑的就是,如果follower此时的日志很少,远小于prevLogIndex(这会出现在leader刚选上的时候,那会所有的nextIndex都是leader它自己的最后一个log entry的index)。所以要判断prevLogIndex是否超过了follower自己的日志长度。

func (rf *Raft) matchLog(index int, term int) bool {
    if len(rf.log) - 1 < index {
        return false
    }
    return rf.log[index].Term == term
}

if len(rf.log) - 1 < index的说明,如下图中,follower的len(rf.log) - 1 = 2index = 4,显然日志不匹配,返回false。

接下来要做的就是日志的追加了,记住prevLogIndex是Leader和follower相同的日志的最后一个log entry的index,所以我们要从prevLogIndex + 1开始追加日志。

这里要考虑到,follower存在一些冲突日志,论文中提到,任何冲突的日志应该删掉。确实应该这样做,我一开始直接从prevLogIndex +1开始追加,没有删除所有冲突的log entry。遇到了bug。正确的代码应该是:

func (rf *Raft) mergeLog(nextToAppend int,entries []Entry) {
     if len(rf.log) -1  >= nextToAppend { //判断是否存在冲突日志
         var logArr = make([]Entry,nextToAppend) 
         copy(logArr,rf.log[:nextToAppend]) //复制和leader一致的日志
         rf.log = logArr
     }
    rf.log = append(rf.log,entries...)
}

Leader处理RPC应答

日志追加可能会因为,follower和leader之间日志不匹配而返回false,也可能因为leader的term太小而返回false。所以leader要一一处理这种情况,在函数handleAppendRPC中完成。

在处理RPC应答中,首先要考虑的一个情况就是,处理延迟到达的RPC。老样子,如果当前term已经大于发出去的rpc term,说明这个rpc应答是之前term的,直接不处理。

RPC应答主要有三种情况要处理:1. 成功, 2. 日志不匹配而失败 3. term太小而失败。下面分别介绍三种情况。注意,在处理rpc应答的时候,也只有leader的时候才行,而且对于延迟到达的term直接不响应。这里要考虑的情况就是,leader因为某些原因变成了follower,那么它所有的线程肯定不能再发送报文了,应该直接停止。

此外,发送日志的情况也是并发的,所以其中涉及到对于leader状态的改变也需要加锁。

成功

如果某个节点成功接收log entry,那么我们就要更新leader中的nextIndexmatchIndex。并且还要判断,目前的log entry是否在大多数节点中被接受了,如果是的话,就要递增自己的commitIndex。并且将数据写入到applyCh中。

那么如何知道哪些数据被正确接收了呢。答案就是对matchIndex进行排序,取中点 + 1的日志就是被正确接收到的。比如说matchIndex=[2,2,2,1,1] ,显然index = 2的log entry是被大部分节点接收了的。在并发的情况中,可能会出现nextIndex=[5,3,2,1,1]这样的情况,不过我们还是可以断定2是被接收的了,因为其他节点在2之后的log entry都被接收了。

注意点: 论文中figure 8说明了leader只能commit自己term的日志,而不能commit之前term的日志。只有在自己term内提交了日志的时候顺带将之前Term的日志提交了。

To eliminate problems like the one in Figure 8, Raft never commits log entries from previous terms by counting replicas. Only log entries from the leader’s current term are committed by counting replicas; once an entry from the current term has been committed in this way, then all prior entries are committed indirectly because of the Log Matching Property ------- Raft page 9

所以,在正式commit之前,我们要判断下此时可以commit的log entry是不是自己的term的

if rf.matchLog(newCommitIndex,rf.currentTerm) { // figure 8的要求
    var origin = rf.commitIndex + 1
    for i := origin; i <= newCommitIndex; i++ {
        applyMsg := ApplyMsg{
            CommandValid: true,
            Command: rf.log[i].Command,
            CommandIndex: rf.log[i].Index,
        }
        rf.applyCh <- applyMsg
    }
    rf.commitIndex = newCommitIndex
}

接下来就是循环地将log entry 写入到applyCh中,测试脚本需要找个applyCh来判断,日志是否正确的被接受。

这里之所以也要循环的写入得到applyCh,是因为有时候commitIndex涨的很快,在并发的同步日志的时候就会出现这个情况。

日志不匹配而失败

如果leader发送日志给target节点失败,那么leader就递减这个节点的nextIndex,然后重新发送log entry。

After a rejection, the leader decrements nextIndex and retries the AppendEntries RPC. Eventually nextIndex will reach a point where the leader and follower logs match. When this happens, AppendEntries will succeed, which removes any conflicting entries in the follower’s log and appends entries from the leader’s log (if any).

这里仍然有优化空间,不过目前为了简单起见,暂时先不做这个。

done = false    //false表示rpc发送失败,重新发送
rf.nextIndex[target] = rf.nextIndex[target] - 1

Term太小:

这个很常规,没啥好说的。

else if reply.Term > rf.currentTerm {
    // term 落后,他不再是一个合适的leader
    // 所以结束发送 append rpc,并且
    DPrintf("{Node %v} is not a suitable leader anymore",rf.me)
    done = true
    rf.ChangeState(Follower)
    rf.currentTerm = reply.Term
    rf.votedFor = -1
}

选举的优化

到日志备份中,和选举相关的主要是Raft论文中的section 5.4.1种的内容。在选举的时候,只有leader它自身的日志是最新的才可以获得选票。因此在处理选举RPC的时候,follower还需要额外的去对比日志是否一致。

根据论文中的election restriction提到:

Raft uses the voting process to prevent a candidate from winning an election unless its log contains all committed entries. A candidate must contact a majority of the cluster in order to be elected, which means that every committed entry must be present in at least one of those servers. If the candidate’s log is at least as up-to-date as any other log in that majority (where “up-to-date” is defined precisely below), then it will hold all the committed entries.

也因此,在RequestVoteArgs中新加上两个成员变量用于选举的时候,follower来判断candidate的日志是不是比他要新。

type RequestVoteArgs struct {
    // Your data here (2A, 2B).
    Term         int
    CandidateId  int

    //2B
    LastLogIndex int    //candidate中最后一个log entry的index
    LastLogTerm  int    //candidate中最后一个log entry的term
}

在RequestVote的接收方中,要根据日志比较规则来决定是否给candidate投票,日志比较规则如下,完全地按照raft中的说明来实现的。

func (rf *Raft) isLogUpToDate(term int ,index int) bool {
    var lastEntry = rf.getLastEntry()
    return term > lastEntry.Term || (term == lastEntry.Term && index >= lastEntry.Index)
}

此外,这里有一个细节的地方就是,即使不给他投票,我们也要同步下term。我认为这样做主要有两个原因:

  1. 如果candidate不能获得选票,那么将来这些日志较新的follower会发起选举,进而自增自己的term。我们在选举中已经说过,只能给term大的节点选举,如果这里不进行同步,那么一个follower可能会引发一轮的选举
  2. 在返回的RPC应答中,设置reply.term == rf.currentTerm,可以帮助candidate判断是何种原因导致的选票获取失败。如果reply.term > rf.currentTerm,那么candidate知道是自己term太小。如果reply.term == rf.currentTerm,candidate可以知道这是因为日志不匹配,那么直接转为follower.

日志一致性校验

最开始通过几个测试用例比较简单,但是在TestRejoin这个测试点就会增加一些额外的难度。最开始的时候,还没有引入leader掉线的情况,所以不需要考虑日志一致性校验就可以通过测试点。

这里主要考虑的情形是:leader掉线,新的leader被选出,根据Raft论文的指示,我们将新leader的nextIndex初始化为自己的最后一个log entry的index + 1,将matchIndex设置为0。

When a leader first comes to power, it initializes all nextIndex values to the index just after the last one in its log (11 in Figure 7).

那么会带来的问题就是,此时nextIndex对应的log entry在follower中根本没有,所以根据日志一致性校验规则,这个RPC就是失败了。所以nextIndex--来重发报文。

func (rf *Raft) ChangeState(state NodeState) {
    if rf.state == state {
        return
    }
    DPrintf("{Node %d} changes state from %s to %s in term %d", rf.me, rf.state, state, rf.currentTerm)
    rf.state = state
    switch state {
    case Follower:
        rf.heartbeatTimer.Stop()
        rf.electionTimer.Reset(electionTimeout())
    case Candidate:
    case Leader:
        rf.electionTimer.Stop()
        rf.initIndexes()    //初始化nextIndex和matchIndex
        rf.heartbeatTimer.Reset(heartbeatTimeout())
    }
}

// 在handleAppendRPC中
else if reply.Term == rf.currentTerm{
        // 日志存在冲突,重新发送报文
        done = false
        rf.nextIndex[target] = rf.nextIndex[target] - 1
    }

递增CommitIndex

在这里,我先说一下我最开始踩的坑。最开始,我认为只要leader的commitIndex递增了,那么follower就可以递增commitIndex了。然而这是错的,需要考虑的一种情况就是:大部分节点已经接收到来自leader的log entry,在leader这边它的commitIndex是可以递增了。不过有些节点没有收到log entry(可能因为网络原因RPC来的比较慢,或者是日志一致性检查的时候前面的log entry没有追加上去),它们仍然接收到了leader的commitIndex(通过心跳)。那么就会出现,log entry没收到,不过commitIndex递增了,然后开始写入到applyCh中,数组越界了。

因此,在这里要严格的按照论文的说明:

If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) ----- Raft Figure 2

递增commitIndex,这里的流程很简单,就是想清楚,什么时候才可以递增。必然是leader收到大部分节点的确认应答后,自己先递增commitIndex,然后通过心跳发送给follower,让他们也递增commitIndex

在follower中,代码如下:

func (rf *Raft) incFollowerCommitIndex(leaderCommit int) {
    if leaderCommit > rf.commitIndex {
        var origin = rf.commitIndex + 1 
        //注意这里的min,我一开始错误的认为max
        rf.commitIndex = int(math.Min(float64(leaderCommit), float64(rf.getLastEntry().Index)))
        for i := origin; i <= rf.commitIndex; i++ {
            applyMsg := ApplyMsg{
                CommandValid: true,
                Command: rf.log[i].Command,
                CommandIndex: rf.log[i].Index,
            }
            rf.applyCh <- applyMsg
        }
    }
}

上面代码要用循环的来apply也是相同的考虑。就是有时候leaderComit突然暴涨,并发的情况下会出现,或者follower经过掉线重连的情况。

Summary

下面总结下在2b中遇到的问题以及我踩过的一些坑,以及目前代码可以优化的地方。

  1. 认真看注释

    最开始我不知道applyMsg怎么用,是因为我没看注释。只要将一个数据commit了,那么就要在leader和follower中都要写入到的applyCh中。

  2. 写入到apllyCh中要循环地写入

    最开始,我写入只是写入日志的最后一个log entry,这里我想得太简单了。我只考虑到只会一个一个地追加,没有考虑到并发的追加情况。

  3. 加锁的情况

    在部分加锁的情境中,我都是按照golang race检测加的。有些情况我也不好说为什么这里要加索。

  4. 老话重谈

    定时器的随机数种子一定要重设一下,不然随机数永远是一样的。

可以优化的地方

  1. 使用条件变量

    目前,这里的做法就是死循环地发送log entry。会导致RPC调用数非常高,即使加了time.sleep(),还是非常的多。在后续的2C中会带来的很大的Debug困难。

  2. 优化日志一致性检查的地方

    Raft论文提到了如何进一步加快一致性检查的过程。2C中需要这样做,2B暂时不要。所以就留到后面在做吧

暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇