聊聊kingbus的startMasterServer

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的startMasterServer

startMasterServer

kingbus/server/server.go

func (s *KingbusServer) startMasterServer(args *config.BinlogServerConfig) error {
	master, err := NewBinlogServer(args, s, s.store, s.applyBroadcast)
	if err != nil {
		log.Log.Errorf("NewBinlogServer error,err:%s,args:%v", err, *args)
		return err
	}
	s.master = master
	s.master.Start()
	log.Log.Infof("startMasterServer success,args:%v", *args)
	return nil
}

NewBinlogServer

kingbus/server/binlog_server.go

//NewBinlogServer create a binlog server
func NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error) {
	var err error
	s := new(BinlogServer)

	s.started = atomic.NewBool(false)
	s.cfg = cfg
	s.listener, err = net.Listen("tcp", s.cfg.Addr)
	if err != nil {
		log.Log.Errorf("Listen error,err:%s,addr:%s", err, s.cfg.Addr)
		return nil, err
	}
	s.store = store
	s.broadcast = broadcast
	s.kingbusInfo = ki
	s.slaves = make(map[string]*mysql.Slave)
	s.errch = make(chan error, 1)

	return s, nil
}

BinlogServer

kingbus/server/binlog_server.go

//BinlogServer is a binlog server,send binlog event to slave.
//The generic process:
//1.authentication
//SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'
//SET @master_binlog_checksum='NONE'
//SET @master_heartbeat_period=%d
//2.COM_REGISTER_SLAVE
//3.semi-sync:
//SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';
//SET @rpl_semi_sync_slave = 1
//4.COM_BINLOG_DUMP_GTID
type BinlogServer struct {
	started *atomic.Bool
	cfg     *config.BinlogServerConfig

	listener net.Listener
	errch    chan error

	l      sync.RWMutex
	slaves map[string]*mysql.Slave //key is uuid

	broadcast   *utils.Broadcast
	kingbusInfo KingbusInfo
	store       storage.Storage
}

Start

kingbus/server/binlog_server.go

//Start implements binlog server start
func (s *BinlogServer) Start() {
	s.started.Store(true)
	go func() {
		for s.started.Load() {
			select {
			case err := <-s.errch:
				log.Log.Errorf("binlog server Run error,err:%s", err)
				s.Stop()
				return
			default:
				conn, err := s.listener.Accept()
				if err != nil {
					log.Log.Infof("BinlogServer.Start:Accept error,err:%s", err)
					continue
				}
				go s.onConn(conn)
			}
		}
	}()
}

onConn

kingbus/server/binlog_server.go

func (s *BinlogServer) onConn(c net.Conn) {
	mysqlConn, err := mysql.NewConn(c, s, s.cfg.User, s.cfg.Password)
	if err != nil {
		log.Log.Errorf("onConn error,err:%s", err)
		return
	}
	mysqlConn.Run()
}

NewConn

kingbus/mysql/conn.go

//NewConn create a Conn
func NewConn(conn net.Conn, s BinlogServer, user string, password string) (*Conn, error) {
	c := new(Conn)

	c.user = user
	c.BaseConn = NewBaseConn(conn)
	c.connectionID = baseConnID.Add(1)
	c.salt, _ = gomysql.RandomBuf(20)
	c.closed = atomic.NewBool(false)
	masterInfo, err := s.GetMasterInfo()
	if err != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:GetMasterInfo error,err:%s", err)
		return nil, err
	}

	err = c.handshake(masterInfo.Version, password)
	if err != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:handshake error,err:%s", err)
		return nil, err
	}

	c.ctx, c.cancel = context.WithCancel(context.Background())
	c.userVariables = make(map[string]interface{})
	c.binlogServer = s

	return c, nil
}

handshake

kingbus/mysql/conn.go

//handshake implements the handshake protocol in mysql
func (c *Conn) handshake(serverVersion, password string) error {
	if err := c.writeInitialHandshake(serverVersion); err != nil {
		return err
	}

	if err := c.readHandshakeResponse(password); err != nil {
		c.writeError(err)
		return err
	}

	if err := c.writeOK(nil); err != nil {
		return err
	}

	c.ResetSequence()

	return nil
}

Run

kingbus/mysql/conn.go

//Run implements handle client request in Conn
func (c *Conn) Run() {
	defer func() {
		r := recover()
		if err, ok := r.(error); ok {
			const size = 4096
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]

			log.Log.Errorf("Conn Run error,err:%s,stack:%s", err, string(buf))
		}
		c.Close()
		log.Log.Debugf("close client connection")
	}()

	for {
		select {
		case <-c.ctx.Done():
			log.Log.Debugf("BinlogServer closed, close connection")
			return
		default:
			data, err := c.ReadPacket()
			if err != nil {
				log.Log.Errorf("ReadPacket error,err:%s", err)
				return
			}

			if err := c.dispatch(c.ctx, data); err != nil {
				log.Log.Errorf("dispatch error,err:%s,data:%v", err.Error(), data)
				//if the error is canceled, means the connection was killed by cmd
				//don't need send error message
				if err != context.Canceled && c.closed.Load() == false {
					c.writeError(err)
				}
				return
			}

			//if the connection is closed, return from loop
			if c.closed.Load() == true {
				log.Log.Infof("connection status is closed,need return")
				return
			}
			c.Sequence = 0
		}
	}
}

-Run方法接收请求,然后通过c.dispatch(c.ctx, data)进行分发

dispatch

kingbus/mysql/command.go

func (c *Conn) dispatch(ctx context.Context, data []byte) error {
	cmd := data[0]
	data = data[1:]

	switch cmd {
	case gomysql.COM_QUIT:
		c.Close()
		log.Log.Debugf("close client connection")
		return nil
	case gomysql.COM_QUERY:
		return c.handleQuery(utils.BytesToString(data))
	case gomysql.COM_PING:
		return c.writeOK(nil)
	case gomysql.COM_BINLOG_DUMP_GTID:
		return c.handleBinlogDumpGtid(ctx, data)
	case gomysql.COM_REGISTER_SLAVE:
		return c.handleRegisterSlave(data)
	default:
		log.Log.Errorf("master not support this cmd:%v", data)
		return c.writeError(ErrSQLNotSupport)
	}

	return nil
}

小结

startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法;master的Start方法先执行s.started.Store(true),然后通过select机制监听s.listener.Accept(),之后执行s.onConn(conn);onConn方法通过mysql.NewConn创建mysqlConn,然后执行mysqlConn.Run方法

doc