聊聊rocketmq-client-go的strategy

/ golangmq / 没有评论 / 20浏览

本文主要研究一下rocketmq-client-go的strategy

AllocateStrategy

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateStrategy is the signature shared by every queue-allocation function.
// It receives, in order, the consumer group name, the ID of the current
// client, every candidate message queue and every client ID, and returns the
// queues assigned to the current client.
type AllocateStrategy func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue

AllocateByAveragely

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByAveragely hands the current client one contiguous run of mqAll.
//
// The queues are divided as evenly as possible between the clients in cidAll:
// when the division leaves a remainder, the clients at the lowest positions in
// cidAll each receive one extra queue. When there are no more queues than
// clients, each client is given at most a single queue.
//
// It returns nil when currentCID is empty, when mqAll or cidAll is empty, or
// when currentCID does not appear in cidAll (a warning is logged in that
// case). Otherwise it returns a non-nil, possibly empty, slice.
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}

	// The position of the current client inside cidAll decides which run of
	// queues it owns; -1 means "not found".
	pos := -1
	for i, cid := range cidAll {
		if cid == currentCID {
			pos = i
			break
		}
	}
	if pos < 0 {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	queueCnt, clientCnt := len(mqAll), len(cidAll)
	remainder := queueCnt % clientCnt
	// Clients positioned before the remainder absorb one leftover queue each.
	getsExtra := remainder > 0 && pos < remainder

	share := 1
	if queueCnt > clientCnt {
		share = queueCnt / clientCnt
		if getsExtra {
			share++
		}
	}

	// Clients without an extra queue start after all the leftover queues that
	// were absorbed by the clients in front of them.
	first := pos * share
	if !getsExtra {
		first += remainder
	}

	// The count can be zero or negative when there are fewer queues than
	// clients, in which case nothing is assigned.
	count := utils.MinInt(share, queueCnt-first)
	assigned := make([]*primitive.MessageQueue, 0)
	for offset := 0; offset < count; offset++ {
		assigned = append(assigned, mqAll[(first+offset)%queueCnt])
	}
	return assigned
}

AllocateByAveragelyCircle

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByAveragelyCircle deals the queues of mqAll out to the clients in
// round-robin order: the client at position p in cidAll receives the queues at
// indices p, p+len(cidAll), p+2*len(cidAll), and so on.
//
// It returns nil when currentCID is empty, when mqAll or cidAll is empty, or
// when currentCID does not appear in cidAll (a warning is logged in that
// case). Otherwise it returns a non-nil, possibly empty, slice.
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}

	// Find the current client's position; -1 means "not found".
	pos := -1
	for i, cid := range cidAll {
		if cid == currentCID {
			pos = i
			break
		}
	}
	if pos < 0 {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	// Stepping by the number of clients visits exactly the indices whose
	// remainder modulo len(cidAll) equals pos.
	stride := len(cidAll)
	picked := make([]*primitive.MessageQueue, 0)
	for i := pos; i < len(mqAll); i += stride {
		picked = append(picked, mqAll[i])
	}
	return picked
}

AllocateByConfig

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByConfig builds a strategy that ignores all of its arguments and
// always returns the caller-supplied list, unchanged and uncopied.
func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy {
	fixed := func(_, _ string, _ []*primitive.MessageQueue, _ []string) []*primitive.MessageQueue {
		return list
	}
	return fixed
}

AllocateByMachineRoom

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByMachineRoom builds a strategy that only hands out queues hosted in
// the given machine rooms (IDCs).
//
// A queue is eligible when its BrokerName has the form "<idc>@<name>" (exactly
// one "@") and <idc> is one of consumeridcs. The eligible queues are then
// divided between the clients in cidAll: every client receives an equal
// contiguous run, and the leftover queues go one each to the clients at the
// lowest positions.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID does not appear in cidAll (a warning is
// logged in that case). Otherwise it yields a non-nil, possibly empty, slice.
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		var (
			find  bool
			index int
		)
		for idx := range cidAll {
			if cidAll[idx] == currentCID {
				find = true
				index = idx
				break
			}
		}
		if !find {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		// Keep only the queues whose broker lives in one of the configured IDCs.
		var premqAll []*primitive.MessageQueue
		for _, mq := range mqAll {
			temp := strings.Split(mq.BrokerName, "@")
			if len(temp) != 2 {
				continue
			}
			for _, idc := range consumeridcs {
				if idc == temp[0] {
					premqAll = append(premqAll, mq)
					// Stop at the first match so that a duplicated entry in
					// consumeridcs cannot add the same queue twice.
					break
				}
			}
		}

		mod := len(premqAll) / len(cidAll)
		rem := len(premqAll) % len(cidAll)
		startIndex := mod * index
		endIndex := startIndex + mod

		// startIndex and endIndex are offsets into the filtered list, so the
		// queues must be taken from premqAll. Reading them from mqAll would
		// hand out queues that belong to other machine rooms.
		result := make([]*primitive.MessageQueue, 0, mod+1)
		result = append(result, premqAll[startIndex:endIndex]...)

		// The leftover queues sit after the evenly divided part; the first rem
		// clients take one each.
		if rem > index {
			result = append(result, premqAll[index+mod*len(cidAll)])
		}
		return result
	}
}

AllocateByConsistentHash

rocketmq-client-go-v2.0.0/consumer/strategy.go

// AllocateByConsistentHash builds a strategy that assigns each queue to the
// client chosen for it by a consistent.Consistent instance populated with
// every ID in cidAll. virtualNodeCnt is stored in that instance's
// NumberOfReplicas field before the IDs are added.
//
// The returned strategy yields nil when currentCID is empty, when mqAll or
// cidAll is empty, or when currentCID does not appear in cidAll (a warning is
// logged in that case). Otherwise it yields a non-nil, possibly empty, slice
// holding the queues whose lookup result equals currentCID.
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {
	return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
		if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
			return nil
		}

		// The current client must be a known member of the group.
		known := false
		for _, cid := range cidAll {
			if cid == currentCID {
				known = true
				break
			}
		}
		if !known {
			rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
				rlog.LogKeyConsumerGroup: consumerGroup,
				"consumerId":             currentCID,
				"cidAll":                 cidAll,
			})
			return nil
		}

		ring := consistent.New()
		ring.NumberOfReplicas = virtualNodeCnt
		for i := range cidAll {
			ring.Add(cidAll[i])
		}

		owned := make([]*primitive.MessageQueue, 0)
		for i := range mqAll {
			queue := mqAll[i]
			owner, err := ring.Get(queue.String())
			if err != nil {
				// A failed lookup is only logged; the comparison below still
				// runs with whatever owner value Get returned.
				rlog.Warning("[BUG] AllocateByConsistentHash err: %s", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
				})
			}
			if owner == currentCID {
				owned = append(owned, queue)
			}
		}
		return owned
	}
}

小结

AllocateStrategy定义了一个func类型;strategy.go提供了AllocateByAveragely、AllocateByAveragelyCircle这两个可直接使用的分配策略,以及AllocateByConfig、AllocateByMachineRoom、AllocateByConsistentHash这三个返回AllocateStrategy的工厂方法

doc