聊聊kingbus的starRaft

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的starRaft

starRaft

kingbus/server/server.go

// starRaft builds the raft layer of the KingbusServer from cfg.
//
// It opens the on-disk raft storage in cfg.DataDir and then picks one of three
// bootstrap paths based on whether a log already exists there and on
// cfg.NewCluster:
//   - no log, !cfg.NewCluster: join an existing cluster. The cluster is fetched
//     from the remote peers, its ID is adopted, and a raft node is started with
//     no initial peers.
//   - no log, cfg.NewCluster: bootstrap a new cluster. A raft node is started
//     with every member of cfg.InitialPeerURLsMap as an initial peer.
//   - log exists: restart the raft node from the state persisted in storage,
//     resuming from the stored applied index.
//
// It then wires the raft node, request bookkeeping, stats and the rafthttp
// transport into s.
//
// The result is named so that the deferred cleanup observes every error
// return, including errors built inline with fmt.Errorf. On error, the storage
// is closed and any etcd raft node already started is stopped.
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) (err error) {
	var (
		etcdRaftNode etcdraft.Node
		id           types.ID
		cl           *membership.RaftCluster
		remotes      []*membership.Member
		appliedIndex uint64
	)

	prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout)
	if err != nil {
		return err
	}

	store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize)
	if err != nil {
		// Return instead of exiting the process so the caller decides how to
		// react; store is unusable here, so no cleanup is registered yet.
		return fmt.Errorf("NewKingbusServer:NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
	}

	//store, err := storage.NewMemoryStorage(cfg.DataDir)
	//if err != nil {
	//	log.Log.Fatalf("NewKingbusServer:NewMemoryStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
	//}

	defer func() {
		// err is the named result, so this sees every failing return below.
		if err != nil {
			// Stop the raft node before closing the storage it reads from.
			if etcdRaftNode != nil {
				etcdRaftNode.Stop()
			}
			store.Close()
		}
	}()

	logExist := utils.ExistLog(cfg.DataDir)
	switch {
	case !logExist && !cfg.NewCluster:
		// Join an existing cluster: validate our view of the initial peers
		// against the cluster reported by the remote peers.
		if err = cfg.VerifyJoinExisting(); err != nil {
			return err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
		if err != nil {
			return err
		}
		remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name)
		existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt)
		if gerr != nil {
			return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
		}
		if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
			return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
		}

		remotes = existingCluster.Members()
		cl.SetID(existingCluster.GetID())
		cl.SetStore(store)
		// nil ids: the node starts with no initial peers.
		id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil)
	case !logExist && cfg.NewCluster:
		// Bootstrap a brand-new cluster from the configured initial peers.
		if err = cfg.VerifyBootstrap(); err != nil {
			return err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
		if err != nil {
			return err
		}
		m := cl.MemberByName(cfg.Name)
		if m == nil {
			// Guard the m.ID dereference below.
			return fmt.Errorf("member %s not found in initial cluster", cfg.Name)
		}
		if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) {
			return fmt.Errorf("member %s has already been bootstrapped", m.ID)
		}

		cl.SetStore(store)
		id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs())
	case logExist:
		if err = utils.IsDirWriteable(cfg.DataDir); err != nil {
			return fmt.Errorf("cannot write to member directory: %v", err)
		}
		// Node restart: resume from the applied index persisted in storage.
		appliedIndex = raft.MustGetAppliedIndex(store)
		cfg.AppliedIndex = appliedIndex
		id, etcdRaftNode, cl = restartEtcdNode(cfg, store)
		cl.SetStore(store)
	default:
		return fmt.Errorf("unsupported bootstrap config")
	}

	s.raftNode = raft.NewNode(
		raft.NodeConfig{
			IsIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
			Node:        etcdRaftNode,
			Heartbeat:   cfg.HeartbeatMs,
			Storage:     store,
		},
	)
	// committedIndex and term are updated by the fsm (UpdateCommittedIndex,
	// SetTerm). appliedIndex is set here so that applyEntries can check entry
	// continuity.
	s.raftNode.SetAppliedIndex(appliedIndex)

	s.id = id
	s.wait = wait.New()
	s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now())
	s.stopping = make(chan struct{})
	s.errorc = make(chan error)
	s.applyBroadcast = utils.NewBroadcast()
	s.stats = stats.NewServerStats(cfg.Name, id.String())
	s.lstats = stats.NewLeaderStats(id.String())
	s.store = store

	tr := &rafthttp.Transport{
		TLSInfo:     transport.TLSInfo{},
		DialTimeout: DialTimeout,
		ID:          id,
		URLs:        cfg.PeerURLs,
		ClusterID:   cl.GetID(),
		Raft:        s,
		ServerStats: s.stats,
		LeaderStats: s.lstats,
		ErrorC:      s.errorc,
	}
	if err = tr.Start(); err != nil {
		return err
	}
	// Add remotes to rafthttp. They help newly joined members catch up with
	// the progress of the cluster. A remote supports only basic message
	// sending and has no stream connection, for simplicity. Remotes are no
	// longer used once the latest peers have been added to rafthttp.
	for _, m := range remotes {
		if m.ID != id {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range cl.Members() {
		if m.ID != id {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	s.raftNode.Transport = tr
	s.cluster = cl

	return nil
}

startEtcdRaftNode

kingbus/server/server.go

// startEtcdRaftNode starts a fresh etcd raft node for the member named
// cfg.Name in cl and returns that member's ID together with the node.
//
// ids lists the members passed to etcdraft.StartNode as initial peers. Each
// peer's Context is the JSON encoding of the matching member of cl. A nil or
// empty ids starts the node with no initial peers.
//
// The election timeout is expressed in heartbeats:
// ElectionTick = ElectionTimeoutMs / HeartbeatMs, with HeartbeatTick = 1.
// The process exits via log.Log.Fatalf if cfg.Name is not a member of cl.
func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) (
	id types.ID, n etcdraft.Node) {
	member := cl.MemberByName(cfg.Name)
	if member == nil {
		// Same guard as restartEtcdNode: member.ID below would be a nil dereference.
		log.Log.Fatalf("startEtcdRaftNode:member not in raft cluster,cluster:%v,memberName:%s",
			cl.String(), cfg.Name)
	}

	peers := make([]etcdraft.Peer, len(ids))
	for i, peerID := range ids {
		ctx, err := json.Marshal(cl.Member(peerID))
		if err != nil {
			log.Log.Panicf("marshal member should never fail: %v", err)
		}
		peers[i] = etcdraft.Peer{ID: uint64(peerID), Context: ctx}
	}
	id = member.ID
	log.Log.Infof("starting member %s in cluster %s", id, cl.GetID())

	c := &etcdraft.Config{
		ID:                        uint64(id),
		ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
		HeartbeatTick:             1,
		Storage:                   store,
		MaxSizePerMsg:             cfg.MaxRequestBytes,
		MaxInflightMsgs:           maxInflightMsgs,
		CheckQuorum:               true,
		PreVote:                   cfg.PreVote,
		DisableProposalForwarding: true,
		Logger:                    log.Log,
	}

	n = etcdraft.StartNode(c, peers)
	// Advance the node's logical clock by ElectionTick ticks. Presumably this
	// shortens the wait before the first election (as etcd does); confirm.
	raft.AdvanceTicks(n, c.ElectionTick)
	return id, n
}

restartEtcdNode

kingbus/server/server.go

// restartEtcdNode restarts the etcd raft node for the member named cfg.Name
// from the state persisted in store.
//
// The raft cluster membership is read back from store, and cfg.AppliedIndex is
// handed to etcdraft as the Applied index. It returns the member's ID, the
// restarted node and the recovered cluster.
//
// It panics if the cluster cannot be read from storage, and exits via
// log.Log.Fatalf if cfg.Name is not a member of the recovered cluster.
func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) (
	types.ID, etcdraft.Node, *membership.RaftCluster) {
	cl, err := membership.GetRaftClusterFromStorage(store)
	if err != nil {
		// Panicf, not Panic: the message carries a format verb.
		log.Log.Panicf("GetRaftClusterFromStorage error:%s", err.Error())
	}

	log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String())

	// The member ID comes from the recovered raft cluster.
	member := cl.MemberByName(cfg.Name)
	if member == nil {
		log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s",
			cl.String(), cfg.Name)
	}
	c := &etcdraft.Config{
		ID:                        uint64(member.ID),
		ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
		HeartbeatTick:             1,
		Applied:                   cfg.AppliedIndex, // resume from the persisted applied index
		Storage:                   store,
		MaxSizePerMsg:             cfg.MaxRequestBytes,
		MaxInflightMsgs:           maxInflightMsgs,
		CheckQuorum:               true,
		PreVote:                   cfg.PreVote,
		DisableProposalForwarding: true,
		Logger:                    log.Log,
	}

	n := etcdraft.RestartNode(c)
	return member.ID, n, cl
}

小结

starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper，再通过storage.NewDiskStorage创建DiskStorage，之后根据logExist及cfg.NewCluster分三种情况处理：若二者都为false（本地无日志、加入已有集群），则从远端peer获取已有集群信息，将membership.RaftCluster的id更新为该集群的id，然后以空的初始peer列表执行startEtcdRaftNode；若logExist为false且cfg.NewCluster为true（本地无日志、新建集群），则以cl.MemberIDs()作为初始peer执行startEtcdRaftNode；若logExist为true（节点重启），则先从storage读取appliedIndex，再执行restartEtcdNode。之后通过raft.NewNode创建raftNode，最后创建rafthttp.Transport，执行tr.Start()，并对remotes执行tr.AddRemote、对集群成员执行tr.AddPeer。

doc