聊聊kingbus的membership_handler.go

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的membership_handler.go

GetMembers

kingbus/api/membership_handler.go

// GetMembers replies with HTTP 200 and whatever h.cluster.Members() returns,
// placed in the data field of a utils.NewResp() response. It does not mark
// which member is the leader.
func (h *MembershipHandler) GetMembers(echoCtx echo.Context) error {
	list := h.cluster.Members()
	body := utils.NewResp().SetData(list)
	return echoCtx.JSON(http.StatusOK, body)
}

AddMember

kingbus/api/membership_handler.go

// AddMember adds a member to the raft cluster.
//
// The request body is bound to a name, a peer URL and an admin URL. When this
// node is not the leader, the same arguments are re-encoded as JSON and handed
// to h.sendToLeader with method "POST"; the Data of that response is relayed
// with HTTP 200. On the leader the member is built with membership.NewMember
// and submitted through h.svr.AddMember under a timeout of h.timeout.
//
// Status codes written: 409 for membership.ErrIDExists or
// membership.ErrPeerURLexists, 500 for every other failure (including a body
// that cannot be bound or URLs that types.NewURLs rejects), 200 with the value
// returned by h.svr.AddMember on success.
func (h *MembershipHandler) AddMember(echoCtx echo.Context) error {
	// fail writes err's text as the error of a utils.NewResp() response.
	fail := func(status int, err error) error {
		return echoCtx.JSON(status, utils.NewResp().SetError(err.Error()))
	}

	var args struct {
		NodeName string `json:"name"`
		PeerURL  string `json:"peer_url"`
		AdminURL string `json:"admin_url"`
	}
	if err := echoCtx.Bind(&args); err != nil {
		return fail(http.StatusInternalServerError, err)
	}

	if !h.svr.IsLeader() {
		payload, err := json.Marshal(args)
		if err != nil {
			return fail(http.StatusInternalServerError, err)
		}
		leaderResp, err := h.sendToLeader("POST", payload)
		if err != nil {
			return fail(http.StatusInternalServerError, err)
		}
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
	}

	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
	defer cancel()

	peerURLs, err := types.NewURLs([]string{args.PeerURL})
	if err != nil {
		return fail(http.StatusInternalServerError, err)
	}
	adminURLs, err := types.NewURLs([]string{args.AdminURL})
	if err != nil {
		return fail(http.StatusInternalServerError, err)
	}

	now := time.Now()
	member := membership.NewMember(args.NodeName, peerURLs, adminURLs, &now)

	members, err := h.svr.AddMember(ctx, *member)
	if err == membership.ErrIDExists || err == membership.ErrPeerURLexists {
		return fail(http.StatusConflict, err)
	}
	if err != nil {
		log.Log.Errorf("error adding member %s (%v)", member.ID, err)
		return fail(http.StatusInternalServerError, err)
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))
}

UpdateMember

kingbus/api/membership_handler.go

// UpdateMember replaces the peer URL of the member named in the request body.
//
// The body is bound to a name and a new peer URL. When this node is not the
// leader, the arguments are re-encoded as JSON and handed to h.sendToLeader
// with method "PUT"; the Data of that response is relayed with HTTP 200. On
// the leader the member is looked up with h.cluster.MemberByName and the
// change is submitted through h.svr.UpdateMember under a timeout of h.timeout.
//
// Status codes written: 404 when the name is unknown or UpdateMember reports
// membership.ErrIDNotFound, 409 for membership.ErrPeerURLexists, 500 for any
// other failure, 200 with the value returned by h.svr.UpdateMember on success.
// The return value is always the result of writing the response.
func (h *MembershipHandler) UpdateMember(echoCtx echo.Context) error {
	args := struct {
		NodeName   string `json:"name"`
		NewPeerURL string `json:"new_peer_url"`
	}{}
	err := echoCtx.Bind(&args)
	if err != nil {
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	}

	if !h.svr.IsLeader() {
		req, err := json.Marshal(args)
		if err != nil {
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
		}
		resp, err := h.sendToLeader("PUT", req)
		if err != nil {
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
		}
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
	}

	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
	defer cancel()

	m := h.cluster.MemberByName(args.NodeName)
	if m == nil {
		err = fmt.Errorf("no such member: %s", args.NodeName)
		return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(err.Error()))
	}

	// Only the ID and the new peer URL are populated on the value passed to
	// UpdateMember; every other field is left at its zero value.
	newMember := membership.Member{
		ID:             m.ID,
		RaftAttributes: membership.RaftAttributes{PeerURLs: []string{args.NewPeerURL}},
	}
	members, err := h.svr.UpdateMember(ctx, newMember)
	switch {
	case err == membership.ErrPeerURLexists:
		return echoCtx.JSON(http.StatusConflict, utils.NewResp().SetError(err.Error()))
	case err == membership.ErrIDNotFound:
		err = fmt.Errorf("no such member: %s", args.NodeName)
		return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(err.Error()))
	case err != nil:
		log.Log.Errorf("error updating member %s (%v)", m.ID, err)
		// Propagate the result of writing the response; previously it was
		// dropped and the handler returned nil on this path.
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	}
	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))
}

DeleteMember

kingbus/api/membership_handler.go

// DeleteMember removes the member named in the request body from the raft
// cluster.
//
// The body is bound to a name and a peer URL. When this node is not the
// leader, the arguments are re-encoded as JSON and handed to h.sendToLeader
// with method "DELETE"; the Data of that response is relayed with HTTP 200.
// On the leader the member is looked up with h.cluster.MemberByName and
// removed through h.svr.RemoveMember under a timeout of h.timeout.
//
// Status codes written: 404 when the name is unknown or RemoveMember reports
// membership.ErrIDNotFound, 410 for membership.ErrIDRemoved, 500 for any other
// failure (including a body that cannot be bound), 200 with the value returned
// by h.svr.RemoveMember on success.
func (h *MembershipHandler) DeleteMember(echoCtx echo.Context) error {
	args := struct {
		NodeName string `json:"name"`
		PeerURL  string `json:"peer_url"`
	}{}
	err := echoCtx.Bind(&args)
	if err != nil {
		// Same status and utils.NewResp() error body as the bind-failure path
		// of AddMember and UpdateMember, instead of a bare string with 403.
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	}

	if !h.svr.IsLeader() {
		req, err := json.Marshal(args)
		if err != nil {
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
		}
		resp, err := h.sendToLeader("DELETE", req)
		if err != nil {
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
		}
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
	}

	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
	defer cancel()

	m := h.cluster.MemberByName(args.NodeName)
	if m == nil {
		msg := fmt.Sprintf("No such member: %s", args.NodeName)
		return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(msg))
	}

	log.Log.Debugf("DeleteMember:remove member id is %s", m.ID.String())

	members, err := h.svr.RemoveMember(ctx, uint64(m.ID))
	switch {
	case err == membership.ErrIDRemoved:
		msg := fmt.Sprintf("Member permanently removed: %s", args.NodeName)
		return echoCtx.JSON(http.StatusGone, utils.NewResp().SetError(msg))
	case err == membership.ErrIDNotFound:
		msg := fmt.Sprintf("No such member: %s", args.NodeName)
		return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(msg))
	case err != nil:
		log.Log.Errorf("error removing member %s (%v)", args.NodeName, err)
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	}
	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))
}

GetCluster

kingbus/api/membership_handler.go

// GetCluster replies with HTTP 200 and one Role per entry of
// h.cluster.Members(). Each Role carries the member ID formatted as lowercase
// hexadecimal, the member's RaftAttributes and Attributes, and an IsLeader
// flag that is true when the member ID equals the value of h.svr.Leader().
func (h *MembershipHandler) GetCluster(echoCtx echo.Context) error {
	members := h.cluster.Members()

	// Read the leader once so every member is compared against the same
	// value, rather than calling h.svr.Leader() again on each iteration.
	leaderID := uint64(h.svr.Leader())

	roles := make([]*Role, 0, len(members))
	for _, m := range members {
		r := new(Role)
		r.ID = fmt.Sprintf("%x", uint64(m.ID))
		r.RaftAttributes = m.RaftAttributes
		r.Attributes = m.Attributes
		r.IsLeader = uint64(m.ID) == leaderID
		roles = append(roles, r)
	}
	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(roles))
}

UpdateAdminURL

kingbus/api/membership_handler.go

// UpdateAdminURL binds the request body to a config.Attributes value, encodes
// it with EncodeWithType and submits the bytes through h.svr.Propose.
//
// Unlike the member add/update/delete handlers, a non-leader node does not
// forward the request: it replies 500 with ErrNotLeader. Bind, encode and
// propose failures also reply 500 with the error text; success replies 200
// with an empty string as data.
func (h *MembershipHandler) UpdateAdminURL(echoCtx echo.Context) error {
	// fail writes msg as the error of a utils.NewResp() response with status 500.
	fail := func(msg string) error {
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(msg))
	}

	if !h.svr.IsLeader() {
		return fail(ErrNotLeader.Error())
	}

	var attrs config.Attributes
	if err := echoCtx.Bind(&attrs); err != nil {
		return fail(err.Error())
	}

	encoded, err := attrs.EncodeWithType()
	if err != nil {
		log.Log.Errorf("attributes EncodeWithType error,err:%v,attributes:%v", err, attrs)
		return fail(err.Error())
	}

	if err = h.svr.Propose(encoded); err != nil {
		log.Log.Errorf("Propose attributes error,err:%s,attributes:%v", err, attrs)
		return fail(err.Error())
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}

小结

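membership_handler.go提供了GetMembers、AddMember、UpdateMember、DeleteMember、GetCluster、UpdateAdminURL方法;其中AddMember、UpdateMember、DeleteMember在当前节点不是leader时会通过sendToLeader将请求转发给leader,而UpdateAdminURL在非leader节点上直接返回ErrNotLeader错误。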

doc