MIT 6.824 Lab3 KV Raft (1)

在上一个lab中,将raft的基本的内容都完成了。接下来这节,就是以raft为基础,来实现一个kv的数据库。part A部分还是比较简单的,不需要处理快照。只需要处理在RPC调用中出现的问题,比如说rpc丢失,调用超时等情况。具体的就到后面的代码还会有说到。

在正式开始前,要了解论文中section 7和8的内容,不然会有些不知所以然。代码实现在:lab3 PartA

基本结构

在实验中,Clerk代表就是一个client,它在client.go,它来发起对于kv数据库的操作。我们的kv数据库支持的有三个操作:put,append,get;分别对应的操作是:往数据库中写入,追加,以及获取数据。测试脚本中会调用ck.Put()来执行插入操作,其他的也是相类似。

一开始拿到代码的时候,感觉无从下手,不知道从哪里开始写起。这里简要的介绍一下,实验中各个结构体的作用。对于Client来说,发起一个RPC请求,都要封装好自己的RPC参数,比如说Get有GetArgs,Put有PutArgs,这些在common.go中。在Client.go中的put,append,get函数都是给将来测试脚本都需要的,通过这几个函数client来发起一个rpc请求,所以我们要完成这个几个函数的实现。

在Server.go中是kv数据库的rpc接收方,用来真正的完成对于kv的插入以及查询数据,因此我们还要自己实现一个简单的kv数据库,这里就是用golang自带的map来模拟一下就好了。主要就是两个函数来完成这一工作:

// 下面两个函数用于处理客户端发送过来的请求
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    // Your code here.
}

func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    // Your code here.
}

// 用来描述一个kv操作,比如说是append还是get?
type Op struct {
    // Your definitions here.
    // Field names must start with capital letters,
    // otherwise RPC will break.
}

在server.go中,Op这个结构体就是用来准确地描述一次kv操作。实际的情况就是(以get为例): 接收GetArgs,然后将它的内容进一步的封装为Op这个结构体。然后将Op由Raft同步给其他节点,同步成功后,leader从自己的applyCh中读取写入的Op结构体,执行对应的操作,写回返回值。

Basic的基本思路

basic的实现比较简单,就是使用rpc调用和server之间通信,然后让server返回结果。这里主要考虑的几个问题就是:

  1. client一开始通信可能是和follwer,所以client如何找到正确的leader
  2. server如何获得kv操作的返回值

第一个问题论文中有提到,如果一开始client和follwer进行通信,那么follower可以直接重定向给leader。或者是,follwer返回leaderId,一般来说,它可以知道集群中leader的Id(正常情况下,voteFor就是leader的id)。不过,在lab中为了实现简单,我直接让follower返回一个信息,让client重试rpc调用,简化了实现。

The leader handles all client requests (if a client contacts a follower, the follower redirects it to the leader).

第二个问题是,server收到rpc之后,调用Start()将日志同步给其他节点,Start()的返回值中有新追加的log 的index。所以,我们可以创建一个map[index]channel,然后index -> channel映射起来。对于kv数据库的操作成功后,将返回结果写入到channel中。使用select让server 阻塞地等待返回结果,显然无限的阻塞是不妥的,后面还需要实现超时取消调用的过程

hint中提示,我们将Get也封装为日志,同步到集群中的其他节点,这一点虽然和论文中的read-only 操作背道而驰,不过这里也是为了简单起见,所以我们不需要特别处理。这样一来,就保证了不会读取到过期的数据,虽然在实际的一些raft实现中这样做会显著地增加很多日志,浪费了硬盘空间。

Basic的实现(Client)

lab原来的代码给出了GetArgs,PutArgs,Op等结构体,在server处理中也会出现重复代码。有些繁琐,我就自己封装了一个新的,如下:

// 描述一个请求是什么,是put 还是get 还是append?
type OpArgs struct {
    // Your definitions here.
    // Field names must start with capital letters,
    // otherwise RPC will break.
    OperationType string    // append get or put ?
    Key string          // put和append的时候需要
    Value string        // 插入的值,put和append需要
    SequenceNum int     // 序列号,用来标识一个请求
    ClientId int64      // 发送这个请求的clientId
}

client的结构如下:

type Clerk struct {
    servers []*labrpc.ClientEnd
    // You will have to modify this struct.
    leaderId int64  // client只能和leader进行交互
    clientId int64  // 
    sequenceNum int // 该client要发送的请求的序列号
}

