聊聊rocketmq-client-go的remoteBrokerOffsetStore

/ golangmq / 没有评论 / 110浏览

本文主要研究一下rocketmq-client-go的remoteBrokerOffsetStore

remoteBrokerOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// remoteBrokerOffsetStore keeps consume offsets in an in-memory table and
// exchanges them with brokers through its client and namesrv dependencies.
type remoteBrokerOffsetStore struct {
	// group is the consumer group name passed along when offsets are fetched
	// from or pushed to a broker.
	group       string
	// OffsetTable maps a message queue to its locally recorded offset.
	// The methods of this type access it while holding mutex.
	OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`
	// client issues the remoting calls (InvokeSync / InvokeOneWay) to a broker.
	client      internal.RMQClient
	// namesrv resolves a broker name to an address and refreshes topic routes.
	namesrv     internal.Namesrvs
	// mutex guards OffsetTable.
	mutex       sync.RWMutex
}

NewRemoteOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// NewRemoteOffsetStore returns an OffsetStore for the given consumer group
// that talks to brokers through client and resolves them through namesrv.
// The returned store starts with an empty, non-nil offset table.
func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {
	store := new(remoteBrokerOffsetStore)
	store.group = group
	store.client = client
	store.namesrv = namesrv
	store.OffsetTable = map[primitive.MessageQueue]int64{}
	return store
}

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// persist pushes the in-memory offset of every queue listed in mqs to its
// broker and drops all other queues from the offset table.
//
// An empty mqs is a no-op: nothing is sent and nothing is dropped. A failed
// push is logged as a warning and does not stop the remaining queues from
// being processed. The write lock is held for the whole pass, including the
// calls to updateConsumeOffsetToBroker.
func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
	// Nothing to do for an empty list, so do not contend for the lock at all.
	if len(mqs) == 0 {
		return
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Set of queues that are still in use; its final size is known up front.
	used := make(map[primitive.MessageQueue]struct{}, len(mqs))
	for _, mq := range mqs {
		used[*mq] = struct{}{}
	}

	for mq, off := range r.OffsetTable {
		if _, ok := used[mq]; !ok {
			// Removing the current key while ranging over a map is permitted in Go.
			delete(r.OffsetTable, mq)
			continue
		}

		if err := r.updateConsumeOffsetToBroker(r.group, mq, off); err != nil {
			rlog.Warning("update offset to broker error", map[string]interface{}{
				rlog.LogKeyConsumerGroup: r.group,
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err.Error(),
				"offset":                 off,
			})
			continue
		}

		rlog.Info("update offset to broker success", map[string]interface{}{
			rlog.LogKeyConsumerGroup: r.group,
			rlog.LogKeyMessageQueue:  mq.String(),
			"offset":                 off,
		})
	}
}

remove

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// remove drops mq from the in-memory offset table and logs the removal.
// Both steps run while the write lock is held.
func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	key := *mq
	delete(r.OffsetTable, key)

	fields := map[string]interface{}{
		rlog.LogKeyMessageQueue: mq,
	}
	rlog.Info("delete mq from offset table", fields)
}

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// read returns the consume offset of mq according to the read type t.
//
//   - _ReadFromMemory: the offset from the in-memory table, or -1 if absent.
//   - _ReadMemoryThenStore: the in-memory offset if present, otherwise the
//     offset fetched from the broker.
//   - _ReadFromStore: the offset fetched from the broker.
//
// An offset fetched from the broker is merged into the in-memory table via
// update with increaseOnly set, and then returned. If the fetch fails the
// error is logged and -1 is returned. Any other read type yields -1.
//
// The read lock is held only for the map lookup. It is never held across the
// broker call, and no return path can leave it locked (previously an
// unrecognised read type returned with the read lock still held).
func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
	switch t {
	case _ReadFromMemory, _ReadMemoryThenStore:
		r.mutex.RLock()
		off, exist := r.OffsetTable[*mq]
		r.mutex.RUnlock()
		if exist {
			return off
		}
		if t == _ReadFromMemory {
			return -1
		}
		// Not cached and the caller allows a broker lookup.
		fallthrough
	case _ReadFromStore:
		off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
		if err != nil {
			rlog.Error("fecth offset of mq error", map[string]interface{}{
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err,
			})
			return -1
		}
		// update takes the write lock itself, so no lock may be held here.
		r.update(mq, off, true)
		return off
	}

	return -1
}

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// update records offset for mq in the in-memory table.
//
// A queue that has no entry yet always gets offset. For an existing entry,
// increaseOnly restricts the write to the case where offset is strictly
// greater than the stored value; without increaseOnly the stored value is
// overwritten unconditionally.
func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	key := *mq
	current, known := r.OffsetTable[key]

	// The only case in which the table is left untouched.
	if known && increaseOnly && current >= offset {
		return
	}
	r.OffsetTable[key] = offset
}

fetchConsumeOffsetFromBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// fetchConsumeOffsetFromBroker asks the broker that owns mq for the consume
// offset recorded for group.
//
// The broker address is looked up by mq.BrokerName; if it is unknown, the
// route info of mq.Topic is refreshed once and the lookup is retried. The
// query is sent synchronously with a 3-second duration argument, and the
// "offset" entry of the response's ExtFields is parsed as a base-10 int64.
//
// On failure a non-nil error is returned together with a negative value:
// -2 when the broker answers with a non-success code, -1 in all other cases.
func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
	addr := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
	if addr == "" {
		// Address not cached: refresh the topic route and look it up again.
		r.namesrv.UpdateTopicRouteInfo(mq.Topic)
		if addr = r.namesrv.FindBrokerAddrByName(mq.BrokerName); addr == "" {
			return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
		}
	}

	header := &internal.QueryConsumerOffsetRequestHeader{
		ConsumerGroup: group,
		Topic:         mq.Topic,
		QueueId:       mq.QueueId,
	}
	request := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, header, nil)

	response, err := r.client.InvokeSync(context.Background(), addr, request, 3*time.Second)
	switch {
	case err != nil:
		return -1, err
	case response.Code != internal.ResSuccess:
		return -2, fmt.Errorf("broker response code: %d, remarks: %s", response.Code, response.Remark)
	}

	parsed, err := strconv.ParseInt(response.ExtFields["offset"], 10, 64)
	if err != nil {
		return -1, err
	}
	return parsed, nil
}

updateConsumeOffsetToBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// updateConsumeOffsetToBroker sends off as the commit offset of mq for group
// to the broker that owns the queue.
//
// The broker address is looked up by mq.BrokerName; if it is unknown, the
// route info of mq.Topic is refreshed once and the lookup is retried. The
// request is sent through InvokeOneWay with a 5-second duration argument, and
// whatever error that call returns is handed back to the caller. An error is
// also returned when no broker address can be resolved.
func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error {
	addr := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
	if addr == "" {
		// Address not cached: refresh the topic route and look it up again.
		r.namesrv.UpdateTopicRouteInfo(mq.Topic)
		if addr = r.namesrv.FindBrokerAddrByName(mq.BrokerName); addr == "" {
			return fmt.Errorf("broker: %s address not found", mq.BrokerName)
		}
	}

	header := &internal.UpdateConsumerOffsetRequestHeader{
		ConsumerGroup: group,
		Topic:         mq.Topic,
		QueueId:       mq.QueueId,
		CommitOffset:  off,
	}
	request := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, header, nil)

	return r.client.InvokeOneWay(context.Background(), addr, request, 5*time.Second)
}

小结

remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性;它通过构造函数NewRemoteOffsetStore创建,并提供了persist、remove、read、update、fetchConsumeOffsetFromBroker、updateConsumeOffsetToBroker方法。

doc