rf.mu.Lock()
rf.currentTerm += 1
rf.state = Candidate
rf.mu.Unlock()
rule 3:当某个go程的一系列读写操作中间被别人修改会出错时,整个部分都需要加锁临界区保护,同时注意currentTerm变量其他go程使用需要同一的锁做保护,某些rpc handle更是需要全过程加锁保护
rf.mu.Lock()
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
}
rf.mu.Unlock()
rule 4:加锁的临界区不应该有耗时的阻塞操作,比如读阻塞信道,写信道,等待定时器,sleep,或者发送同步rpc。原因有二,第一是减少了并发度,此时其他go程可以继续持锁干活;第二是防止死锁,peer rpc互相持锁发送rpc依赖对方的锁,会造成死锁;rpc场景如果确实需要持锁,那么可以另起其他go程去做rpc操作,主go程持锁做其他事,之间通过信道通信
rule 5:小心释放锁重新获得锁之后的状态,一般这么做是为了提高性能,防止等待,但是需要确保重新获取锁之后,状态和之前释放锁的状态是一致的;下述例子,有两个错误,第一rf.currentTerm需要显式传入go程,copy一份,否则go程执行的时候该变量已经被修改了;第二,go程重新获取锁后,需要检查rf.currentTerm是否还和传入的参数一致,一致才能继续操作,不一致则说明条件已经不满足
rf.mu.Lock()
rf.currentTerm += 1
rf.state = Candidate
for {
go func() {
rf.mu.Lock()
args.Term = rf.currentTerm
rf.mu.Unlock()
Call("Raft.RequestVote", &args, ...)
// handle the reply...
} ()
}
rf.mu.Unlock()
总体来说,满足上述条件的代码开发颇具挑战,具体来说决定临界区何时开始何时结束,哪一系列操作需要加锁是困难,同时并发调试也是不容易的,一个更具有实际意义的方法是,首先假设没有并发,也就不需要锁保护,但是为了发送rpc不阻塞,仍旧需要创建go程,那么可以在每个rpc handlers执行全过程加锁保护,go程开始执行最初获得锁,执行结束在释放锁,完全避免了并发,也就可以满足上述rule 1 2 3 5. 这样就避免从大量代码系列中寻找临界区的麻烦,rule 4依旧是个麻烦,所以下一步就是找到阻塞的位置,增加代码在阻塞之前释放锁,在阻塞之后得到锁后,小心检查此时的状态是否和阻塞之前一致,按照上述的步骤操作较为容易。上述方案的弊端显而易见,就是性能差一些,很多不需要锁的地方也被加锁保护了,换句话说,某一个Raft peer内部全部是串行的,无法发挥多核cpu的特性
Raft Structure Advice
raft示例主要处理外部事件,包括AppendEntry RequestVote Rpcs,Rpc replies and start calls,后台周期性任务,包括心跳和选举,实现上述所有功能的数据结构有很多,本文给出一些提示
每个raft实例的状态(log、current index)这些需要都需要在事件到达过程或者reply过程中并发的进行读写访问,go官方文档针对这种模式提供两种机制,共享数据+锁,信道通信,实践证明前者实现起来更直观一些
一些周期性的任务,比如主节点定时发送心跳,备节点在指定时间内没有收到主节点心跳,发起选举,这些周期性任务最好每个都独立的启动go程去执行,而不是用一个后台go程处理所有的事件
处理心跳超时重新发起选举是一个棘手问题,最简单的做法,结构中记录上一次主节点心跳的时间戳,超时go程周期性检查该变量是否超时,建议使用time.Sleep()参数使用一个小的常数,而不是使用time.Ticker 或者time.Timer,要想用对这两个需要一些技巧
另外一个单独的go程来提交log entry到applyChan,务必保证是一个独立go程,因为发送信道会导致阻塞,另外务必是一个单线程模式,防止order乱序,建议增加ommitIndex后使用使用sync.Cond来c唤醒applyLog GO程
每个RPC请求的接收和发送返回需要独立Go程,原因有二,第一某一个不可达的peer不会阻塞大部分的回复过程,第二使得心跳和超时选举可以不被阻塞一致正常进行;简单的实现rpc reply过程使用同一个go程,而不是通过信道发送消息
注意,网络会延迟rpc请求的发送和返回,当并发发送rpc请求时,发送顺序和返回都是乱序的,rpc handler需要忽略那些old term的消息;主节点尤为要注意,处理消息返回时,要注意term假设依旧没变,同时需要注意处理并发rpc的返回时,修改leader的状态
Students’ Guide to Raft
代码结构
主要开发集中在raft.go
Make接口,创建一个Raft peer,传入所有peer网络标识符,index,
Start(command)通知raft系统执行append command到replica log,start应当立即返回,发送ApplyMsg消息将新的commit log entry发送到applyChan
Raft peers之间通信通过labrpc,基于GO原生rpc库,使用channel代替网络socket,raft.go中包括一些实例rpc,比如sendRequestVode,RequestVote
labrpc中会对rpc做一些错误注入,延迟,乱序以及删除来代表网络异常,不允许修改labrpc库
to be continue…
labgob包装gob库,做了两点warning,第一是发送不能导出的结构体成员会warn,因为gob默认会忽略没有导出的成员;第二,如果decode传入的结构体包括一些不是默认值的变量,warn,因为decode不会覆盖这些非默认值。简单来说,都是容易出错且很难排查的场景,因此就在这里提前预警。
TODO register的含义,设计go的反射用法
基于channel实现rpc,用来注入网络的故障,比如延时,丢包,乱序,网络隔离等等错误,借鉴net/rpc/server.go实现。
TODO 实现思路学习,涉及go的很多基础案例和用法
实现选举和心跳过程,2A主要考察选举过程,第一,没有失效的时候稳定的有一个主节点,第二,当主节点失效,可以选举出新的主节点
Hints
通过figure 2确定需要存储的状态state,存储在raft结构中,同时定义rpc请求消息和返回消息 实现投票RPC和同步logRPC Make方法中,实现后台定时超时触发选举流程,使用随机数保证不发生分票的情况 Test需要,保证心跳不超过10次/s,即心跳发生周期大于100ms test需要,保证在5s内选举出新的leader,因此timeout超时时间要设置的小一些 Section5.2中,配置超时时间150ms到300ms,这需要心跳至少发生2次在150ms之内,即心跳周期小于75ms,但是test要求周期大于100ms,因此实验中的timeout要大于paper中的配置,但是不能太大,需要自己尝试找到合适的配置 尽量使用time.Sleep,Timer和Ticker容易出错 尽可能读懂Figure 2 使用Debug print进行错误排查 Part 2B实现leader和follower的log同步过程,通过心跳rpc,并可以达成共识;实现Start方法,供客户端使用来发送命令到raft,实现commit过程,并将commit的log entry返回给client进行状态机执行
Hints
实现选举限制条件,section5.4.1,只有up-to-date log的节点才能被投票 注意,当有主节点稳定存在时,不应该发生切主,即小心处理投票rpc,在有稳定主节点时,需要忽略该消息;成为主节点后,要立即发送心跳,同时主要接受心跳过程中对超时timer需要重置,来防止新的选举发生 代码经常需要等待某个事件发生,不要轮询,可以使用sync.Cond或者channel,最简单可以用无限循环+sleep实现轮询 尽可能的让raft的代码,简洁易懂,后面的实验仍旧需要基于raft构建,请尽可能保证简单性和可理解性 Part 2C2C主要考察某个raft节点reboot,重新加入集群后仍旧可以保证一致性,这需要持久化raft中的一些关键状态,并在状态发生变更的过程中随时保证关键状态的持久化。真正实现持久化需要保证每一次状态变更都落盘,这样重启后从磁盘中获取到最新的状态。本次实验不用disk,使用内存来模拟。
首先实现persist()和readPersist()方法,这里使用labgod;然后决定何时执行persist方法,即寻找状态变更需要落盘的代码段
Hints
2C测试中有很多节点挂掉,网络中断的场景 实现文中提到的优化,如果追随者的log和主节点log不一致,要直接返回到该term不一致的最初index 2A 2B 2C需要在4分钟内完成,CPU time 1分钟 状态和状态迁移梳理type LogEntry struct {
LogIndex int
LogTerm int
Command interface{}
}
主要参考Figure 2:
leaderId int // leader's id, initialized to -1
currentTerm int // latest term server has seen, initialized to 0
votedFor int // candidate that received vote in current term, initialized to -1
commitIndex int // index of highest log entry known to be committed, initialized to 0
lastApplied int // index of highest log entry known to be applied to state machine, initialized to 0
state serverState // state of server
status serverStatus // live or dead
log []LogEntry // log entries len()=5 (0,1,2,3,4 0 is no use) then logIndex=5
logIndex int // index of next log entry to be stored, initialized to 1
nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry, used to track committed index
applyCh chan ApplyMsg // apply to client
notifyCh chan struct{} // notify to apply
timeOutElectionTimer *time.Timer // timer used for timeout for election
// 其中需要持久化的是currentTerm,log[],votedFor
// 上述log[] 0index,不使用,从1开始
// logIndex用来记录下一条log的Index,这里单独一个变量而不是直接append主要是为了方便,实际上上figure中提高的last log index + 1
// nextIndex记录主节点要发送给从每个节点的index,初始化为logIndex即可
// nextIndex初始化为0,代表没有match的index,变量实际含义就是该server已经复制到的index
// commitIndex和lastApplied有点混淆,前者表示leader已经明确复制了多数节点,后者表示已经应用到状态机,即返回给了应用层apply
接下来考虑节点状态迁移:leader,follower,candidate
figure 4给出了迁移图和迁移条件
// caller must hold mu
// reference: Figure 4
func (rf *Raft) stateTransitionWithLock(targetState serverState) {
illegalStateLeader:= false
switch rf.state {
case Leader:
// detect higher term
if targetState == Follower {
rf.state = targetState
} else {
illegalStateLeader = true
}
case Follower:
// election timeout
if targetState == Candidate {
rf.state = targetState
} else {
illegalStateLeader = true
}
case Candidate:
// the term election timeout again, Candidate->Candidate
// the term election success, Candidate->Leader
// the term election fail(find a legal leader) or detect higher term, Candidate->Follower
rf.state = targetState
default:
illegalStateLeader = true
}
if illegalStateLeader{
log.Fatal("server state invalid")
}
}
考虑正常运行的raft实例状态可能发生迁移的时机:
1、election timeout,(timeout again when election)=> candidate
2、receive rpc reply with higher term => follower
3、receive rpc requset and find a legal leader =>follower
4、send VoteRpc gain majority votes => leader
trans位置:
1、timeout到达,follower(candidate 上一轮timeout) => candidate
2、选举得到多数票,candidate => leader
3、发送VoteRpc请求,response检测到高term,*=>follower
4、发送AppendRpc请求,response检测到高term,*=>follower
5、接收VoteRpc,检测到高term,*=>follower
6、接收AppendRpc,检测到高term,*=>follower
7、接收AppendRpc,检测到同term主,*=>follower
timeout timer reset触发的位置:
1、触发选举的同时,reset,保证本轮在timeout没有选举出来的场景,开启下一轮选举
2、接收VoteRpc,投票给别人,reset,本轮自己不需要触发选举
3、接收VoteRpc,检测到高term,状态迁移到follower,reset本轮重新开始计时
4、接收AppendRpc,发现合法leader,reset,存在合法主节点,不需要触发选举
5、接收AppendRpc,检测到高term,状态迁移到follower,reset本轮重新开始计时
6、发送VoteRpc请求,response检测到高term,状态迁移到follower,reset本轮重新开始计时
7、发送Append请求,response检测到高term,状态迁移到follower,reset本轮重新开始计时
replicaLogs触发位置:
选举成为leader之后开始后台同步log
VoteRPC
RPC实现:
rf.currentTerm == args.Term && rf.votedFor == args.CandidateId 返回投票,resetTimer rf.currentTerm > args.Term 或 本轮已经投票但不是这位候选,不投票 检测到高term,rf.currentTerm, rf.votedFor = args.Term, -1,持久化,状态到follower,resetTimer 比较log,term大优先;term一致长的优先,不满足不投票 满足要求,rf.votedFor = args.CandidateId,resetTimer,持久化,然后投票SendVote实现startElection:
确认状态不是leader继续 状态迁移到Candidate,term+1,self vote,异步发起VoteRPC,false重试;检测到高term结束本轮选举;其他场景收集成功的多数票后,!!!注意此时检测当前term是否还是args.Term!!因为允许多轮选举并发,此处要做判断,多数票是否为本轮选举产生的,状态迁移到leader,持久化,初始化leader数据结构,开始后台replicaLog 接收返回的处理,检测到高term,迁移到Follower,resetTimer,持久化后结束本轮选举;(注意发请求之前检测状态是否为Candidate,因为并发,不做也可以,做的话可以快速结束本轮选举) AppendLogRPCRPC实现:
rf.currentTerm > args.Term,返回失败,不合法的leader else 合法leader,首先resetTimer,有leader不触发选举 检测到高term,触发状态转移到follower,resetTimer,持久化流程 term一致,状态迁移到follower,主要是为了Candidate到follower 判断如何接收log,如果不匹配(方法是是否有prevLogIndex,且term一致),返回conflict index,不匹配term最初的一条logIndex,返回失败 如果匹配,将发来的log全部接收(是否需要更新,需要更新的话,更新多少,删除不匹配的部分),更新commitIndex,触发客户端notifyApplyleader bgReplicateLog实现:
定时发送,使用sleep即可完成,重试线程和主线程,注意检查state不是主是退出即可 失败重试,有个小细节,重试的时候仅发送空消息体,logs等待下次周期到达在发送 发送请求,检测状态是否为主?可加可不加 构造请求,处理是否为空的条件 处理返回,如果失败,两种情况,高term检测,触发状态转移;否则说明log不一致,更新nextIndex为conflictIndex,注意判断不要小于1,且不要超过logIndex,保证nextIndex合法 处理返回,如果成功,通过matchIndex判断是否commit,满足commit条件修改commitIndex,触发notifyApply notice,返回可能出现乱序,更新nextIndex和matchIndex小心不要回退 StartCommand 检查是否为主节点 追加log,修改logIndex,持久化后返回成功 bgApply notifyApply判断是否满足apply条件,astApplied < logIndex && lastApplied < commitIndex,满足发送信道;触发条件,leader commit和follower接收了log 将commit为apply的entry按照顺序发送到applyCh reference 2020-lab2 raft lock advice raft structure advice improved paxos paper raft-draw raft.io