[8][lab] lab2: raft impl

Julia ·
更新时间:2024-11-10
· 564 次阅读

lab 2 raft 本节作为实现ft KV store的基础部分,实现raft状态机复制协议,lab3基于lab2的raft模块,构建KV service,lab4基于上述构建shared KV service 一般来说,容错通过复制集实现状态的复制,保证在少数节点故障的场景下服务依旧可用,挑战是数据的一致性 Raft控制一个服务的状态复制,保证故障后的一致性,保证所有operator log按照相同的顺序在所有复制集节点上执行,保证所有节点对log的内容达成一致性共识。当故障节点重新上线时,raft采用一定的策略保证它慢慢到达最新的一致性状态,Raft依赖主节点同步log,当没有主节点时,raft不会同步log,而是开始新一轮的选举 本节实现Raft为模块独立,每个Raft实例之间通过rpc调用来维持复制状态机,log entries with index numbers,每条entry写来index,最终达成一致的情况下commit,此时raft模块会将结果返回上层服务来执行(仅允许rpc调用,不允许共享变量,不允许依赖共享存储文件等等) 主要参考raft paper来做实现,同时参考reference 更深入理解一致性,可以参考paxos、chubby、Paxos Made Live、Spanner、Zookeeper、Harp… 本节会实现raft论文中大部分操作,但不是全部,包括持久化日志,重启,但不会实现集群身份转变和快照以及日志压缩功能,section 6 and 7 lock advice rule 1: 多个go程同时访问的数据结构需要加锁保护,go自带的race检测可以很好的发现这个问题 rule 2: 类似事务的概念,一连串修改如果要保证同时生效而不是部分可见,需要一起锁住,mu保护两个状态,无论要使用哪个变量都需要获取该锁 rf.mu.Lock() rf.currentTerm += 1 rf.state = Candidate rf.mu.Unlock() rule 3:当某个go程的一系列读写操作中间被别人修改会出错时,整个部分都需要加锁临界区保护,同时注意currentTerm变量其他go程使用需要同一的锁做保护,某些rpc handle更是需要全过程加锁保护 rf.mu.Lock() if args.Term > rf.currentTerm { rf.currentTerm = args.Term } rf.mu.Unlock() rule 4:加锁的临界区不应该有耗时的阻塞操作,比如读阻塞信道,写信道,等待定时器,sleep,或者发送同步rpc。原因有二,第一是减少了并发度,此时其他go程可以继续持锁干活;第二是防止死锁,peer rpc互相持锁发送rpc依赖对方的锁,会造成死锁;rpc场景如果确实需要持锁,那么可以另起其他go程去做rpc操作,主go程持锁做其他事,之间通过信道通信 rule 5:小心释放锁重新获得锁之后的状态,一般这么做是为了提高性能,防止等待,但是需要确保重新获取锁之后,状态和之前释放锁的状态是一致的;下述例子,有两个错误,第一rf.currentTerm需要显式传入go程,copy一份,否则go程执行的时候该变量已经被修改了;第二,go程重新获取锁后,需要检查rf.currentTerm是否还和传入的参数一致,一致才能继续操作,不一致则说明条件已经不满足 rf.mu.Lock() rf.currentTerm += 1 rf.state = Candidate for { go func() { rf.mu.Lock() args.Term = rf.currentTerm rf.mu.Unlock() Call("Raft.RequestVote", &args, ...) // handle the reply... } () } rf.mu.Unlock() 总体来说,满足上述条件的代码开发颇具挑战,具体来说决定临界区何时开始何时结束,哪一系列操作需要加锁是困难,同时并发调试也是不容易的,一个更具有实际意义的方法是,首先假设没有并发,也就不需要锁保护,但是为了发送rpc不阻塞,仍旧需要创建go程,那么可以在每个rpc handlers执行全过程加锁保护,go程开始执行最初获得锁,执行结束在释放锁,完全避免了并发,也就可以满足上述rule 1 2 3 5. 这样就避免从大量代码系列中寻找临界区的麻烦,rule 4依旧是个麻烦,所以下一步就是找到阻塞的位置,增加代码在阻塞之前释放锁,在阻塞之后得到锁后,小心检查此时的状态是否和阻塞之前一致,按照上述的步骤操作较为容易。上述方案的弊端显而易见,就是性能差一些,很多不需要锁的地方也被加锁保护了,换句话说,某一个Raft peer内部全部是串行的,无法发挥多核cpu的特性 Raft Structure Advice raft示例主要处理外部事件,包括AppendEntry RequestVote Rpcs,Rpc replies and start calls,后台周期性任务,包括心跳和选举,实现上述所有功能的数据结构有很多,本文给出一些提示 每个raft实例的状态(log、current index)这些需要都需要在事件到达过程或者reply过程中并发的进行读写访问,go官方文档针对这种模式提供两种机制,共享数据+锁,信道通信,实践证明前者实现起来更直观一些 一些周期性的任务,比如主节点定时发送心跳,备节点在指定时间内没有收到主节点心跳,发起选举,这些周期性任务最好每个都独立的启动go程去执行,而不是用一个后台go程处理所有的事件 处理心跳超时重新发起选举是一个棘手问题,最简单的做法,结构中记录上一次主节点心跳的时间戳,超时go程周期性检查该变量是否超时,建议使用time.Sleep()参数使用一个小的常数,而不是使用time.Ticker 或者time.Timer,要想用对这两个需要一些技巧 另外一个单独的go程来提交log entry到applyChan,务必保证是一个独立go程,因为发送信道会导致阻塞,另外务必是一个单线程模式,防止order乱序,建议增加ommitIndex后使用使用sync.Cond来c唤醒applyLog GO程 每个RPC请求的接收和发送返回需要独立Go程,原因有二,第一某一个不可达的peer不会阻塞大部分的回复过程,第二使得心跳和超时选举可以不被阻塞一致正常进行;简单的实现rpc reply过程使用同一个go程,而不是通过信道发送消息 注意,网络会延迟rpc请求的发送和返回,当并发发送rpc请求时,发送顺序和返回都是乱序的,rpc handler需要忽略那些old term的消息;主节点尤为要注意,处理消息返回时,要注意term假设依旧没变,同时需要注意处理并发rpc的返回时,修改leader的状态 Students’ Guide to Raft 代码结构 主要开发集中在raft.go Make接口,创建一个Raft peer,传入所有peer网络标识符,index, Start(command)通知raft系统执行append command到replica log,start应当立即返回,发送ApplyMsg消息将新的commit log entry发送到applyChan Raft peers之间通信通过labrpc,基于GO原生rpc库,使用channel代替网络socket,raft.go中包括一些实例rpc,比如sendRequestVode,RequestVote labrpc中会对rpc做一些错误注入,延迟,乱序以及删除来代表网络异常,不允许修改labrpc库

to be continue…

labgob

包装gob库,做了两点warning,第一是发送不能导出的结构体成员会warn,因为gob默认会忽略没有导出的成员;第二,如果decode传入的结构体包括一些不是默认值的变量,warn,因为decode不会覆盖这些非默认值。简单来说,都是容易出错且很难排查的场景,因此就在这里提前预警。
TODO register的含义,设计go的反射用法

labrpc

基于channel实现rpc,用来注入网络的故障,比如延时,丢包,乱序,网络隔离等等错误,借鉴net/rpc/server.go实现。
TODO 实现思路学习,涉及go的很多基础案例和用法

Part 2A

实现选举和心跳过程,2A主要考察选举过程,第一,没有失效的时候稳定的有一个主节点,第二,当主节点失效,可以选举出新的主节点

Hints

通过figure 2确定需要存储的状态state,存储在raft结构中,同时定义rpc请求消息和返回消息 实现投票RPC和同步logRPC Make方法中,实现后台定时超时触发选举流程,使用随机数保证不发生分票的情况 Test需要,保证心跳不超过10次/s,即心跳发生周期大于100ms test需要,保证在5s内选举出新的leader,因此timeout超时时间要设置的小一些 Section5.2中,配置超时时间150ms到300ms,这需要心跳至少发生2次在150ms之内,即心跳周期小于75ms,但是test要求周期大于100ms,因此实验中的timeout要大于paper中的配置,但是不能太大,需要自己尝试找到合适的配置 尽量使用time.Sleep,Timer和Ticker容易出错 尽可能读懂Figure 2 使用Debug print进行错误排查 Part 2B

实现leader和follower的log同步过程,通过心跳rpc,并可以达成共识;实现Start方法,供客户端使用来发送命令到raft,实现commit过程,并将commit的log entry返回给client进行状态机执行

Hints

实现选举限制条件,section5.4.1,只有up-to-date log的节点才能被投票 注意,当有主节点稳定存在时,不应该发生切主,即小心处理投票rpc,在有稳定主节点时,需要忽略该消息;成为主节点后,要立即发送心跳,同时主要接受心跳过程中对超时timer需要重置,来防止新的选举发生 代码经常需要等待某个事件发生,不要轮询,可以使用sync.Cond或者channel,最简单可以用无限循环+sleep实现轮询 尽可能的让raft的代码,简洁易懂,后面的实验仍旧需要基于raft构建,请尽可能保证简单性和可理解性 Part 2C

2C主要考察某个raft节点reboot,重新加入集群后仍旧可以保证一致性,这需要持久化raft中的一些关键状态,并在状态发生变更的过程中随时保证关键状态的持久化。真正实现持久化需要保证每一次状态变更都落盘,这样重启后从磁盘中获取到最新的状态。本次实验不用disk,使用内存来模拟。
首先实现persist()和readPersist()方法,这里使用labgod;然后决定何时执行persist方法,即寻找状态变更需要落盘的代码段

Hints

2C测试中有很多节点挂掉,网络中断的场景 实现文中提到的优化,如果追随者的log和主节点log不一致,要直接返回到该term不一致的最初index 2A 2B 2C需要在4分钟内完成,CPU time 1分钟 状态和状态迁移梳理 type LogEntry struct { LogIndex int LogTerm int Command interface{} } 主要参考Figure 2: leaderId int // leader's id, initialized to -1 currentTerm int // latest term server has seen, initialized to 0 votedFor int // candidate that received vote in current term, initialized to -1 commitIndex int // index of highest log entry known to be committed, initialized to 0 lastApplied int // index of highest log entry known to be applied to state machine, initialized to 0 state serverState // state of server status serverStatus // live or dead log []LogEntry // log entries len()=5 (0,1,2,3,4 0 is no use) then logIndex=5 logIndex int // index of next log entry to be stored, initialized to 1 nextIndex []int // for each server, index of the next log entry to send to that server matchIndex []int // for each server, index of highest log entry, used to track committed index applyCh chan ApplyMsg // apply to client notifyCh chan struct{} // notify to apply timeOutElectionTimer *time.Timer // timer used for timeout for election // 其中需要持久化的是currentTerm,log[],votedFor // 上述log[] 0index,不使用,从1开始 // logIndex用来记录下一条log的Index,这里单独一个变量而不是直接append主要是为了方便,实际上上figure中提高的last log index + 1 // nextIndex记录主节点要发送给从每个节点的index,初始化为logIndex即可 // nextIndex初始化为0,代表没有match的index,变量实际含义就是该server已经复制到的index // commitIndex和lastApplied有点混淆,前者表示leader已经明确复制了多数节点,后者表示已经应用到状态机,即返回给了应用层apply 接下来考虑节点状态迁移:leader,follower,candidate figure 4给出了迁移图和迁移条件 // caller must hold mu // reference: Figure 4 func (rf *Raft) stateTransitionWithLock(targetState serverState) { illegalStateLeader:= false switch rf.state { case Leader: // detect higher term if targetState == Follower { rf.state = targetState } else { illegalStateLeader = true } case Follower: // election timeout if targetState == Candidate { rf.state = targetState } else { illegalStateLeader = true } case Candidate: // the term election timeout again, Candidate->Candidate // the term election success, Candidate->Leader // the term election fail(find a legal leader) or detect higher term, Candidate->Follower rf.state = targetState default: illegalStateLeader = true } if illegalStateLeader{ log.Fatal("server state invalid") } } 考虑正常运行的raft实例状态可能发生迁移的时机: 1、election timeout,(timeout again when election)=> candidate 2、receive rpc reply with higher term => follower 3、receive rpc requset and find a legal leader =>follower 4、send VoteRpc gain majority votes => leader trans位置: 1、timeout到达,follower(candidate 上一轮timeout) => candidate 2、选举得到多数票,candidate => leader 3、发送VoteRpc请求,response检测到高term,*=>follower 4、发送AppendRpc请求,response检测到高term,*=>follower 5、接收VoteRpc,检测到高term,*=>follower 6、接收AppendRpc,检测到高term,*=>follower 7、接收AppendRpc,检测到同term主,*=>follower timeout timer reset触发的位置: 1、触发选举的同时,reset,保证本轮在timeout没有选举出来的场景,开启下一轮选举 2、接收VoteRpc,投票给别人,reset,本轮自己不需要触发选举 3、接收VoteRpc,检测到高term,状态迁移到follower,reset本轮重新开始计时 4、接收AppendRpc,发现合法leader,reset,存在合法主节点,不需要触发选举 5、接收AppendRpc,检测到高term,状态迁移到follower,reset本轮重新开始计时 6、发送VoteRpc请求,response检测到高term,状态迁移到follower,reset本轮重新开始计时 7、发送Append请求,response检测到高term,状态迁移到follower,reset本轮重新开始计时 replicaLogs触发位置: 选举成为leader之后开始后台同步log VoteRPC

RPC实现:

rf.currentTerm == args.Term && rf.votedFor == args.CandidateId 返回投票,resetTimer rf.currentTerm > args.Term 或 本轮已经投票但不是这位候选,不投票 检测到高term,rf.currentTerm, rf.votedFor = args.Term, -1,持久化,状态到follower,resetTimer 比较log,term大优先;term一致长的优先,不满足不投票 满足要求,rf.votedFor = args.CandidateId,resetTimer,持久化,然后投票

SendVote实现startElection:

确认状态不是leader继续 状态迁移到Candidate,term+1,self vote,异步发起VoteRPC,false重试;检测到高term结束本轮选举;其他场景收集成功的多数票后,!!!注意此时检测当前term是否还是args.Term!!因为允许多轮选举并发,此处要做判断,多数票是否为本轮选举产生的,状态迁移到leader,持久化,初始化leader数据结构,开始后台replicaLog 接收返回的处理,检测到高term,迁移到Follower,resetTimer,持久化后结束本轮选举;(注意发请求之前检测状态是否为Candidate,因为并发,不做也可以,做的话可以快速结束本轮选举) AppendLogRPC

RPC实现:

rf.currentTerm > args.Term,返回失败,不合法的leader else 合法leader,首先resetTimer,有leader不触发选举 检测到高term,触发状态转移到follower,resetTimer,持久化流程 term一致,状态迁移到follower,主要是为了Candidate到follower 判断如何接收log,如果不匹配(方法是是否有prevLogIndex,且term一致),返回conflict index,不匹配term最初的一条logIndex,返回失败 如果匹配,将发来的log全部接收(是否需要更新,需要更新的话,更新多少,删除不匹配的部分),更新commitIndex,触发客户端notifyApply

leader bgReplicateLog实现:

定时发送,使用sleep即可完成,重试线程和主线程,注意检查state不是主是退出即可 失败重试,有个小细节,重试的时候仅发送空消息体,logs等待下次周期到达在发送 发送请求,检测状态是否为主?可加可不加 构造请求,处理是否为空的条件 处理返回,如果失败,两种情况,高term检测,触发状态转移;否则说明log不一致,更新nextIndex为conflictIndex,注意判断不要小于1,且不要超过logIndex,保证nextIndex合法 处理返回,如果成功,通过matchIndex判断是否commit,满足commit条件修改commitIndex,触发notifyApply notice,返回可能出现乱序,更新nextIndex和matchIndex小心不要回退 StartCommand 检查是否为主节点 追加log,修改logIndex,持久化后返回成功 bgApply notifyApply判断是否满足apply条件,astApplied < logIndex && lastApplied < commitIndex,满足发送信道;触发条件,leader commit和follower接收了log 将commit为apply的entry按照顺序发送到applyCh reference 2020-lab2 raft lock advice raft structure advice improved paxos paper raft-draw raft.io
作者:WhateverYoung



lab raft

需要 登录 后方可回复, 如果你还没有账号请 注册新账号