聊聊kingbus的command.go

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的command.go

Close

kingbus/mysql/command.go

//Close the Conn
func (c *Conn) Close() {
	if c.closed.Load() == true {
		return
	}
	c.closed.Store(true)
	c.Conn.Close()
	c.cancel()
	c.Conn = nil
}

handleQuery

kingbus/mysql/command.go

func (c *Conn) handleQuery(sql string) (err error) {
	defer func() {
		if e := recover(); e != nil {
			if myerr, ok := e.(error); ok {
				const size = 4096
				buf := make([]byte, size)
				buf = buf[:runtime.Stack(buf, false)]
				log.Log.Errorf("Conn handleQuery error,err:%s,sql:%s,stack:%s", myerr, sql, string(buf))
				err = myerr
			}
			return
		}
	}()

	log.Log.Infof("handleQuery sql:%s", sql)
	sqlParser := parser.New()
	stmt, err := sqlParser.ParseOneStmt(sql, "", "")
	if err != nil {
		return err
	}
	switch v := stmt.(type) {
	case *ast.ShowStmt:
		return c.handleShow(v)
	case *ast.SelectStmt:
		return c.handleSelect(v)
	case *ast.SetStmt:
		return c.handleSet(v)
	case *ast.KillStmt:
		return c.handleKill(v)
	//todo get metrics of slave
	default:
		return ErrSQLNotSupport
	}
	return
}

writeOK

kingbus/mysql/resp.go

func (c *Conn) writeOK(r *gomysql.Result) error {
	if r == nil {
		r = &gomysql.Result{}
	}

	r.Status |= c.status

	data := make([]byte, 4, 32)

	data = append(data, gomysql.OK_HEADER)

	data = append(data, gomysql.PutLengthEncodedInt(r.AffectedRows)...)
	data = append(data, gomysql.PutLengthEncodedInt(r.InsertId)...)

	if c.capability&gomysql.CLIENT_PROTOCOL_41 > 0 {
		data = append(data, byte(r.Status), byte(r.Status>>8))
		data = append(data, 0, 0)
	}

	return c.WritePacket(data)
}

handleBinlogDumpGtid

kingbus/mysql/command.go

