聊聊dubbo-go的DubboPackage

/ distributed, golang / 没有评论 / 0浏览

本文主要研究一下dubbo-go的DubboPackage

DubboPackage

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// CallType identifies the kind of call being made. It is an int32-based
// enumeration whose members are declared below.
type CallType int32

// The call types are numbered consecutively from zero in declaration order.
const (
	// CT_UNKNOWN is the unknown call type (value 0).
	CT_UNKNOWN CallType = iota
	// CT_OneWay is a one-way call (value 1).
	CT_OneWay
	// CT_TwoWay is a request/response call (value 2).
	CT_TwoWay
)

// SequenceType is an int64-based type. DubboPackage.Unmarshal converts the
// header ID to SequenceType before using it as the key when loading an entry
// from the client's pendingResponses.
type SequenceType int64

// DubboPackage groups the pieces of a single Dubbo message: its header, the
// service description, the body and an error slot. Marshal encodes Service,
// Header and Body through the hessian codec; Unmarshal reads the header into
// Header and then reads the body into Body.
type DubboPackage struct {
	// Header is the Dubbo header; Unmarshal populates it via codec.ReadHeader
	// and Marshal passes it to codec.Write.
	Header  hessian.DubboHeader
	// Service is handed to codec.Write by Marshal; Unmarshal does not touch it.
	Service hessian.Service
	// Body is the payload written by Marshal and the destination handed to
	// codec.ReadBody by Unmarshal. On the client path Unmarshal replaces it
	// before reading.
	Body    interface{}
	// Err is not read or written by Marshal or Unmarshal.
	// NOTE(review): presumably carries an error associated with this package
	// for callers — confirm against the code that uses it.
	Err     error
}

Marshal

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// Marshal encodes the package's Service, Header and Body with a hessian codec
// created without a reader and returns the encoded bytes wrapped in a new
// bytes.Buffer. If the codec reports an error, Marshal returns a nil buffer
// and that error with a stack trace attached.
func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
	encoded, err := hessian.NewHessianCodec(nil).Write(p.Service, p.Header, p.Body)
	if err != nil {
		return nil, perrors.WithStack(err)
	}
	return bytes.NewBuffer(encoded), nil
}

Unmarshal

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// Unmarshal decodes a Dubbo message from buf into p: the header is read into
// p.Header and the body is read into p.Body.
//
// buf must hold at least hessian.HEADER_LENGTH bytes, otherwise
// hessian.ErrHeaderNotEnough is returned. When opts is non-empty (the client
// path), opts[0] must be a *Client and p.Body is prepared before the body is
// read:
//   - for a request package, p.Body becomes a 7-element []interface{};
//   - otherwise the pending response registered under the header ID is looked
//     up and its reply object is used as the RspObj of a hessian.Response.
//
// When opts is empty, p.Body is passed to the codec as the caller left it.
// Errors from the codec are returned with a stack trace attached.
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
	// fix issue https://github.com/apache/dubbo-go/issues/380
	size := buf.Len()
	if size < hessian.HEADER_LENGTH {
		return perrors.WithStack(hessian.ErrHeaderNotEnough)
	}

	// The reader is sized to the number of bytes currently in buf.
	codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, size))

	// Header first: the client path below depends on its Type and ID.
	if err := codec.ReadHeader(&p.Header); err != nil {
		return perrors.WithStack(err)
	}

	if len(opts) > 0 { // for client
		client, isClient := opts[0].(*Client)
		if !isClient {
			return perrors.Errorf("opts[0] is not of type *Client")
		}

		switch {
		case p.Header.Type&hessian.PackageRequest != 0x00:
			// size of this array must be '7'
			// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
			p.Body = make([]interface{}, 7)
		default:
			pending, found := client.pendingResponses.Load(SequenceType(p.Header.ID))
			if !found {
				return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
			}
			p.Body = &hessian.Response{RspObj: pending.(*PendingResponse).response.reply}
		}
	}

	// Finally read the body into whatever p.Body now refers to.
	return perrors.WithStack(codec.ReadBody(p.Body))
}

PendingResponse

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// PendingResponse holds the state kept for a call while its response is
// outstanding. DubboPackage.Unmarshal loads values of type *PendingResponse
// from the client's pendingResponses by header ID and uses response.reply as
// the RspObj of the hessian.Response it reads the body into.
type PendingResponse struct {
	// seq is not set by NewPendingResponse.
	// NOTE(review): presumably the sequence number of the request — confirm.
	seq       uint64
	// err is reported as Cause by GetCallResponse.
	err       error
	// start is set to time.Now() by NewPendingResponse and reported as Start
	// by GetCallResponse.
	start     time.Time
	// readStart is reported as ReadStart by GetCallResponse; it is not set by
	// NewPendingResponse.
	readStart time.Time
	// callback is not invoked in the code shown here.
	// NOTE(review): presumably called once an asynchronous call completes — confirm.
	callback  common.AsyncCallback
	// response is initialised to an empty Response by NewPendingResponse and
	// reported as Reply by GetCallResponse.
	response  *Response
	// done is created as an unbuffered channel by NewPendingResponse.
	// NOTE(review): presumably signals completion of the call — confirm.
	done      chan struct{}
}

// NewPendingResponse returns a new PendingResponse whose start time is the
// current time, whose response is an empty Response, and whose done channel
// is a freshly made unbuffered channel. All other fields keep their zero
// values.
func NewPendingResponse() *PendingResponse {
	pending := new(PendingResponse)
	pending.start = time.Now()
	pending.response = &Response{}
	pending.done = make(chan struct{})
	return pending
}

GetCallResponse

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// GetCallResponse returns an AsyncCallbackResponse built from the pending
// response: Cause is its error, Start and ReadStart are its timestamps, and
// Reply is its response.
func (r PendingResponse) GetCallResponse() common.CallbackResponse {
	var result AsyncCallbackResponse
	result.Cause = r.err
	result.Start = r.start
	result.ReadStart = r.readStart
	result.Reply = r.response
	return result
}

小结

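DubboPackage定义了Header、Service、Body、Err属性；它的Marshal方法通过hessian codec的Write将Service、Header、Body编码后写入bytes.Buffer，Unmarshal方法则先通过ReadHeader读取Header，在client端根据Header.Type准备好Body（请求为长度为7的数组，响应则从pendingResponses中取出对应的PendingResponse），最后通过ReadBody读取Body；PendingResponse记录了一次调用的err、start、readStart及response等信息，并通过GetCallResponse方法返回AsyncCallbackResponse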

doc