聊聊dubbo-go的nacosRegistry

/ distributedgolang / 没有评论 / 60浏览

本文主要研究一下dubbo-go的nacosRegistry

nacosRegistry

dubbo-go-v1.4.2/registry/nacos/registry.go

var (
	localIP = ""
)

const (
	//RegistryConnDelay registry connection delay
	RegistryConnDelay = 3
)

func init() {
	localIP, _ = gxnet.GetLocalIP()
	extension.SetRegistry(constant.NACOS_KEY, newNacosRegistry)
}

type nacosRegistry struct {
	*common.URL
	namingClient naming_client.INamingClient
}

newNacosRegistry

dubbo-go-v1.4.2/registry/nacos/registry.go

func newNacosRegistry(url *common.URL) (registry.Registry, error) {
	nacosConfig, err := getNacosConfig(url)
	if err != nil {
		return nil, err
	}
	client, err := clients.CreateNamingClient(nacosConfig)
	if err != nil {
		return nil, err
	}
	registry := nacosRegistry{
		URL:          url,
		namingClient: client,
	}
	return &registry, nil
}

getNacosConfig

dubbo-go-v1.4.2/registry/nacos/registry.go

func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
	if url == nil {
		return nil, perrors.New("url is empty!")
	}
	if len(url.Location) == 0 {
		return nil, perrors.New("url.location is empty!")
	}
	configMap := make(map[string]interface{}, 2)

	addresses := strings.Split(url.Location, ",")
	serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
	for _, addr := range addresses {
		ip, portStr, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, perrors.WithMessagef(err, "split [%s] ", addr)
		}
		port, _ := strconv.Atoi(portStr)
		serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
			IpAddr: ip,
			Port:   uint64(port),
		})
	}
	configMap["serverConfigs"] = serverConfigs

	var clientConfig nacosConstant.ClientConfig
	timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
	if err != nil {
		return nil, err
	}
	clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
	clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
	clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
	clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
	clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
	clientConfig.Username = url.GetParam(constant.NACOS_USERNAME, "")
	clientConfig.Password = url.GetParam(constant.NACOS_PASSWORD, "")
	clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACEID, "")
	clientConfig.NotLoadCacheAtStart = true
	configMap["clientConfig"] = clientConfig

	return configMap, nil
}

Register

dubbo-go-v1.4.2/registry/nacos/registry.go

func (nr *nacosRegistry) Register(url common.URL) error {
	serviceName := getServiceName(url)
	param := createRegisterParam(url, serviceName)
	isRegistry, err := nr.namingClient.RegisterInstance(param)
	if err != nil {
		return err
	}
	if !isRegistry {
		return perrors.New("registry [" + serviceName + "] to  nacos failed")
	}
	return nil
}

Subscribe

dubbo-go-v1.4.2/registry/nacos/registry.go

//subscribe from registry
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
	for {
		if !nr.IsAvailable() {
			logger.Warnf("event listener game over.")
			return
		}

		listener, err := nr.subscribe(url)
		if err != nil {
			if !nr.IsAvailable() {
				logger.Warnf("event listener game over.")
				return
			}
			logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
			continue
		}

		for {
			serviceEvent, err := listener.Next()
			if err != nil {
				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
				listener.Close()
				return
			}

			logger.Infof("update begin, service event: %v", serviceEvent.String())
			notifyListener.Notify(serviceEvent)
		}

	}
}

小结

newNacosRegistry方法先通过getNacosConfig获取nacosConfig,之后通过clients.CreateNamingClient(nacosConfig)创建client,最后实例化nacosRegistry

doc