聊聊kingbus的startAdminServer

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的startAdminServer

startAdminServer

kingbus/server/server.go

func (s *KingbusServer) startAdminServer(urls types.URLs) error {
	if len(urls) != 1 {
		return ErrArgs
	}
	addr := urls[0].Host
	s.adminSvr = api.NewAdminServer(addr, s, s.cluster)
	return nil
}

NewAdminServer

kingbus/api/api_server.go

//AdminServer is a server for handling api call
type AdminServer struct {
	AdminAddr string
	web       *echo.Echo
	mh        *MembershipHandler
	bs        *BinlogSyncerHandler
	bm        *BinlogServerHandler
}

//NewAdminServer creates a admin server
func NewAdminServer(addr string, svr Server, cluster Cluster) *AdminServer {
	adminServer := new(AdminServer)

	adminServer.AdminAddr = addr
	adminServer.web = echo.New()
	adminServer.web.HideBanner = true
	adminServer.web.HidePort = true

	adminServer.mh = &MembershipHandler{
		svr:     svr,
		cluster: cluster,
		timeout: adminAPITimeout,
	}
	adminServer.bs = &BinlogSyncerHandler{
		svr:     svr,
		cluster: cluster,
	}
	adminServer.bm = &BinlogServerHandler{
		svr: svr,
	}
	return adminServer
}

Run

kingbus/api/api_server.go

func (s *AdminServer) Run() {
	s.RegisterMiddleware()
	s.RegisterURL()
	err := s.web.Start(s.AdminAddr)
	if err != nil {
		log.Log.Infof("admin server start error,err:%s", err)
	}
}

RegisterMiddleware

kingbus/api/api_server.go

//RegisterMiddleware implements register middleware in web
func (s *AdminServer) RegisterMiddleware() {
	loggerConfig := mw.LoggerConfig{
		Skipper: mw.DefaultSkipper,
		Format: `{"time":"${time_rfc3339_nano}","id":"${id}","remote_ip":"${remote_ip}","host":"${host}",` +
			`"method":"${method}","uri":"${uri}","status":${status}, "latency":${latency},` +
			`"latency_human":"${latency_human}","bytes_in":${bytes_in},` +
			`"bytes_out":${bytes_out}}` + "\n",
		CustomTimeFormat: "2006-01-02 15:04:05.00000",
		Output:           log.NewWriter(),
	}
	s.web.Use(mw.LoggerWithConfig(loggerConfig))
	s.web.Use(mw.Recover())
}

RegisterURL

kingbus/api/api_server.go

//RegisterURL implements url binding
func (s *AdminServer) RegisterURL() {
	//member handler
	s.web.GET("/members", s.mh.GetMembers)
	s.web.POST("/members", s.mh.AddMember)
	s.web.PUT("/members", s.mh.UpdateMember)
	s.web.DELETE("/members", s.mh.DeleteMember)
	s.web.GET("/cluster", s.mh.GetCluster)
	s.web.PUT("/admin/url", s.mh.UpdateAdminURL)

	//binlog syncer handler
	s.web.PUT("/binlog/syncer/start", s.bs.StartBinlogSyncer)
	s.web.PUT("/binlog/syncer/stop", s.bs.StopBinlogSyncer)
	s.web.GET("/binlog/syncer/status", s.bs.GetBinlogSyncerStatus)

	//binlog server handler
	s.web.PUT("/binlog/server/start", s.bm.StartBinlogServer)
	s.web.PUT("/binlog/server/stop", s.bm.StopBinlogServer)
	s.web.GET("/binlog/server/status", s.bm.GetBinlogServerStatus)
}

小结

startAdminServer方法主要是执行api.NewAdminServer;NewAdminServer创建adminServer,并设置了MembershipHandler、BinlogSyncerHandler、BinlogServerHandler

doc