聊聊rocketmq-client-go的localFileOffsetStore

/ mq / 没有评论 / 20浏览

本文主要研究一下rocketmq-client-go的localFileOffsetStore

OffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

type OffsetStore interface {
	persist(mqs []*primitive.MessageQueue)
	remove(mq *primitive.MessageQueue)
	read(mq *primitive.MessageQueue, t readType) int64
	update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
}

localFileOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

type localFileOffsetStore struct {
	group       string
	path        string
	OffsetTable map[MessageQueueKey]int64
	// mutex for offset file
	mutex sync.Mutex
}

NewLocalFileOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
	store := &localFileOffsetStore{
		group:       group,
		path:        filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"),
		OffsetTable: make(map[MessageQueueKey]int64),
	}
	store.load()
	return store
}

load

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) load() {
	local.mutex.Lock()
	defer local.mutex.Unlock()
	data, err := utils.FileReadAll(local.path)
	if os.IsNotExist(err) {
		return
	}
	if err != nil {
		rlog.Info("read from local store error, try to use bak file", map[string]interface{}{
			rlog.LogKeyUnderlayError: err,
		})
		data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
	}
	if err != nil {
		rlog.Info("read from local store bak file error", map[string]interface{}{
			rlog.LogKeyUnderlayError: err,
		})
		return
	}
	datas := make(map[MessageQueueKey]int64)

	wrapper := OffsetSerializeWrapper{
		OffsetTable: datas,
	}

	err = jsoniter.Unmarshal(data, &wrapper)
	if err != nil {
		rlog.Warning("unmarshal local offset error", map[string]interface{}{
			"local_path":             local.path,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return
	}

	if datas != nil {
		local.OffsetTable = datas
	}
}

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
	switch t {
	case _ReadFromMemory, _ReadMemoryThenStore:
		off := readFromMemory(local.OffsetTable, mq)
		if off >= 0 || (off == -1 && t == _ReadFromMemory) {
			return off
		}
		fallthrough
	case _ReadFromStore:
		local.load()
		return readFromMemory(local.OffsetTable, mq)
	default:

	}
	return -1
}

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
	local.mutex.Lock()
	defer local.mutex.Unlock()
	rlog.Debug("update offset", map[string]interface{}{
		rlog.LogKeyMessageQueue: mq,
		"new_offset":            offset,
	})
	key := MessageQueueKey(*mq)
	localOffset, exist := local.OffsetTable[key]
	if !exist {
		local.OffsetTable[key] = offset
		return
	}
	if increaseOnly {
		if localOffset < offset {
			local.OffsetTable[key] = offset
		}
	} else {
		local.OffsetTable[key] = offset
	}
}

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
	if len(mqs) == 0 {
		return
	}
	local.mutex.Lock()
	defer local.mutex.Unlock()

	wrapper := OffsetSerializeWrapper{
		OffsetTable: local.OffsetTable,
	}

	data, _ := jsoniter.Marshal(wrapper)
	utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
}

小结

OffsetStore定义了persist、remove、read、update方法;localFileOffsetStore定义了group、path、OffsetTable、mutex属性

doc