为了在面对rpc丢失,或者是对方返回的内容丢失等情况,发送RPC调用的时候应该是持续性的,直到发送成功。如果不是Leader,这里为了简单起见,就换到下一个节点,然后重试RPC请求。RPC调用成功后,就递增序列号,注意的是,如果rpc调用不成功,那么序列号不能递增,将来后续的测试点中会模拟RPC丢失的情况,后面再谈

// 调用KVServer的HandleRequest来处理rpc调用
var ok = ck.servers[ck.leaderId].Call("KVServer.HandleRequest",args,reply)
if !ok || reply.Err == ErrWrongLeader || reply.Err == ErrTimeout {
    ck.leaderId = (ck.leaderId + 1) % int64(len(ck.servers))
    //DPrintf("client %d changes to another leader:%d",ck.clientId,ck.leaderId)
    //time.Sleep(time.Millisecond * 500)
    continue
}
ck.sequenceNum++

Client这边的代码逻辑比较简单。接下来看下server的实现。

Basic的实现(Server)

就basic而言,server端所需要的就是接收kv操作后的返回值。我使用了resultChans map[int]chan *OpResult来记录每个kv操作的返回值。kv操作的返回值会写入到当前log entry为key所对应的Channel当中。然后再server中使用select来阻塞地等待返回值。

另外一个就是,server在调用Start之后,如果发现自己不是leader就直接返回,不进行后续的操作。

    kv.mu.Lock()
    index,_, isLeader:= kv.rf.Start(*args)
    //DPrintf("{KV %d} receives OpArgs:%v isLeader:%t",kv.me,args,isLeader)
    if !isLeader {
        reply.Err = ErrWrongLeader
        kv.mu.Unlock()
        return
    }
    // 创建channel的时候要加锁,因为, 要保证channel的创建早于applier中
    // applyLogToStateMachine.这里加锁,就避免了先执行command,再创建channel
    resChan := kv.getResultChan(index)
    kv.mu.Unlock()

Start之后,要等待所有的日志都被正确的同步,才会被写入到server的applyCh。前面select已经把server堵住了,所以必然需要重新开一个协程来读取日志。applyCh中要判断是log还是snapshot,因为在lab2中,往applyCh中既可以写入snashot,也可以写入log。

func (kv *KVServer) applier() {
    for kv.killed() == false {
        select {
        case message := <-kv.applyCh:
            if message.CommandValid {
                kv.mu.Lock()
                // 如果是log
                .... 略
                // 根据log index获取对应的channel,然后写入返回值
                if currentTerm, isLeader := kv.rf.GetState(); isLeader && 
                message.CommandTerm == currentTerm {
                    ch := kv.getResultChan(message.CommandIndex)
                    ch <- response
                }
                kv.mu.Unlock()
            } else if message.SnapshotValid {
                // 如果是snapshot
            } else {
                panic(fmt.Sprintf("unexpected Message %v", message))
            }
        }
    }
}

另外要考虑的是,Raft中和client打交道的是leader,所以往输出结果channel中写入返回值的只有leader才可以,所以在写入之前要判断节点状态是不是为leader。

一个加锁的细节:

如果在applier()中不加锁,那么可能会发生存放结果的channel在HandleRequest中还没有创建好,那么就无法正常地将结果写入。所以,在HandleRequestapplier都要持有kvserver的锁。那么只有释放锁以后,才可以将applyCh中的log应用到kv数据库中,然后写入到channel。

网络异常

在basic中,序列号并不是一个非常重要的字段。不过在网络不可靠的情况下,序列号就比较重要了。在网络不可靠的情况下,如果leader已经将一个put log成功地同步给了其他节点。不过返回给客户端的应答丢失了,这种情况下,一般来说客户端会设置一个自己的超时定时器,如果超时了就重新发送请求。TestUnreliableOneKey3A需要重复处理请求的情况。

要实现每一个操作只能执行一次,这也就是论文中说的线性化语义的意思。

Our goal for Raft is to implement linearizable semantics (each operation appears to execute instantaneously, exactly once, at some point between its invocation and its response).

问题就是,两个一样的请求所带来的负面影响,比如说append(1,13) ,key = 1追加了两个value = 13。所以,在server中,我们要避免这个问题,关键就是使用序列号 ,如果收到相同序列号的请求,那么直接返回上次操作的结果,Raft论文中也提到了这一点。

