聊聊dubbo-go的kubernetesRegistry

/ distributedgolang / 没有评论 / 0浏览

本文主要研究一下dubbo-go的kubernetesRegistry

kubernetesRegistry

dubbo-go-v1.4.2/registry/kubernetes/registry.go

var (
	processID = ""
	localIP   = ""
)

const (
	Name         = "kubernetes"
	ConnDelay    = 3
	MaxFailTimes = 15
)

func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	localIP, _ = gxnet.GetLocalIP()
	extension.SetRegistry(Name, newKubernetesRegistry)
}

type kubernetesRegistry struct {
	registry.BaseRegistry
	cltLock        sync.RWMutex
	client         *kubernetes.Client
	listenerLock   sync.Mutex
	listener       *kubernetes.EventListener
	dataListener   *dataListener
	configListener *configurationListener
}

newKubernetesRegistry

dubbo-go-v1.4.2/registry/kubernetes/registry.go

func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {

	// actually, kubernetes use in-cluster config,
	r := &kubernetesRegistry{}

	r.InitBaseRegistry(url, r)

	if err := kubernetes.ValidateClient(r); err != nil {
		return nil, perrors.WithStack(err)
	}

	r.WaitGroup().Add(1)
	go r.HandleClientRestart()
	r.InitListeners()

	logger.Debugf("the kubernetes registry started")

	return r, nil
}

InitListeners

dubbo-go-v1.4.2/registry/kubernetes/registry.go

func (r *kubernetesRegistry) InitListeners() {
	r.listener = kubernetes.NewEventListener(r.client)
	r.configListener = NewConfigurationListener(r)
	r.dataListener = NewRegistryDataListener(r.configListener)
}

DoRegister

dubbo-go-v1.4.2/registry/kubernetes/registry.go

func (r *kubernetesRegistry) DoRegister(root string, node string) error {
	return r.client.Create(path.Join(root, node), "")
}

DoSubscribe

dubbo-go-v1.4.2/registry/kubernetes/registry.go

func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {

	var (
		configListener *configurationListener
	)

	r.listenerLock.Lock()
	configListener = r.configListener
	r.listenerLock.Unlock()
	if r.listener == nil {
		r.cltLock.Lock()
		client := r.client
		r.cltLock.Unlock()
		if client == nil {
			return nil, perrors.New("kubernetes client broken")
		}

		r.listenerLock.Lock()
		if r.listener == nil {
			// double check
			r.listener = kubernetes.NewEventListener(r.client)
		}
		r.listenerLock.Unlock()
	}

	//register the svc to dataListener
	r.dataListener.AddInterestedURL(svc)
	for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
		go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
	}

	return configListener, nil
}

小结

kubernetesRegistry定义了cltLock、client、listenerLock、listener、dataListener、configListener属性;InitListeners方法执行kubernetes.NewEventListener、NewConfigurationListener、NewRegistryDataListener

doc