聊聊nacos-sdk-go的BeatReactor

/ distributedgolang / 没有评论 / 10浏览

本文主要研究一下nacos-sdk-go的BeatReactor

BeatReactor

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

type BeatReactor struct {
	beatMap             cache.ConcurrentMap
	serviceProxy        NamingProxy
	clientBeatInterval  int64
	beatThreadCount     int
	beatThreadSemaphore *nsema.Semaphore
	beatRecordMap       cache.ConcurrentMap
}

NewBeatReactor

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func NewBeatReactor(serviceProxy NamingProxy, clientBeatInterval int64) BeatReactor {
	br := BeatReactor{}
	if clientBeatInterval <= 0 {
		clientBeatInterval = 5 * 1000
	}
	br.beatMap = cache.NewConcurrentMap()
	br.serviceProxy = serviceProxy
	br.clientBeatInterval = clientBeatInterval
	br.beatThreadCount = Default_Beat_Thread_Num
	br.beatRecordMap = cache.NewConcurrentMap()
	br.beatThreadSemaphore = nsema.NewSemaphore(br.beatThreadCount)
	return br
}

AddBeatInfo

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo model.BeatInfo) {
	log.Printf("[INFO] adding beat: <%s> to beat map.\n", utils.ToJsonString(beatInfo))
	k := buildKey(serviceName, beatInfo.Ip, beatInfo.Port)
	br.beatMap.Set(k, &beatInfo)
	go br.sendInstanceBeat(k, &beatInfo)
}

RemoveBeatInfo

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func (br *BeatReactor) RemoveBeatInfo(serviceName string, ip string, port uint64) {
	log.Printf("[INFO] remove beat: %s@%s:%d from beat map.\n", serviceName, ip, port)
	k := buildKey(serviceName, ip, port)
	data, exist := br.beatMap.Get(k)
	if exist {
		beatInfo := data.(*model.BeatInfo)
		beatInfo.Stopped = true
	}
	br.beatMap.Remove(k)
}

sendInstanceBeat

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) {
	for {
		br.beatThreadSemaphore.Acquire()
		//如果当前实例注销,则进行停止心跳
		if beatInfo.Stopped {
			log.Printf("[INFO] intance[%s] stop heartBeating\n", k)
			br.beatThreadSemaphore.Release()
			return
		}

		//进行心跳通信
		beatInterval, err := br.serviceProxy.SendBeat(*beatInfo)
		if err != nil {
			log.Printf("[ERROR]:beat to server return error:%s \n", err.Error())
			br.beatThreadSemaphore.Release()
			t := time.NewTimer(beatInfo.Period)
			<-t.C
			continue
		}
		if beatInterval > 0 {
			beatInfo.Period = time.Duration(time.Millisecond.Nanoseconds() * beatInterval)
		}

		br.beatRecordMap.Set(k, utils.CurrentMillis())
		br.beatThreadSemaphore.Release()

		t := time.NewTimer(beatInfo.Period)
		<-t.C
	}
}

小结

BeatReactor定义了beatMap、serviceProxy、clientBeatInterval、beatThreadCount、beatThreadSemaphore、beatRecordMap属性;它提供了NewBeatReactor、AddBeatInfo、RemoveBeatInfo方法

doc