etcd 中 raft 算法的使用方法

Oprah ·
更新时间:2024-09-20
· 573 次阅读

raft 协议是一个一致性算法,解决多台机器之间数据一致的问题。raft 声称简洁明了,可以取代非常复杂的 PAXOS 算法。然而翻看 raft 的论文后,会发现即便声称简洁明了,自己完整地实现 raft 还是很麻烦的。

etcd是一个分布式的 key-value 存储组件,它通过 raft 算法保证多台机器数据的一致性。那么 etcd 中的 raft 算法可以提取出来用在自己的项目中吗?

答案是可以的。etcd 不仅实现了 raft,还把 raft 解耦得很完美,完全可以独立使用。代码库点这儿:https://github.com/etcd-io/etcd/tree/master/raft。

美中不足的是,etcd raft的使用文档写得很烂,文档中列的代码缺了很多关键部分,是跑不起来的。按照文档中的代码写,不是报错就是 go panic,要不就是跑起来后机器都僵着不选举。经过笔者的实践,补齐了缺失的代码,完成了一个可以跑起来的示例,代码见文章最后。

实践过程中,使用文档中没有提及的几个点:

文档说 n := raft.StartNode() 就可以启动一个节点,实际这样做会 panic,要自己额外再封装一个 struct ,并且实现 Process() 方法才行(见本文 raft.go里的 rNode

文档说集群中在收到对方节点的 RPC 消息时,要调用 n.Step() 方法:

func recvRaftRPC(ctx context.Context, m raftpb.Message) { n.Step(ctx, m) }

但这个recvRaftRPC() 又在哪调用呢?回顾第 1 条不是要自己封装一个 struct 吗,n.Step() 应该写在这个 struct 的 Process() 方法里,而不是放在什么 recvRaftRPC() 里(见本文 raft.go 里的 rNode)。raft 算法会在接收到其他节点的RPC请求时调用 Process()

还是 raft.StartNode() ,文档的这段代码: n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

意思是三个节点的集群,如果当前启动节点 ID 是 0x01,那么启动时 peer 列表只传 0x02, 0x03,不传自己,实际这样做启动集群后会僵住不选举。正确做法是把节点自己也传入 peer 列表。

文档中的 for-select 循环,是要写在一个 go 协程里的。不然启动后集群会僵住不选举。 示例代码介绍

本文的示例代码是一个三节点的集群,节点之前通过 http 交换 raft 报文。

集群启动之后,0x01节点会每隔 1 秒申请提案(也就是业务数据):

for { log.Printf("Propose on node %v\n", *id) n.node.Propose(context.TODO(), []byte("hello")) time.Sleep(time.Second) }

然后在代码的 这个地方:

for _, entry := range rd.CommittedEntries { switch entry.Type { case raftpb.EntryNormal: log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data)) .... }

集群的每个节点都会收到这个提案,这时后提案在集群里是一致的了,可以放心地持久化了。

完整代码:

main.go

package main import ( "context" "flag" "log" "time" ) func main() { id := flag.Uint64("id", 1, "node id") flag.Parse() log.Printf("I'am node %v\n", *id) cluster := map[uint64]string{ 1: "http://127.0.0.1:22210", 2: "http://127.0.0.1:22220", 3: "http://127.0.0.1:22230", } n := newRaftNode(*id, cluster) if *id == 1 { time.Sleep(5 * time.Second) for { log.Printf("Propose on node %v\n", *id) n.node.Propose(context.TODO(), []byte("hello")) time.Sleep(time.Second) } } select {} }

raft.go

package main import ( "context" "log" "net/http" "strconv" "strings" "time" "go.etcd.io/etcd/etcdserver/api/rafthttp" stats "go.etcd.io/etcd/etcdserver/api/v2stats" "go.etcd.io/etcd/pkg/types" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" "go.uber.org/zap" ) type rNode struct { id uint64 peerMap map[uint64]string node raft.Node raftStorage *raft.MemoryStorage transport *rafthttp.Transport } func newRaftNode(id uint64, peerMap map[uint64]string) *rNode { n := &rNode{ id: id, peerMap: peerMap, raftStorage: raft.NewMemoryStorage(), } go n.startRaft() return n } func (rn *rNode) startRaft() { peers := []raft.Peer{} for i := range rn.peerMap { peers = append(peers, raft.Peer{ID: uint64(i)}) } c := &raft.Config{ ID: rn.id, ElectionTick: 10, HeartbeatTick: 1, Storage: rn.raftStorage, MaxSizePerMsg: 4096, MaxInflightMsgs: 256, } rn.node = raft.StartNode(c, peers) rn.transport = &rafthttp.Transport{ Logger: zap.NewExample(), ID: types.ID(rn.id), ClusterID: 0x1000, Raft: rn, ServerStats: stats.NewServerStats("", ""), LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(rn.id))), ErrorC: make(chan error), } rn.transport.Start() for peer, addr := range rn.peerMap { if peer != rn.id { rn.transport.AddPeer(types.ID(peer), []string{addr}) } } go rn.serveRaft() go rn.serveChannels() } func (rn *rNode) serveRaft() { addr := rn.peerMap[rn.id][strings.LastIndex(rn.peerMap[rn.id], ":"):] server := http.Server{ Addr: addr, Handler: rn.transport.Handler(), } server.ListenAndServe() } func (rn *rNode) serveChannels() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-ticker.C: rn.node.Tick() case rd := <-rn.node.Ready(): rn.raftStorage.Append(rd.Entries) rn.transport.Send(rd.Messages) if !raft.IsEmptySnap(rd.Snapshot) { rn.raftStorage.ApplySnapshot(rd.Snapshot) } for _, entry := range rd.CommittedEntries { switch entry.Type { case raftpb.EntryNormal: log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data)) case raftpb.EntryConfChange: var cc raftpb.ConfChange cc.Unmarshal(entry.Data) rn.node.ApplyConfChange(cc) } } rn.node.Advance() case err := <-rn.transport.ErrorC: log.Fatal(err) } } } func (rn *rNode) Process(ctx context.Context, m raftpb.Message) error { return rn.node.Step(ctx, m) } func (rn *rNode) IsIDRemoved(id uint64) bool { return false } func (rn *rNode) ReportUnreachable(id uint64) {} func (rn *rNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
作者:切糕糕



raft etcd 方法 算法

需要 登录 后方可回复, 如果你还没有账号请 注册新账号
相关文章