tail/tail.go

188 lines
3.9 KiB
Go
Raw Normal View History

package tail
import (
"bufio"
"fmt"
"github.com/howeyc/fsnotify"
"io"
"log"
"os"
"path/filepath"
"time"
)
type Line struct {
Text string
UnixTime int64
}
type Tail struct {
Filename string
Lines chan *Line
maxlinesize int
file *os.File
reader *bufio.Reader
watcher *fsnotify.Watcher
stop chan bool
created chan bool
}
// TailFile channels the lines of a logfile along with timestamp. If
// end is true, channel only newly added lines. If retry is true, tail
// the file name (not descriptor) and retry on file open/read errors.
func TailFile(filename string, maxlinesize int, end bool, retry bool) *Tail {
t := &Tail{
filename,
make(chan *Line),
maxlinesize,
nil,
nil,
fileCreateWatcher(filename),
make(chan bool),
make(chan bool)}
go t.tailFileSync(end, retry)
return t
}
func (tail *Tail) Stop() {
tail.stop <- true
tail.close()
}
func (tail *Tail) close() {
close(tail.Lines)
tail.watcher.Close()
if tail.file != nil {
tail.file.Close()
}
}
func (tail *Tail) reopen(wait bool) {
if tail.file != nil {
tail.file.Close()
}
for {
var err error
tail.file, err = os.Open(tail.Filename)
if err != nil {
if os.IsNotExist(err) && wait {
for {
evt := <-tail.watcher.Event
if evt.Name == tail.Filename {
break
}
}
continue
}
// TODO: don't panic here
panic(fmt.Sprintf("can't open file: %s", err))
}
return
}
return // unreachable
}
func (tail *Tail) readLine() ([]byte, error) {
line, isPrefix, err := tail.reader.ReadLine()
if isPrefix && err == nil {
// line is longer than what we can accept.
// ignore the rest of this line.
for {
_, isPrefix, err := tail.reader.ReadLine()
if !isPrefix || err != nil {
return line, err
}
}
}
return line, err
}
func (tail *Tail) tailFileSync(end bool, retry bool) {
tail.reopen(retry)
// Note: seeking to end happens only at the beginning; never
// during subsequent re-opens.
if end {
_, err := tail.file.Seek(0, 2) // seek to end of the file
if err != nil {
// TODO: don't panic here
panic(fmt.Sprintf("seek error: %s", err))
}
}
tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize)
every2Seconds := time.Tick(2 * time.Second)
for {
line, err := tail.readLine()
if err != nil && err != io.EOF {
log.Println("Error reading file; skipping this file - ", err)
tail.close()
return
}
// sleep for 0.1s on inactive files, else we cause too much I/O activity
if err == io.EOF {
time.Sleep(100 * time.Millisecond)
}
if line != nil {
tail.Lines <- &Line{string(line), getCurrentTime()}
}
select {
case <-every2Seconds: // periodically stat the file to check for possibly deletion.
if _, err := tail.file.Stat(); os.IsNotExist(err) {
if retry {
log.Printf("File %s has gone away; attempting to reopen it.\n", tail.Filename)
tail.reopen(retry)
tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize)
continue
} else {
log.Printf("File %s has gone away; skipping this file.\n", tail.Filename)
tail.close()
return
}
}
case evt := <-tail.watcher.Event:
if evt.Name == tail.Filename {
log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename)
tail.reopen(retry)
tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize)
continue
}
case <-tail.stop: // stop the tailer if requested
return
default:
}
}
}
// returns the watcher for file create events
func fileCreateWatcher(filename string) *fsnotify.Watcher {
watcher, err := fsnotify.NewWatcher()
if err != nil {
panic(fmt.Sprintf("fsnotify error: %s", err))
}
// watch on parent directory because the file may not exit.
err = watcher.WatchFlags(filepath.Dir(filename), fsnotify.FSN_CREATE)
if err != nil {
panic(fmt.Sprintf("fsnotify error on file: %s", err))
}
return watcher
}
// get current time in unix timestamp
func getCurrentTime() int64 {
return time.Now().UTC().Unix()
}