聊聊dubbo-go的DubboProtocol

/ distributedgolang / 没有评论 / 60浏览

本文主要研究一下dubbo-go的DubboProtocol

DubboProtocol

dubbo-go-v1.4.2/protocol/dubbo/dubbo_protocol.go

// DUBBO is the protocol name under which this package registers itself
// with the extension registry.
const DUBBO = "dubbo"

// init registers GetProtocol with the extension package under the DUBBO
// name when this package is loaded.
func init() {
	extension.SetProtocol(DUBBO, GetProtocol)
}

// dubboProtocol is the package-level instance that GetProtocol creates on
// first use and returns on every call.
var dubboProtocol *DubboProtocol

// DubboProtocol is the dubbo implementation of the protocol: it embeds
// protocol.BaseProtocol and additionally tracks the servers it has started.
type DubboProtocol struct {
	protocol.BaseProtocol
	// serverMap holds the started servers, keyed by url.Location.
	serverMap  map[string]*Server
	// serverLock is held by openServer while it registers and starts a new server.
	serverLock sync.Mutex
}

GetProtocol

dubbo-go-v1.4.2/protocol/dubbo/dubbo_protocol.go

// GetProtocol returns the package-level DubboProtocol, creating it with
// NewDubboProtocol the first time it is requested.
func GetProtocol() protocol.Protocol {
	if dubboProtocol != nil {
		return dubboProtocol
	}
	dubboProtocol = NewDubboProtocol()
	return dubboProtocol
}

NewDubboProtocol

dubbo-go-v1.4.2/protocol/dubbo/dubbo_protocol.go

// NewDubboProtocol builds a DubboProtocol with a fresh BaseProtocol and an
// empty, ready-to-use server map.
func NewDubboProtocol() *DubboProtocol {
	dp := new(DubboProtocol)
	dp.BaseProtocol = protocol.NewBaseProtocol()
	dp.serverMap = map[string]*Server{}
	return dp
}

Export

dubbo-go-v1.4.2/protocol/dubbo/dubbo_protocol.go

// Export wraps invoker in a DubboExporter, records that exporter in the
// exporter map under the URL's service key, opens a server for the URL,
// and returns the exporter.
func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
	svcURL := invoker.GetUrl()
	key := svcURL.ServiceKey()

	exp := NewDubboExporter(key, invoker, dp.ExporterMap())
	dp.SetExporterMap(key, exp)
	logger.Infof("Export service: %s", svcURL.String())

	// The exporter must already be registered: openServer looks it up.
	dp.openServer(svcURL)
	return exp
}

openServer

dubbo-go-v1.4.2/protocol/dubbo/dubbo_protocol.go

// openServer starts a Server for url.Location unless one is already
// recorded in serverMap.
//
// It panics when no exporter is registered for url.ServiceKey() and a new
// server would have to be started.
func (dp *DubboProtocol) openServer(url common.URL) {
	// Take the lock before the first lookup: reading serverMap unlocked while
	// another caller inserts under the lock is a data race. The deferred
	// unlock also releases the mutex if the panic below or srv.Start unwinds.
	dp.serverLock.Lock()
	defer dp.serverLock.Unlock()

	if _, ok := dp.serverMap[url.Location]; ok {
		return
	}

	if _, ok := dp.ExporterMap().Load(url.ServiceKey()); !ok {
		panic("[DubboProtocol]" + url.Key() + "is not existing")
	}

	srv := NewServer()
	dp.serverMap[url.Location] = srv
	srv.Start(url)
}

Refer

dubbo-go-v1.4.2/protocol/dubbo/dubbo_protocol.go

// Refer creates a DubboInvoker for url, records it via SetInvokers, and
// returns it.
//
// The request timeout defaults to the consumer config's RequestTimeout. It
// is overridden by url's constant.TIMEOUT_KEY parameter (which itself falls
// back to the consumer config's Request_Timeout string) whenever that value
// parses as a duration.
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
	timeout := config.GetConsumerConfig().RequestTimeout

	rawTimeout := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
	parsed, err := time.ParseDuration(rawTimeout)
	if err == nil {
		timeout = parsed
	}

	opts := Options{
		ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
		RequestTimeout: timeout,
	}
	client := NewClient(opts)

	inv := NewDubboInvoker(url, client)
	dp.SetInvokers(inv)
	logger.Infof("Refer service: %s", url.String())
	return inv
}

Destroy

dubbo-go-v1.4.2/protocol/dubbo/dubbo_protocol.go

// Destroy tears the protocol down: it runs BaseProtocol.Destroy, then
// removes every server from serverMap and stops it.
func (dp *DubboProtocol) Destroy() {
	logger.Infof("DubboProtocol destroy.")

	dp.BaseProtocol.Destroy()

	// serverMap is written by openServer under serverLock, so drain it under
	// the same lock rather than iterating and deleting unsynchronised.
	// NOTE(review): assumes Server.Stop does not call back into openServer — confirm.
	dp.serverLock.Lock()
	defer dp.serverLock.Unlock()

	// stop server
	for key, server := range dp.serverMap {
		delete(dp.serverMap, key)
		server.Stop()
	}
}

小结

DubboProtocol内嵌了protocol.BaseProtocol，定义了serverMap、serverLock属性；Export方法通过NewDubboExporter创建exporter，然后更新到DubboProtocol的exporterMap中，之后执行DubboProtocol的openServer；Refer方法先获取requestTimeout，之后通过NewDubboInvoker创建invoker，然后执行dp.SetInvokers(invoker)；Destroy方法先执行dp.BaseProtocol.Destroy()，之后遍历dp.serverMap，执行delete(dp.serverMap, key)及server.Stop()。

doc