The solution is for clients to assign unique serial numbers to every command. Then, the state machine tracks the latest serial number processed for each client, along with the associated response. If it receives a command whose serial number has already been executed, it responds immediately without re-executing the request

在这里,我们也相类似,在KVServer中增加一个prevOperation map[int64]*OpResultWrapper变量,以每个clientId作为key,保存这个Client最近发送的请求OpResultWrapper是对返回结果的进一步封装,其中包括着最近一个请求的序列号。如下代码来判断是否为重复请求:

func (kv *KVServer) isDuplicateReq(clientId int64, sequenceNum int ) bool {
    if value,ok := kv.prevOperation[clientId]; ok {
        if value.prevSequenceNum >= sequenceNum {
            return true
        }
    }
    return false
}

HandleRequest中,接收一个请求前,先判断这个请求是否为重复请求,如果是重复请求那么就直接返回之前的结果。这里要考虑的就是,Get请求本身就是幂等的,所以即使是重复的请求也是没关系的。虽然,没有这个条件也没关系,还是可以通过测试用例的。

    if args.OperationType != OpGet && kv.isDuplicateReq(args.ClientId,args.SequenceNum) {
        prevResult := kv.prevOperation[args.ClientId].opResult
        reply.Value = prevResult.data
        reply.Err = prevResult.err
        kv.mu.Unlock()
        return
    }

目前的操作只是为了处理重复的请求,还不能很好的处理网络出现分区错误的情况。考虑一个场景,集群中leader和部分follower发生了分区错误。然后剩下的大部分节点重新选了一个leader出来。原来的client和老的leader进行rpc调用,迟迟不能返回,即使可能经过一段时间以后,分区错误已经不存在了,client还是会被阻塞地堵在了leader的select语句中。因此,为了解决这个问题,我们要在select中加入超时直接返回,在client中,如果收到的返回结果的错误信息是超时,那么直接换一个节点重新发送RPC调用。

case <- time.After(ExecuteTimeout):
    //如果发生了分区错误,那么client提交的请求会陷入无限阻塞,因为
    //raft无法同步log,所以这里加入超时来让client重新选择合适的leader
    // 如果没有超时处理,在发生分区错误的时候,client无法重新选择合适的leader
    // 会陷入rpc调用(Call函数)的持续阻塞.那么将来分区合并后,阻塞的client也不能继续
    DPrintf("{Kv %d} timeout for args:%v",kv.me,args)
    reply.Err = ErrTimeout  //返回超时错误
}

TestOnePartition3A测试点中,如果前面因为分区的put或者get请求一直无法完成,会输出错误信息:Put did not complete。经过这样简单的处理就可以通过这个测试点。

不过对于后面的TestManyPartitionOneClient3A等测试点还是不够的,我不能很好的通过日志来观察出如何去通过这些测试点,参考了一些别人的代码来通过测试点。代码如下:

if command.OperationType != OpGet &&
kv.isDuplicateReq(command.ClientId,command.SequenceNum) {
    response = kv.prevOperation[command.ClientId].opResult
} else {
    response = kv.applyLogToStateMachine(command)
    kv.prevOperation[command.ClientId] =
    &OpResultWrapper{command.SequenceNum, response}
}

前面的测试点并不需要像这样做,这里要考虑的就是,在HandleRequestkv.isDuplicateReq()之前已经接受到了重复请求。所以在applier中,我们在返回结果之前先判断一下是否是重复请求,是的话直接返回。不是的话再去应用到底层的kv map中。比如说Start()之后,经过了490ms这个日志才到达applyCh中,然后没过多久就因为超时直接返回了。接着又重传,这次比较幸运,很快就到达了。不过还没有被prevOperation记录下来,所以直接通过了isDuplicateReq的校验。然后再applyCh中重复接收了,因此我们在applier中也进行一次判断。

Gob的小问题

Gob这个包序列化数据的时候,需要知道具体的类型,才可以将它正确的反序列化回来。我们在Start()中传入的参数是interface{}类型,所以如果不进行一些预先的安排,那么在follower中不能将OpArgs反序列出来。所以在StartKVServer()中需要调用gob.Register来注册一下。

Summary

3A的话总体来说不怎么难,不过我在做的时候花了很多时间去理解测试用例的意思了。还有就是lab原来的代码可以和我一样自己来优化一下,这样就可以省去很多冗余的代码。

暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