聊聊kingbus的binlog_syncer_handler.go

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的binlog_syncer_handler.go

StartBinlogSyncer

kingbus/api/binlog_syncer_handler.go

//StartBinlogSyncer handles the admin request that starts the binlog syncer.
//The handler holds h.l for the whole call. It binds the request into a
//config.SyncerArgs and validates it with args.Check. On a node that is not
//the leader the args are re-encoded as JSON and forwarded to the leader with
//PUT /binlog/syncer/start; on the leader the syncer server is started and the
//args are then proposed via ProposeSyncerArgs.
//On success the reply carries syncerID, which this function never assigns, so
//the payload is always its zero value.
func (h *BinlogSyncerHandler) StartBinlogSyncer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	var (
		args     config.SyncerArgs
		syncerID int
		err      error
	)

	//whatever ends up in err when the handler returns is logged and written
	//back as a 500 JSON body
	defer func() {
		if err == nil {
			return
		}
		log.Log.Errorf("StartBinlogSyncer error,err: %s", err)
		echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	}()

	//decode and validate the request arguments
	if err = echoCtx.Bind(&args); err != nil {
		return err
	}
	if err = args.Check(); err != nil {
		return err
	}

	//forward to leader; failures on this path are kept in their own
	//variables and answered directly, so the deferred reporter stays silent
	if !h.svr.IsLeader() {
		payload, marshalErr := json.Marshal(args)
		if marshalErr != nil {
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(marshalErr.Error()))
		}

		leaderResp, forwardErr := h.sendToLeader("PUT", "/binlog/syncer/start", payload)
		switch {
		case forwardErr != nil:
			log.Log.Errorf("sendToLeader error,err:%s,args:%v", forwardErr, args)
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(forwardErr.Error()))
		case leaderResp.Message != "success":
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
		}
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
	}

	//start syncer server
	if err = h.svr.StartServer(config.SyncerServerType, &args); err != nil {
		log.Log.Errorf("StartServer error,err:%s,args:%v", err, args)
		return err
	}

	//propose start syncer info
	if err = h.ProposeSyncerArgs(&args); err != nil {
		log.Log.Errorf("ProposeSyncerArgs error,err:%s,args:%v", err, args)
		return err
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerID))
}

StopBinlogSyncer

kingbus/api/binlog_syncer_handler.go

//StopBinlogSyncer handles the admin request that stops the binlog syncer.
//The handler holds h.l for the whole call. On the leader it calls StopServer
//for the syncer server type and replies 200 with an empty string as data; on
//any other node it forwards PUT /binlog/syncer/stop to the leader and relays
//the leader's answer.
func (h *BinlogSyncerHandler) StopBinlogSyncer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	//the leader stops the syncer itself
	if h.svr.IsLeader() {
		h.svr.StopServer(config.SyncerServerType)
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
	}

	//forward to leader
	leaderResp, err := h.sendToLeader("PUT", "/binlog/syncer/stop", nil)
	switch {
	case err != nil:
		log.Log.Errorf("sendToLeader error,err:%s", err)
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	case leaderResp.Message != "success":
		//the leader answered but reported a failure; pass its message on
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
	}
	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
}

GetBinlogSyncerStatus

kingbus/api/binlog_syncer_handler.go

//GetBinlogSyncerStatus handles the admin request that reports the runtime
//status of the binlog syncer.
//The handler holds h.l for the whole call. On the leader it reads the status
//with GetServerStatus and replies 200 when the value is a
//*config.SyncerStatus, or 500 with "no resp" when it is not; on any other
//node it forwards GET /binlog/syncer/status to the leader and relays the
//leader's answer.
func (h *BinlogSyncerHandler) GetBinlogSyncerStatus(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	//the leader answers from its own state
	if h.svr.IsLeader() {
		syncerStatus, ok := h.svr.GetServerStatus(config.SyncerServerType).(*config.SyncerStatus)
		if !ok {
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
		}
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerStatus))
	}

	//forward to leader
	leaderResp, err := h.sendToLeader("GET", "/binlog/syncer/status", nil)
	switch {
	case err != nil:
		log.Log.Errorf("sendToLeader error,err:%s", err)
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	case leaderResp.Message != "success":
		//the leader answered but reported a failure; pass its message on
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
	}
	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
}

sendToLeader

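kingbus/api/membership_handler.go

注:下面贴出的是MembershipHandler的sendToLeader,它只有method和req两个参数,转发路径固定为/members;而上文BinlogSyncerHandler调用的sendToLeader还多传了一个路径参数(例如/binlog/syncer/start),其实现未在本文列出,逻辑应与此类似。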

//sendToLeader relays req to the "/members" endpoint of the current leader's
//admin URL using the given HTTP method and returns the decoded reply.
//It returns ErrNoLeader when the leader is not a known cluster member or does
//not have exactly one admin URL, and passes through any error from parsing
//that URL or from utils.SendRequest.
func (h *MembershipHandler) sendToLeader(method string, req []byte) (*utils.Resp, error) {
	leader := h.cluster.Member(h.svr.Leader())
	if leader == nil {
		return nil, ErrNoLeader
	}

	//exactly one admin URL is expected per member
	if n := len(leader.AdminURLs); n != 1 {
		log.Log.Errorf("leader admin url is not 1,leader:%v", *leader)
		return nil, ErrNoLeader
	}

	adminURL, err := url.Parse(leader.AdminURLs[0])
	if err != nil {
		return nil, err
	}

	//only scheme and host of the admin URL are kept; the path is fixed
	endpoint := adminURL.Scheme + "://" + adminURL.Host + "/members"

	leaderResp, err := utils.SendRequest(method, endpoint, req)
	if err != nil {
		log.Log.Errorf("sendToLeader:SendRequest error,err:%s,url:%s", err, endpoint)
		return nil, err
	}
	return leaderResp, nil
}

SendRequest

kingbus/utils/http_utils.go

//SendRequest sends an HTTP request with the given method and body to
//leaderURL, with Content-Type set to JSON, and decodes the JSON response body
//into a Resp.
//The response status code is not inspected; any error from building the
//request, performing it, reading the body or decoding it is returned with a
//nil Resp.
//NOTE(review): the client is created without a timeout, so a leader that never
//answers would presumably block the caller — consider configuring one.
func SendRequest(method string, leaderURL string, data []byte) (*Resp, error) {
	httpReq, err := http.NewRequest(method, leaderURL, bytes.NewBuffer(data))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Content-Type", "application/json;charset=utf-8")

	httpClient := http.Client{}
	httpResp, err := httpClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer httpResp.Body.Close()

	payload, err := ioutil.ReadAll(httpResp.Body)
	if err != nil {
		return nil, err
	}

	var decoded Resp
	if err = json.Unmarshal(payload, &decoded); err != nil {
		return nil, err
	}
	return &decoded, nil
}

小结

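kingbus的binlog_syncer_handler.go提供了StartBinlogSyncer、StopBinlogSyncer、GetBinlogSyncerStatus三个方法。它们都先加锁;若当前节点不是leader,则通过sendToLeader把请求转发给leader并返回其结果,否则在本节点分别调用StartServer(随后执行ProposeSyncerArgs)、StopServer或GetServerStatus来完成相应操作。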

doc