聊聊promtail的positions

/ golang / 没有评论 / 20浏览

本文主要研究一下promtail的positions

Positions

loki/pkg/promtail/positions/positions.go

type Positions interface {
	// GetString returns how far we've read through a file as a string.
	// JournalTarget writes a journal cursor to the positions file, while
	// FileTarget writes an integer offset. Use Get to read the integer
	// offset.
	GetString(path string) string
	// Get returns how far we've read through a file. Returns an error
	// if the value stored for the file is not an integer.
	Get(path string) (int64, error)
	// PutString records (asynchronously) how far we've read through a file.
	// Unlike Put, it records a string offset and is only useful for
	// JournalTargets, which don't have integer offsets.
	PutString(path string, pos string)
	// Put records (asynchronously) how far we've read through a file.
	Put(path string, pos int64)
	// Remove removes the position tracking for a filepath.
	Remove(path string)
	// SyncPeriod returns how often the positions file gets resynced.
	SyncPeriod() time.Duration
	// Stop stops the Positions tracker.
	Stop()
}

Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法

positions

loki/pkg/promtail/positions/positions.go

// positions tracks how far through each file we've read. It keeps the
// per-path positions in memory and is paired with a background goroutine
// (run) that periodically writes them to the positions file.
type positions struct {
	logger    log.Logger        // receives the debug/warn/error messages emitted by run, save and cleanup
	cfg       Config            // supplies SyncPeriod, ReadOnly and PositionsFile
	mtx       sync.Mutex        // guards the positions map
	positions map[string]string // path (or "journal-" prefixed key) -> stored position string
	quit      chan struct{}     // closed by Stop to tell run to exit
	done      chan struct{}     // closed by run after its final save
}

// Stop signals the background run loop to exit by closing p.quit, then blocks
// until p.done is closed, which run does only after its final save.
// Stop must be called at most once: a second call would close an
// already-closed channel and panic.
func (p *positions) Stop() {
	close(p.quit)
	<-p.done
}

// PutString records pos as the position for path in the in-memory table while
// holding p.mtx. It does not write the positions file itself; the table is
// written out by save.
func (p *positions) PutString(path string, pos string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.positions[path] = pos
}

// Put records the integer offset pos for path. The offset is rendered as a
// base-10 string and stored through PutString.
func (p *positions) Put(path string, pos int64) {
	offset := strconv.FormatInt(pos, 10)
	p.PutString(path, offset)
}

// GetString returns the raw position string stored for path, or the empty
// string when nothing is stored for it. The lookup is done under p.mtx.
func (p *positions) GetString(path string) string {
	p.mtx.Lock()
	stored := p.positions[path]
	p.mtx.Unlock()

	return stored
}

// Get returns the position stored for path as an int64. A path with no
// stored entry yields (0, nil). If an entry exists but is not a base-10
// integer, the strconv.ParseInt error is returned.
func (p *positions) Get(path string) (int64, error) {
	// Only the map lookup needs the lock; parsing works on the copied string.
	p.mtx.Lock()
	raw, found := p.positions[path]
	p.mtx.Unlock()

	if found {
		return strconv.ParseInt(raw, 10, 64)
	}
	return 0, nil
}

// Remove drops the entry for path from the in-memory table. It takes p.mtx
// and delegates the actual deletion to remove.
func (p *positions) Remove(path string) {
	p.mtx.Lock()
	p.remove(path)
	p.mtx.Unlock()
}

// remove deletes path from the positions table without taking p.mtx; the
// callers in this file (Remove and cleanup) acquire the lock first.
func (p *positions) remove(path string) {
	delete(p.positions, path)
}

// SyncPeriod returns the configured cfg.SyncPeriod, the same interval run
// passes to its ticker for the periodic save and cleanup.
func (p *positions) SyncPeriod() time.Duration {
	return p.cfg.SyncPeriod
}

positions定义了logger、cfg、mtx、positions、quit、done属性;它实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除

New

loki/pkg/promtail/positions/positions.go

// New builds a Positions tracker seeded from the positions file named in cfg
// and starts its background run goroutine. If the positions file cannot be
// loaded, the error from readPositionsFile is returned and nothing is started.
func New(logger log.Logger, cfg Config) (Positions, error) {
	initial, err := readPositionsFile(cfg, logger)
	if err != nil {
		return nil, err
	}

	tracker := &positions{
		logger:    logger,
		cfg:       cfg,
		positions: initial,
		quit:      make(chan struct{}),
		done:      make(chan struct{}),
	}
	go tracker.run()

	return tracker, nil
}

New方法会通过readPositionsFile读取positionData创建positions,然后异步执行p.run()

run

loki/pkg/promtail/positions/positions.go

// run is the background loop started by New. Every cfg.SyncPeriod it saves
// the positions and then cleans up entries for files that no longer exist.
// When p.quit is closed it performs one final save, logs that positions were
// saved, and closes p.done so that Stop can return.
func (p *positions) run() {
	defer func() {
		p.save()
		level.Debug(p.logger).Log("msg", "positions saved")
		close(p.done)
	}()

	ticker := time.NewTicker(p.cfg.SyncPeriod)
	// Release the ticker when the loop exits; without Stop it keeps firing
	// after run has returned.
	defer ticker.Stop()

	for {
		select {
		case <-p.quit:
			return
		case <-ticker.C:
			p.save()
			p.cleanup()
		}
	}
}

// save writes the current positions to cfg.PositionsFile. It is a no-op when
// cfg.ReadOnly is set. A write failure is logged at error level and not
// returned.
func (p *positions) save() {
	if p.cfg.ReadOnly {
		return
	}

	// Copy the table under the lock so the file write happens without it.
	p.mtx.Lock()
	snapshot := make(map[string]string, len(p.positions))
	for path, pos := range p.positions {
		snapshot[path] = pos
	}
	p.mtx.Unlock()

	err := writePositionFile(p.cfg.PositionsFile, snapshot)
	if err == nil {
		return
	}
	level.Error(p.logger).Log("msg", "error writing positions file", "error", err)
}

// cleanup removes entries whose file no longer exists on disk. Keys starting
// with "journal-" are skipped. If os.Stat fails for any reason other than
// "not exist", the entry is kept and a warning is logged. The whole pass runs
// while holding p.mtx.
func (p *positions) cleanup() {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	var stale []string
	for path := range p.positions {
		// A "journal-" prefixed key is a JournalTarget cursor and not a
		// file on disk, so there is nothing to stat.
		if strings.HasPrefix(path, "journal-") {
			continue
		}

		_, err := os.Stat(path)
		switch {
		case err == nil:
			// File is still present; keep its position.
		case os.IsNotExist(err):
			// File no longer exists.
			stale = append(stale, path)
		default:
			// Can't determine if file exists or not, some other error.
			level.Warn(p.logger).Log("msg", "could not determine if log file "+
				"still exists while cleaning positions file", "error", err)
		}
	}

	// Delete after the scan rather than while ranging over the map.
	for _, path := range stale {
		p.remove(path)
	}
}

run方法通过time.NewTicker(p.cfg.SyncPeriod)定时触发p.save()及p.cleanup(),收到quit信号后退出,并在退出前再执行一次p.save(),最后关闭done;save方法在非ReadOnly模式下将positions的副本持久化到文件;cleanup方法遍历p.positions,跳过以journal-为前缀的key,从内存中移除文件已不存在的position

readPositionsFile

loki/pkg/promtail/positions/positions.go

// readPositionsFile loads the positions map from cfg.PositionsFile.
//
// A missing file yields an empty map and no error; any other read error is
// returned as-is. If the content fails strict YAML decoding, the result
// depends on cfg.IgnoreInvalidYaml: when set, the problem is logged at debug
// level and an empty map is returned; otherwise an error is returned.
// The returned map is never nil on success.
func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {
	posFile := filepath.Clean(cfg.PositionsFile)

	raw, err := ioutil.ReadFile(posFile)
	if os.IsNotExist(err) {
		return map[string]string{}, nil
	}
	if err != nil {
		return nil, err
	}

	var parsed File
	if err := yaml.UnmarshalStrict(raw, &parsed); err != nil {
		if !cfg.IgnoreInvalidYaml {
			return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", posFile, err)
		}
		// Invalid content is tolerated by configuration: start from empty.
		level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", posFile, "error", err)
		return map[string]string{}, nil
	}

	// Positions is nil when the file exists but is empty.
	if parsed.Positions != nil {
		return parsed.Positions, nil
	}
	return map[string]string{}, nil
}

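readPositionsFile方法从cfg.PositionsFile读取并解析yaml得到p.Positions;文件不存在时返回空map;yaml解析失败时,若cfg.IgnoreInvalidYaml为true则返回空map,否则返回错误;文件存在但内容为空时同样返回空map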

writePositionFile

loki/pkg/promtail/positions/positions.go

// writePositionFile serializes positions as YAML (wrapped in a File) and
// stores it at filename. The data is first written to a sibling file named
// "<filename>-new" and then renamed over the target. Any marshal, write or
// rename error is returned.
func writePositionFile(filename string, positions map[string]string) error {
	payload, err := yaml.Marshal(File{Positions: positions})
	if err != nil {
		return err
	}

	dst := filepath.Clean(filename)
	staging := dst + "-new"

	if err := ioutil.WriteFile(staging, payload, os.FileMode(positionFileMode)); err != nil {
		return err
	}
	return os.Rename(staging, dst)
}

writePositionFile方法将positions序列化为yaml,先写入以-new为后缀的临时文件,再通过os.Rename将其重命名为目标文件

小结

promtail的Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法;positions实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除。

doc