聊聊kingbus的startRaftPeer

/ stream / 没有评论 / 30浏览

本文主要研究一下kingbus的startRaftPeer

startRaftPeer

kingbus/server/server.go

func (s *KingbusServer) startRaftPeer(peerURLs types.URLs) error {
	err := s.raftNode.NewPeerListener(peerURLs)
	if err != nil {
		return err
	}
	s.raftNode.SetPeerHandler()
	s.raftNode.PeerHandlerServe()
	log.Log.Infof("startRaftPeer success")
	return nil
}

NewPeerListener

kingbus/raft/peer_handler.go

//NewPeerListener create listener of peer
func (r *Node) NewPeerListener(peerURLs types.URLs) (err error) {
	peers := make([]*peerListener, len(peerURLs))
	defer func() {
		if err == nil {
			return
		}
		for i := range peers {
			if peers[i] != nil && peers[i].close != nil {
				log.Log.Info("stopping listening for peers on ", peerURLs.String())
				peers[i].close(context.Background())
			}
		}
	}()

	for i, u := range peerURLs {
		peers[i] = &peerListener{close: func(context.Context) error { return nil }}
		peers[i].Listener, err = rafthttp.NewListener(u, nil)
		if err != nil {
			return err
		}
		peers[i].close = func(context.Context) error {
			return peers[i].Listener.Close()
		}
		log.Log.Info("listening for peers on ", u.String())
	}
	r.PeerListener = peers
	return nil
}

PeerHandlerServe

kingbus/raft/peer_handler.go

//PeerHandlerServe serve
func (r *Node) PeerHandlerServe() {
	for _, peer := range r.PeerListener {
		go peer.serve()
	}
}

小结

startRaftPeer方法根据peerURLs通过s.raftNode.NewPeerListener开启listener,之后执行s.raftNode.SetPeerHandler()、s.raftNode.PeerHandlerServe()

doc