聊聊dubbo-go的DubboInvoker

/ distributedgolang / 没有评论 / 10浏览

本文主要研究一下dubbo-go的DubboInvoker

Invoker

dubbo-go-v1.4.2/protocol/invoker.go

// Extension - Invoker
type Invoker interface {
	common.Node
	Invoke(context.Context, Invocation) Result
}

/////////////////////////////
// base invoker
/////////////////////////////

// BaseInvoker ...
type BaseInvoker struct {
	url       common.URL
	available bool
	destroyed bool
}

// NewBaseInvoker ...
func NewBaseInvoker(url common.URL) *BaseInvoker {
	return &BaseInvoker{
		url:       url,
		available: true,
		destroyed: false,
	}
}

// GetUrl ...
func (bi *BaseInvoker) GetUrl() common.URL {
	return bi.url
}

// IsAvailable ...
func (bi *BaseInvoker) IsAvailable() bool {
	return bi.available
}

// IsDestroyed ...
func (bi *BaseInvoker) IsDestroyed() bool {
	return bi.destroyed
}

// Invoke ...
func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result {
	return &RPCResult{}
}

// Destroy ...
func (bi *BaseInvoker) Destroy() {
	logger.Infof("Destroy invoker: %s", bi.GetUrl().String())
	bi.destroyed = true
	bi.available = false
}

DubboInvoker

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

var (
	// ErrNoReply ...
	ErrNoReply          = perrors.New("request need @response")
	ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
	attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
)

// DubboInvoker ...
type DubboInvoker struct {
	protocol.BaseInvoker
	client   *Client
	quitOnce sync.Once
	// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
	reqNum int64
}

NewDubboInvoker

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

// NewDubboInvoker ...
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
	return &DubboInvoker{
		BaseInvoker: *protocol.NewBaseInvoker(url),
		client:      client,
		reqNum:      0,
	}
}

Invoke

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

// Invoke ...
func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
	var (
		err    error
		result protocol.RPCResult
	)
	if di.reqNum < 0 {
		// Generally, the case will not happen, because the invoker has been removed
		// from the invoker list before destroy,so no new request will enter the destroyed invoker
		logger.Warnf("this dubboInvoker is destroyed")
		result.Err = ErrDestroyedInvoker
		return &result
	}
	atomic.AddInt64(&(di.reqNum), 1)
	defer atomic.AddInt64(&(di.reqNum), -1)

	inv := invocation.(*invocation_impl.RPCInvocation)
	for _, k := range attachmentKey {
		if v := di.GetUrl().GetParam(k, ""); len(v) > 0 {
			inv.SetAttachments(k, v)
		}
	}

	// put the ctx into attachment
	di.appendCtx(ctx, inv)

	url := di.GetUrl()
	// async
	async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
	if err != nil {
		logger.Errorf("ParseBool - error: %v", err)
		async = false
	}
	response := NewResponse(inv.Reply(), nil)
	if async {
		if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
			result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
		} else {
			result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
		}
	} else {
		if inv.Reply() == nil {
			result.Err = ErrNoReply
		} else {
			result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
		}
	}
	if result.Err == nil {
		result.Rest = inv.Reply()
		result.Attrs = response.atta
	}
	logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)

	return &result
}

Destroy

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

// Destroy ...
func (di *DubboInvoker) Destroy() {
	di.quitOnce.Do(func() {
		for {
			if di.reqNum == 0 {
				di.reqNum = -1
				logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
				di.BaseInvoker.Destroy()
				if di.client != nil {
					di.client.Close()
					di.client = nil
				}
				break
			}
			logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
			time.Sleep(1 * time.Second)
		}

	})
}

小结

Invoker定义了Invoke方法;BaseInvoker定义了url、available、destroyed属性;NewBaseInvoker方法实例化了BaseInvoker,其available为true,destroyed为false;Destroy方法设置available为false,destroyed为true;DubboInvoker的Invoke方法先通过atomic.AddInt64递增reqNum,之后遍历attachmentKey设置到invocation;之后读取constant.ASYNC_KEY属性,若async为true,则执行di.client.AsyncCall或di.client.CallOneway;若async为false则执行di.client.Call;最后返回result

doc