聊聊kingbus的binlog_server_handler.go

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的binlog_server_handler.go

StartBinlogServer

kingbus/api/binlog_server_handler.go

//StartBinlogServer implements start a binlog server
func (h *BinlogServerHandler) StartBinlogServer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	var args config.BinlogServerConfig
	var err error

	defer func() {
		if err != nil {
			log.Log.Errorf("StartBinlogServer error,err: %s", err)
			echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
		}
	}()

	err = echoCtx.Bind(&args)
	if err != nil {
		return err
	}
	kingbusIP := h.svr.GetIP()
	//check args
	err = args.Check(kingbusIP)
	if err != nil {
		return err
	}
	//start syncer server
	err = h.svr.StartServer(config.BinlogServerType, &args)
	if err != nil {
		log.Log.Errorf("start server error,err:%s,args:%v", err, args)
		return err
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}

StartServer

kingbus/server/server.go

//StartServer start sub servers:syncer server or binlog master server
func (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error {
	var err error
	switch svrType {
	case config.SyncerServerType:
		if s.IsSyncerStarted() {
			return ErrStarted
		}
		syncerArgs, ok := args.(*config.SyncerArgs)
		if !ok {
			log.Log.Errorf("StartServer args is illegal,args:%v", args)
			return ErrArgs
		}
		err = s.startSyncerServer(syncerArgs)
		if err != nil {
			log.Log.Errorf("startSyncerServer error,err:%s,args:%v", err, *syncerArgs)
			return ErrArgs
		}
		//start to propose binlog event to raft cluster
		s.StartProposeBinlog(s.syncer.ctx)
		log.Log.Debugf("start syncer,and propose!!!")
		return nil
	case config.BinlogServerType:
		if s.IsBinlogServerStarted() {
			return ErrStarted
		}
		masterArgs, ok := args.(*config.BinlogServerConfig)
		if !ok {
			log.Log.Errorf("StartServer args is illegal,args:%v", args)
			return ErrArgs
		}
		err = s.startMasterServer(masterArgs)
		if err != nil {
			log.Log.Errorf("startMasterServer error,err:%s,args:%v", err, *masterArgs)
			return ErrArgs
		}
		return nil
	default:
		log.Log.Fatalf("StartServer:server type not support,serverType:%v", svrType)
	}
	return nil
}

StopBinlogServer

kingbus/api/binlog_server_handler.go

//StopBinlogServer implements stop binlog server
func (h *BinlogServerHandler) StopBinlogServer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()
	h.svr.StopServer(config.BinlogServerType)
	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}

StopServer

kingbus/server/server.go

//StopServer stop sub server
func (s *KingbusServer) StopServer(svrType config.SubServerType) {
	switch svrType {
	case config.SyncerServerType:
		if s.IsSyncerStarted() {
			s.syncer.Stop()
		}
	case config.BinlogServerType:
		if s.IsBinlogServerStarted() {
			s.master.Stop()
		}
	default:
		log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)
	}
}

GetBinlogServerStatus

kingbus/api/binlog_server_handler.go

//GetBinlogServerStatus implements get binlog server status in the runtime state
func (h *BinlogServerHandler) GetBinlogServerStatus(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	status := h.svr.GetServerStatus(config.BinlogServerType)
	if masterStatus, ok := status.(*config.BinlogServerStatus); ok {
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(masterStatus))
	}
	return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
}

GetServerStatus

//GetServerStatus get the sub server status
func (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{} {
	switch svrType {
	case config.SyncerServerType:
		var syncerStatus config.SyncerStatus
		if s.IsSyncerStarted() {
			cfg := s.syncer.cfg
			syncerStatus.MysqlAddr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
			syncerStatus.MysqlUser = cfg.User
			syncerStatus.MysqlPassword = cfg.Password
			syncerStatus.SemiSync = cfg.SemiSyncEnabled

			syncerStatus.Status = config.ServerRunningStatus
			syncerStatus.CurrentGtid = s.CurrentGtidStr()
			syncerStatus.LastBinlogFile = s.LastBinlogFile()
			syncerStatus.LastFilePosition = s.LastFilePosition()
			syncerStatus.ExecutedGtidSet = s.ExecutedGtidSetStr()

			purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
			if err != nil {
				log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
			}
			syncerStatus.PurgedGtidSet = purgedGtids.String()
		} else {
			syncerStatus.Status = config.ServerStoppedStatus
		}
		return &syncerStatus
	case config.BinlogServerType:
		var status config.BinlogServerStatus
		if s.IsBinlogServerStarted() {
			cfg := s.master.cfg
			status.Addr = cfg.Addr
			status.User = cfg.User
			status.Password = cfg.Password

			status.Slaves = make([]*mysql.Slave, 0, 2)
			slaves := s.master.GetSlaves()
			for _, s := range slaves {
				status.Slaves = append(status.Slaves, s)
			}
			status.CurrentGtid = s.CurrentGtidStr()
			status.LastBinlogFile = s.LastBinlogFile()
			status.LastFilePosition = s.LastFilePosition()
			status.ExecutedGtidSet = s.ExecutedGtidSetStr()

			purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
			if err != nil {
				log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
			}
			status.PurgedGtidSet = purgedGtids.String()
			status.Status = config.ServerRunningStatus
		} else {
			status.Status = config.ServerStoppedStatus
		}
		return &status
	default:
		log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)
	}
	return nil
}

小结

kingbus的binlog_server_handler提供了StartBinlogServer、StopBinlogServer、GetBinlogServerStatus,他们均委托给了server.go的对应方法

doc