聊聊nacos-sdk-go的HostReactor

/ 默认分类 / 没有评论 / 10浏览

本文主要研究一下nacos-sdk-go的HostReactor

HostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

type HostReactor struct {
	serviceInfoMap       cache.ConcurrentMap
	cacheDir             string
	updateThreadNum      int
	serviceProxy         NamingProxy
	pushReceiver         PushReceiver
	subCallback          SubscribeCallback
	updateTimeMap        cache.ConcurrentMap
	updateCacheWhenEmpty bool
}

NewHostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor {
	if updateThreadNum <= 0 {
		updateThreadNum = Default_Update_Thread_Num
	}
	hr := HostReactor{
		serviceProxy:         serviceProxy,
		cacheDir:             cacheDir,
		updateThreadNum:      updateThreadNum,
		serviceInfoMap:       cache.NewConcurrentMap(),
		subCallback:          subCallback,
		updateTimeMap:        cache.NewConcurrentMap(),
		updateCacheWhenEmpty: updateCacheWhenEmpty,
	}
	pr := NewPushRecevier(&hr)
	hr.pushReceiver = *pr
	if !notLoadCacheAtStart {
		hr.loadCacheFromDisk()
	}
	go hr.asyncUpdateService()
	return hr
}

loadCacheFromDisk

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) loadCacheFromDisk() {
	serviceMap := cache.ReadServicesFromFile(hr.cacheDir)
	if serviceMap == nil || len(serviceMap) == 0 {
		return
	}
	for k, v := range serviceMap {
		hr.serviceInfoMap.Set(k, v)
	}
}

asyncUpdateService

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) asyncUpdateService() {
	sema := utils.NewSemaphore(hr.updateThreadNum)
	for {
		for _, v := range hr.serviceInfoMap.Items() {
			service := v.(model.Service)
			lastRefTime, ok := hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))
			if !ok {
				lastRefTime = uint64(0)
			}
			if uint64(utils.CurrentMillis())-lastRefTime.(uint64) > service.CacheMillis {
				sema.Acquire()
				go func() {
					hr.updateServiceNow(service.Name, service.Clusters)
					sema.Release()
				}()
			}
		}
		time.Sleep(1 * time.Second)
	}

}

updateServiceNow

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) updateServiceNow(serviceName string, clusters string) {
	result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
	if err != nil {
		log.Printf("[ERROR]:query list return error!servieName:%s cluster:%s  err:%s \n", serviceName, clusters, err.Error())
		return
	}
	if result == "" {
		log.Printf("[ERROR]:query list is empty!servieName:%s cluster:%s \n", serviceName, clusters)
		return
	}
	hr.ProcessServiceJson(result)
}

ProcessServiceJson

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) ProcessServiceJson(result string) {
	service := utils.JsonToService(result)
	if service == nil {
		return
	}
	cacheKey := utils.GetServiceCacheKey(service.Name, service.Clusters)

	oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)
	if ok && !hr.updateCacheWhenEmpty {
		//if instance list is empty,not to update cache
		if service.Hosts == nil || len(service.Hosts) == 0 {
			log.Printf("[ERROR]:do not have useful host, ignore it, name:%s \n", service.Name)
			return
		}
	}
	hr.updateTimeMap.Set(cacheKey, uint64(utils.CurrentMillis()))
	hr.serviceInfoMap.Set(cacheKey, *service)
	if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {
		if !ok {
			log.Println("[INFO] service not found in cache " + cacheKey)
		} else {
			log.Printf("[INFO] service key:%s was updated to:%s \n", cacheKey, utils.ToJsonString(service))
		}
		cache.WriteServicesToFile(*service, hr.cacheDir)
		hr.subCallback.ServiceChanged(service)
	}
}

小结

HostReactor定义了serviceInfoMap、cacheDir、updateThreadNum、serviceProxy、pushReceiver、subCallback、updateTimeMap、updateCacheWhenEmpty属性

doc