聊聊kingbus的DumpBinlogAt

/ stream / 没有评论 / 20浏览

本文主要研究一下kingbus的DumpBinlogAt

DumpBinlogAt

kingbus/server/binlog_server.go

//DumpBinlogAt starts streaming binlog events to a slave.
//It opens an entry reader at startRaftIndex and spawns a goroutine that sends
//every event not filtered out by skipEvent (using slaveGtids) to eventC.
//The goroutine stops on a read error or when ctx is done; in both cases the
//error is offered to errorC without blocking, so it is dropped when errorC
//cannot accept it immediately.
//The returned error is non-nil only when the reader cannot be created.
func (s *BinlogServer) DumpBinlogAt(ctx context.Context,
	startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet,
	eventC chan<- *storagepb.BinlogEvent, errorC chan<- error) error {
	reader, err := s.store.NewEntryReaderAt(startRaftIndex)
	if err != nil {
		log.Log.Errorf("NewEntryReaderAt error,err:%s,raftIndex:%d", err, startRaftIndex)
		return err
	}

	nextRaftIndex := reader.NextRaftIndex()
	log.Log.Infof("DumpBinlogAt:raftIndex:%d,slaveGtids:%s", nextRaftIndex, slaveGtids.String())

	//notify offers e to errorC and never blocks the dump goroutine
	notify := func(e error) {
		select {
		case errorC <- e:
		default:
		}
	}

	go func() {
		//state carried between skipEvent calls: whether the previous event was skipped
		skipping := false
		for {
			//entries beyond AppliedIndex must not be read yet: wait for a
			//broadcast notification (or cancellation), then check again
			if nextRaftIndex > s.kingbusInfo.AppliedIndex() {
				select {
				case <-s.broadcast.Receive():
				case <-ctx.Done():
					log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err())
					notify(ctx.Err())
					return
				}
				continue
			}

			entry, readErr := reader.GetNext()
			if readErr != nil {
				log.Log.Errorf("reader.GetNext error,err:%s,nextRaftIndex:%d,AppliedIndex:%d",
					readErr, nextRaftIndex, s.kingbusInfo.AppliedIndex())
				notify(readErr)
				return
			}
			nextRaftIndex = reader.NextRaftIndex()

			//raft entries that do not carry a binlog event are ignored
			if !utils.IsBinlogEvent(entry) {
				continue
			}

			//drop events that skipEvent reports as already covered by slaveGtids
			event := utils.DecodeBinlogEvent(entry)
			skipping = s.skipEvent(event, slaveGtids, skipping)
			if skipping {
				continue
			}

			select {
			case eventC <- event:
			case <-ctx.Done():
				log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err())
				notify(ctx.Err())
				return
			}
		}
	}()

	return nil
}

NewEntryReaderAt

kingbus/storage/disk_storage.go

//NewEntryReaderAt returns a DiskEntryReader positioned at raftIndex.
//The index is validated with checkRaftIndex first; a validation failure is
//logged and returned together with a nil reader.
func (s *DiskStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error) {
	if err := s.checkRaftIndex(raftIndex); err != nil {
		log.Log.Errorf("checkRaftIndex error,err:%s,raftIndex:%d", err, raftIndex)
		return nil, err
	}

	return &DiskEntryReader{
		indexReadAt: raftIndex,
		store:       s,
	}, nil
}

skipEvent

kingbus/server/binlog_server.go

//skipEvent reports whether event must be withheld from the slave because it
//belongs to a transaction already contained in slaveGtids.
//
//  - GTID_EVENT: the gtid is decoded from the event body and the result is
//    slaveGtids.Contain(gtid). Any failure to decode the event (including an
//    event too short to hold a header and a checksum) is logged and treated
//    as "skip".
//  - ROTATE_EVENT: never skipped.
//  - any other type: inherits inExcludeGroup, i.e. the decision made for the
//    preceding event.
func (s *BinlogServer) skipEvent(event *storagepb.BinlogEvent, slaveGtids *gomysql.MysqlGTIDSet, inExcludeGroup bool) bool {
	switch replication.EventType(event.Type) {
	case replication.GTID_EVENT:
		//guard the two slice expressions below: a truncated event would
		//otherwise panic with slice bounds out of range
		minLen := replication.EventHeaderSize + replication.BinlogChecksumLength
		if len(event.Data) < minLen {
			log.Log.Errorf("gtid event too short,len:%d,min:%d", len(event.Data), minLen)
			return true
		}
		//remove header
		eventBody := event.Data[replication.EventHeaderSize:]
		//remove crc32
		eventBody = eventBody[:len(eventBody)-replication.BinlogChecksumLength]

		gtidEvent := &replication.GTIDEvent{}
		if err := gtidEvent.Decode(eventBody); err != nil {
			log.Log.Errorf("Decode gtid event error,err:%s", err)
			return true
		}
		u, err := uuid.FromBytes(gtidEvent.SID)
		if err != nil {
			log.Log.Errorf("FromBytes error,err:%s,sid:%v", err, gtidEvent.SID)
			return true
		}
		//build a single-gtid set "uuid:gno" so it can be tested against slaveGtids
		gtidStr := fmt.Sprintf("%s:%d", u.String(), gtidEvent.GNO)
		currentGtidset, err := gomysql.ParseMysqlGTIDSet(gtidStr)
		if err != nil {
			log.Log.Errorf("ParseMysqlGTIDSet error,err:%s,gtid:%s", err, gtidStr)
			return true
		}
		return slaveGtids.Contain(currentGtidset)
	case replication.ROTATE_EVENT:
		return false
	}
	return inExcludeGroup
}

小结

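DumpBinlogAt方法先通过s.store.NewEntryReaderAt(startRaftIndex)获取reader，再通过reader.NextRaftIndex()获取nextRaftIndex；随后启动一个goroutine循环读取：当nextRaftIndex不大于AppliedIndex时，通过reader.GetNext()读取raftEntry，用utils.DecodeBinlogEvent(raftEntry)解码出event，并通过s.skipEvent(event, slaveGtids, inExcludeGroup)判断是否需要跳过，未被跳过的event会写入eventC；否则等待s.broadcast.Receive()的通知或ctx取消后再继续。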

doc