partB代码地址:[partB],参考了很多这位大佬的实现:这里
基本的过程
本节的主要内容就是在kv中再加上snapshot的内容。不是特别的难,只要按部就班的使用lab2中完成的几个raft快照功能就好。在lab的实验中,要考虑的两个主要问题就是:
- 什么时候去创建一个快照
- 在快照中要保存的东西有哪些
第一个问题:
测试脚本在创建一个kvserver的时候会传入一个maxraftstate
,如果此时目前所保存的数据超过了maxraftstate,那么就建立快照。这里的几个函数的是:
- 在raft中,需要持久化的时候,调用
persist()
函数来将数据保存到Persister
的raftState []byte
中,这里的数据包括voteFor,currentTerm,log。 - 如果所保存的数据超过了
maxraftState
,那么就要调用raft.Snapshot()函数来建立快照。 - 然后,在每一个节点的KVserver的applier中根据applyCh中收到的数据,如果是快照,就去调用
CondInstallSnapshot()
来判断这个快照是否是有效的,如果是有效的话,那么将这些数据复原。
第二个问题:
首先,lab页面中的一个提示已经说到了,kvserver在建立checkpoints的时候还是要能够检测哪些重复的请求,这里也就是暗示着,要将leader的prevOperation
也保证到到快照中,另外,还要将db的数据直接保存到快照中,然后再follower中,我们直接将替换db。虽然这在现实操作中肯定是不妥的。 所以总的来说,要保存的东西就是prevOperation
和db
。
代码的实现
在接收log entry的时候,我们每次都判断此时所保存的数据大小是否超过了maxraftstate。如果是的话,就建立快照,将prevOperation
和db
都直接保存到快照当中,然后调用Snapshot
将过期的log entry删除,接着底层的raft将snapshot写入到applyCh中。所以,在applier中判断,如果收到的是快照,在KVserver中我们调用CondInstallSnapshot
来判断这个快照能否被应用。如果可以的话,再去调用restoreSnapshot
读取快照中的内容。
if kv.needSnapshot() { // 判断是否需要建立快照
kv.takeSnapshot(message.CommandIndex)
}
func (kv *KVServer) takeSnapshot(index int) {
buffer := new(bytes.Buffer)
encoder := labgob.NewEncoder(buffer)
encoder.Encode(kv.db)
encoder.Encode(kv.prevOperation)
kv.rf.Snapshot(index,buffer.Bytes())
}
// 从快照中读取数据,并且将当前的db和prevOperation替换
func (kv *KVServer) restoreSnapshot(snapshot []byte) {
if snapshot == nil || len(snapshot) == 0 {
return
}
buffer := bytes.NewBuffer(snapshot)
decoder := labgob.NewDecoder(buffer)
var db kvDB
var prevOperation map[int64]*OpResultWrapper
if decoder.Decode(&db) != nil||
decoder.Decode(&prevOperation) != nil {
DPrintf("{Node %v} restore snapshot failed",kv.rf.Me())
}
kv.db = &db
kv.prevOperation = prevOperation
}
另外一个小问题就是,节点关机后重启,也应该马上读取此时快照中的内容,恢复之前的状态。在测试脚本中,用ShutdownServer
和StartServer
来模拟这些情况,所以在StartServer
,也应该调用restoreSnapshot
。
// 在startServer中
kv.db = MakeDB()
kv.resultChans = make(map[int]chan *OpResult)
kv.prevOperation = make(map[int64]*OpResultWrapper)
kv.restoreSnapshot(persister.ReadSnapshot())