//todo kill the slave with same uuid
func (c *Conn) handleBinlogDumpGtid(ctx context.Context, data []byte) error {
	var (
		err             error
		heartbeatPeriod time.Duration
	)

	slaveGtidExecuted, slaveServerID, err := c.parseMysqlGtidDumpPacket(data)
	if err != nil {
		return err
	}

	//UnregisterSlave
	slaveUUID := c.userVariables[SlaveUUID].(string)
	defer c.binlogServer.UnregisterSlave(slaveUUID)

	err = c.binlogServer.CheckGtidSet(gomysql.MySQLFlavor, slaveGtidExecuted)
	if err != nil {
		log.Log.Errorf("CheckGtidSet error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
		return err
	}

	//get the previousGtidEvent raft index
	preGtidEventIndex, err := c.binlogServer.GetMySQLDumpAt(slaveGtidExecuted)
	if err != nil {
		log.Log.Errorf("GetMySQLDumpAt error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
		return err
	}

	fde, err := c.binlogServer.GetFde(preGtidEventIndex)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:GetFde error,err:%s, gtidSet: %s,flavor:%s",
			err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
		return err
	}

	//1.send fake rotate event
	masterServerID := binary.LittleEndian.Uint32(fde[5:])
	fileName, err := c.binlogServer.GetNextBinlogFile(preGtidEventIndex)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:GetNextBinlogFile error,err:%s, gtidSet: %s,flavor:%s",
			err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
		return err
	}
	err = c.sendFakeRotateEvent(masterServerID, fileName)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:sendFakeRotateEvent error,err:%s, serverId: %d,fileName:%s",
			err, masterServerID, fileName)
		return err
	}

	//2.send fde
	err = c.sendFormatDescriptionEvent(fde)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:sendFormatDescriptionEvent error,err:%s, fde:%v",
			err, fde)
		return err
	}

	//3.send event
	eventC := make(chan *storagepb.BinlogEvent, 2000)
	errorC := make(chan error, 1)
	err = c.binlogServer.DumpBinlogAt(ctx, preGtidEventIndex, slaveGtidExecuted, eventC, errorC)
	if err != nil {
		log.Log.Errorf("DumpBinlogAt error,err:%s,preGtidEventIndex:%d,slaveGtidExecuted:%v",
			err, preGtidEventIndex, slaveGtidExecuted)
		return err
	}

	//4.new metrics
	slaveEps := metrics.NewMeter()
	slaveThroughput := metrics.NewMeter()
	metrics.Register(fmt.Sprintf("slave_eps_%d", slaveServerID), slaveEps)
	metrics.Register(fmt.Sprintf("slave_thoughput_%d", slaveServerID), slaveThroughput)

	if period, ok := c.userVariables[MasterHeartbeatPeriod]; ok {
		heartbeatPeriod = time.Duration(period.(int64))
	} else {
		heartbeatPeriod = MaxHeartbeatPeriod
	}
	timer := time.NewTimer(heartbeatPeriod)
	for {
		select {
		case event := <-eventC:
			//event is not divided or the first divided event
			//WriteEvent need write a ok_header(one byte),after the header size
			if event.DividedCount == 0 || (0 < event.DividedCount && event.DividedSeqNum == 0) {
				err = c.WriteEvent(event.Data, true)
				if err != nil {
					log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
					return err
				}
			} else {
				err = c.WriteEvent(event.Data, false)
				if err != nil {
					log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
					return err
				}
				//event is divided,and the last packet size is MaxPayloadLen
				//need send a empty packet
				//https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
				if event.DividedSeqNum == event.DividedCount-1 && len(event.Data) == MaxPayloadLen {
					err = c.WriteEvent(nil, false)
					if err != nil {
						log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
						return err
					}
				}
			}
			slaveEps.Mark(1)
			slaveThroughput.Mark(int64(len(event.Data)))
			//reset heartbeat period
			resetTime(timer, heartbeatPeriod)
		case err = <-errorC:
			log.Log.Errorf("binlog server DumpBinlogAt error,err:%s", err)
			return err
		case <-ctx.Done():
			log.Log.Errorf("handleBinlogDumpGtid:ctx done,quit")
			return ctx.Err()
		case <-timer.C:
			//kingbus send the heartbeat log event which received by syncer to slave
			log.Log.Debugf("send a heartbeat log event to slave")
			err = c.sendHeartbeatEvent(masterServerID)
			if err != nil {
				return err
			}
			//reset heartbeat period
			resetTime(timer, heartbeatPeriod)
		}
	}
	return nil
}

handleRegisterSlave

kingbus/mysql/command.go

func (c *Conn) handleRegisterSlave(data []byte) error {
	var s Slave
	pos := 0

	s.ServerID = int32(binary.LittleEndian.Uint32(data[pos:]))
	pos += 4

	hostNameLen := int(data[pos])
	pos++

	s.HostName = string(data[pos : pos+hostNameLen])
	pos += hostNameLen

	userLen := int(data[pos])
	pos++

	s.User = string(data[pos : pos+userLen])
	pos += userLen

	passwordLen := int(data[pos])
	pos++

	s.Password = string(data[pos : pos+passwordLen])
	pos += passwordLen

	s.Port = int16(binary.LittleEndian.Uint16(data[pos:]))
	pos += 2

	s.Rank = binary.LittleEndian.Uint32(data[pos:])
	pos += 4

	s.MasterID = binary.LittleEndian.Uint32(data[pos:])
	s.State = REGISTERED

	//kill the zombie dump thread with same uuid
	c.killZombieDumpThreads()

	if uuid, ok := c.userVariables[SlaveUUID]; ok {
		s.UUID = uuid.(string)
	} else {
		s.UUID = ""
	}
	s.ConnectTime = time.Now()
	s.Conn = c
	err := c.binlogServer.RegisterSlave(&s)
	if err != nil {
		return err
	}
	log.Log.Infof("handleRegisterSlave:slave info:%v", s)
	return c.writeOK(nil)
}

小结

kingbus的command.go提供了Close、handleQuery、writeOK、handleBinlogDumpGtid、handleRegisterSlave等方法

doc