tail/tail.go

438 lines
10 KiB
Go
Raw Normal View History

// Copyright (c) 2015 HPE Software Inc. All rights reserved.
2013-01-08 04:54:49 +08:00
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package tail
import (
"bufio"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strings"
2015-12-24 16:47:46 +08:00
"sync"
"time"
2015-07-03 22:51:41 +08:00
2021-10-13 21:28:19 +08:00
"git.lovezsh.com/go-kit/tail/ratelimiter"
"git.lovezsh.com/go-kit/tail/util"
"git.lovezsh.com/go-kit/tail/watch"
2015-07-03 22:51:41 +08:00
"gopkg.in/tomb.v1"
)
var (
ErrStop = errors.New("tail should now stop")
)
type Line struct {
Text string
Time time.Time
2013-09-18 07:54:55 +08:00
Err error // Error from tail
}
// NewLine returns a Line with present time.
func NewLine(text string) *Line {
return &Line{text, time.Now(), nil}
}
2013-08-10 06:15:40 +08:00
// SeekInfo represents arguments to `os.Seek`
type SeekInfo struct {
Offset int64
Whence int // os.SEEK_*
}
type logger interface {
Fatal(v ...interface{})
Fatalf(format string, v ...interface{})
Fatalln(v ...interface{})
Panic(v ...interface{})
Panicf(format string, v ...interface{})
Panicln(v ...interface{})
Print(v ...interface{})
Printf(format string, v ...interface{})
Println(v ...interface{})
}
2013-05-30 07:32:01 +08:00
// Config is used to specify how a file must be tailed.
type Config struct {
2013-09-18 07:54:55 +08:00
// File-specifc
2014-04-30 07:39:57 +08:00
Location *SeekInfo // Seek to this location before tailing
ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using inotify
2015-09-25 03:40:43 +08:00
Pipe bool // Is a named pipe (mkfifo)
2014-04-30 07:39:57 +08:00
RateLimiter *ratelimiter.LeakyBucket
2013-09-18 07:54:55 +08:00
// Generic IO
Follow bool // Continue looking for new lines (tail -f)
MaxLineSize int // If non-zero, split longer lines into multiple lines
2014-01-30 20:47:28 +08:00
// Logger, when nil, is set to tail.DefaultLogger
// To disable logging: set field to tail.DiscardingLogger
Logger logger
}
type Tail struct {
Filename string
Lines chan *Line
Config
file *os.File
reader *bufio.Reader
2015-07-03 22:53:42 +08:00
watcher watch.FileWatcher
changes *watch.FileChanges
tomb.Tomb // provides: Done, Kill, Dying
2015-12-24 16:47:46 +08:00
lk sync.Mutex
}
var (
// DefaultLogger is used when Config.Logger == nil
DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
// DiscardingLogger can be used to disable logging output
DiscardingLogger = log.New(ioutil.Discard, "", 0)
)
2013-05-30 07:32:01 +08:00
// TailFile begins tailing the file. Output stream is made available
// via the `Tail.Lines` channel. To handle errors during tailing,
// invoke the `Wait` or `Err` method after finishing reading from the
// `Lines` channel.
func TailFile(filename string, config Config) (*Tail, error) {
if config.ReOpen && !config.Follow {
util.Fatal("cannot set ReOpen without Follow.")
}
t := &Tail{
Filename: filename,
Lines: make(chan *Line),
2014-01-30 20:47:28 +08:00
Config: config,
}
// when Logger was not specified in config, use default logger
if t.Logger == nil {
t.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
if t.Poll {
t.watcher = watch.NewPollingFileWatcher(filename)
} else {
t.watcher = watch.NewInotifyFileWatcher(filename)
}
if t.MustExist {
var err error
t.file, err = OpenFile(t.Filename)
if err != nil {
return nil, err
}
}
go t.tailFileSync()
return t, nil
}
// Return the file's current position, like stdio's ftell().
// But this value is not very accurate.
2013-08-24 15:43:25 +08:00
// it may readed one line in the chan(tail.Lines),
// so it may lost one line.
func (tail *Tail) Tell() (offset int64, err error) {
if tail.file == nil {
return
}
offset, err = tail.file.Seek(0, os.SEEK_CUR)
2016-01-25 14:19:23 +08:00
if err != nil {
return
}
2016-01-25 14:19:23 +08:00
tail.lk.Lock()
defer tail.lk.Unlock()
if tail.reader == nil {
return
}
offset -= int64(tail.reader.Buffered())
return
}
// Stop stops the tailing activity.
func (tail *Tail) Stop() error {
tail.Kill(nil)
return tail.Wait()
}
// StopAtEOF stops tailing as soon as the end of the file is reached.
func (tail *Tail) StopAtEOF() error {
tail.Kill(errStopAtEOF)
return tail.Wait()
}
var errStopAtEOF = errors.New("tail: stop at eof")
func (tail *Tail) close() {
close(tail.Lines)
2016-04-04 22:51:26 +08:00
tail.closeFile()
2015-12-09 16:38:19 +08:00
}
2016-04-04 22:51:26 +08:00
func (tail *Tail) closeFile() {
if tail.file != nil {
tail.file.Close()
2015-12-09 16:38:19 +08:00
tail.file = nil
}
}
func (tail *Tail) reopen() error {
2016-04-04 22:51:26 +08:00
tail.closeFile()
for {
var err error
tail.file, err = OpenFile(tail.Filename)
if err != nil {
if os.IsNotExist(err) {
tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
2013-09-23 15:03:51 +08:00
if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
if err == tomb.ErrDying {
return err
}
return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
}
continue
}
return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
}
break
}
return nil
}
func (tail *Tail) readLine() (string, error) {
2015-12-24 16:47:46 +08:00
tail.lk.Lock()
line, err := tail.reader.ReadString('\n')
2015-12-24 16:47:46 +08:00
tail.lk.Unlock()
if err != nil {
// Note ReadString "returns the data read before the error" in
// case of an error, including EOF, so we return it as is. The
// caller is expected to process it if err is EOF.
2014-05-17 07:59:29 +08:00
return line, err
}
line = strings.TrimRight(line, "\n")
2013-10-11 17:00:29 +08:00
return line, err
}
func (tail *Tail) tailFileSync() {
defer tail.Done()
defer tail.close()
if !tail.MustExist {
// deferred first open.
err := tail.reopen()
if err != nil {
2013-09-24 09:13:19 +08:00
if err != tomb.ErrDying {
tail.Kill(err)
}
return
}
}
// Seek to requested location on first open of the file.
if tail.Location != nil {
_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
if err != nil {
tail.Killf("Seek error on %s: %s", tail.Filename, err)
return
}
}
2014-05-17 08:03:16 +08:00
tail.openReader()
var offset int64
2015-09-25 03:40:43 +08:00
var err error
// Read line by line.
for {
2015-09-25 03:40:43 +08:00
// do not seek in named pipes
if !tail.Pipe {
// grab the position in case we need to back up in the event of a half-line
offset, err = tail.Tell()
if err != nil {
tail.Kill(err)
return
}
}
line, err := tail.readLine()
// Process `line` even if err is EOF.
if err == nil {
cooloff := !tail.sendLine(line)
if cooloff {
// Wait a second before seeking till the end of
// file when rate limit is reached.
msg := ("Too much log activity; waiting a second " +
"before resuming tailing")
tail.Lines <- &Line{msg, time.Now(), errors.New(msg)}
select {
case <-time.After(time.Second):
case <-tail.Dying():
return
}
if err := tail.seekEnd(); err != nil {
tail.Kill(err)
return
}
}
2014-05-17 09:10:04 +08:00
} else if err == io.EOF {
2013-05-31 04:18:46 +08:00
if !tail.Follow {
if line != "" {
tail.sendLine(line)
}
2013-05-31 04:18:46 +08:00
return
}
if tail.Follow && line != "" {
// this has the potential to never return the last line if
// it's not followed by a newline; seems a fair trade here
err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
if err != nil {
tail.Kill(err)
return
}
}
// When EOF is reached, wait for more data to become
// available. Wait strategy is based on the `tail.watcher`
// implementation (inotify or polling).
err := tail.waitForChanges()
2013-05-30 07:32:01 +08:00
if err != nil {
if err != ErrStop {
tail.Kill(err)
}
return
}
2014-05-17 09:10:04 +08:00
} else {
// non-EOF error
tail.Killf("Error reading %s: %s", tail.Filename, err)
return
}
select {
case <-tail.Dying():
if tail.Err() == errStopAtEOF {
continue
}
return
default:
}
}
}
// waitForChanges waits until the file has been appended, deleted,
2013-05-30 07:32:01 +08:00
// moved or truncated. When moved or deleted - the file will be
// reopened if ReOpen is true. Truncated files are always reopened.
func (tail *Tail) waitForChanges() error {
if tail.changes == nil {
pos, err := tail.file.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
2015-12-24 16:47:46 +08:00
tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
if err != nil {
return err
}
}
select {
case <-tail.changes.Modified:
2013-05-30 07:32:01 +08:00
return nil
case <-tail.changes.Deleted:
tail.changes = nil
if tail.ReOpen {
// XXX: we must not log from a library.
tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
return err
}
tail.Logger.Printf("Successfully reopened %s", tail.Filename)
2014-05-17 08:03:16 +08:00
tail.openReader()
return nil
} else {
tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
return ErrStop
}
case <-tail.changes.Truncated:
// Always reopen truncated files (Follow is true)
tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
2013-05-30 07:32:01 +08:00
return err
}
tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
2014-05-17 08:03:16 +08:00
tail.openReader()
return nil
case <-tail.Dying():
return ErrStop
}
2013-05-30 07:32:01 +08:00
panic("unreachable")
}
2014-05-17 08:03:16 +08:00
func (tail *Tail) openReader() {
if tail.MaxLineSize > 0 {
2014-04-29 05:14:21 +08:00
// add 2 to account for newline characters
2014-05-17 08:03:16 +08:00
tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
} else {
2014-05-17 08:03:16 +08:00
tail.reader = bufio.NewReader(tail.file)
}
}
2014-04-29 05:55:51 +08:00
func (tail *Tail) seekEnd() error {
return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
}
func (tail *Tail) seekTo(pos SeekInfo) error {
_, err := tail.file.Seek(pos.Offset, pos.Whence)
2014-04-29 05:55:51 +08:00
if err != nil {
return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
}
// Reset the read buffer whenever the file is re-seek'ed
tail.reader.Reset(tail.file)
return nil
}
// sendLine sends the line(s) to Lines channel, splitting longer lines
// if necessary. Return false if rate limit is reached.
func (tail *Tail) sendLine(line string) bool {
now := time.Now()
lines := []string{line}
// Split longer lines
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
lines = util.PartitionString(line, tail.MaxLineSize)
}
2013-05-30 07:32:01 +08:00
for _, line := range lines {
tail.Lines <- &Line{line, now, nil}
2014-04-30 07:39:57 +08:00
}
if tail.Config.RateLimiter != nil {
ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
if !ok {
tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
tail.Filename)
return false
}
}
return true
}
// Cleanup removes inotify watches added by the tail package. This function is
// meant to be invoked from a process's exit handler. Linux kernel may not
// automatically remove inotify watches after the process exits.
2015-07-03 22:53:42 +08:00
func (tail *Tail) Cleanup() {
watch.Cleanup(tail.Filename)
}