聊聊rocketmq-client-go的PullConsumer

/ golangmq / 没有评论 / 20浏览

本文主要研究一下rocketmq-client-go的PullConsumer

PullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

type PullConsumer interface {
	// Start
	Start()

	// Shutdown refuse all new pull operation, finish all submitted.
	Shutdown()

	// Pull pull message of topic,  selector indicate which queue to pull.
	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

	// PullFrom pull messages of queue from the offset to offset + numbers
	PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

	// updateOffset update offset of queue in mem
	UpdateOffset(queue *primitive.MessageQueue, offset int64) error

	// PersistOffset persist all offset in mem.
	PersistOffset(ctx context.Context) error

	// CurrentOffset return the current offset of queue in mem.
	CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}

defaultPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

type defaultPullConsumer struct {
	*defaultConsumer

	option    consumerOptions
	client    internal.RMQClient
	GroupName string
	Model     MessageModel
	UnitMode  bool

	interceptor primitive.Interceptor
}

NewPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
	defaultOpts := defaultPullConsumerOptions()
	for _, apply := range options {
		apply(&defaultOpts)
	}

	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}

	dc := &defaultConsumer{
		client:        internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup: defaultOpts.GroupName,
		cType:         _PullConsume,
		state:         int32(internal.StateCreateJust),
		prCh:          make(chan PullRequest, 4),
		model:         defaultOpts.ConsumerModel,
		option:        defaultOpts,

		namesrv: srvs,
	}

	c := &defaultPullConsumer{
		defaultConsumer: dc,
	}
	return c, nil
}

Start

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) Start() error {
	atomic.StoreInt32(&c.state, int32(internal.StateRunning))

	var err error
	c.once.Do(func() {
		err = c.start()
		if err != nil {
			return
		}
	})

	return err
}

Pull

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) {
	mq := c.getNextQueueOf(topic)
	if mq == nil {
		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
	}

	data := buildSubscriptionData(mq.Topic, selector)
	result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)

	if err != nil {
		return nil, err
	}

	c.processPullResult(mq, result, data)
	return result, nil
}

getNextQueueOf

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
	queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
	if err != nil && len(queues) > 0 {
		rlog.Error("get next mq error", map[string]interface{}{
			rlog.LogKeyTopic:         topic,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return nil
	}
	var index int64
	v, exist := queueCounterTable.Load(topic)
	if !exist {
		index = -1
		queueCounterTable.Store(topic, 0)
	} else {
		index = v.(int64)
	}

	return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
}

PullFrom

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PullFrom pull messages of queue from the offset to offset + numbers
func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
	if err := c.checkPull(ctx, queue, offset, numbers); err != nil {
		return nil, err
	}

	selector := MessageSelector{}
	data := buildSubscriptionData(queue.Topic, selector)

	return c.pull(ctx, queue, data, offset, numbers)
}

UpdateOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// updateOffset update offset of queue in mem
func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error {
	return c.updateOffset(queue, offset)
}

PersistOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PersistOffset persist all offset in mem.
func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
	return c.persistConsumerOffset()
}

CurrentOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// CurrentOffset return the current offset of queue in mem.
func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) {
	v := c.queryOffset(queue)
	return v, nil
}

Shutdown

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Shutdown close defaultConsumer, refuse new request.
func (c *defaultPullConsumer) Shutdown() error {
	return c.defaultConsumer.shutdown()
}

小结

PullConsumer定义了Start、Shutdown、Pull、UpdateOffset、PersistOffset、CurrentOffset方法

doc