From 99fe83b7424fffbcb1de5adb64abfd3c64324ea2 Mon Sep 17 00:00:00 2001 From: Sridhar Ratnakumar Date: Tue, 9 Oct 2012 15:13:05 -0700 Subject: [PATCH] make tail.go its own package, independent of logyard --- tail.go | 187 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 tail.go diff --git a/tail.go b/tail.go new file mode 100644 index 0000000..057d96b --- /dev/null +++ b/tail.go @@ -0,0 +1,187 @@ +package tail + +import ( + "bufio" + "fmt" + "github.com/howeyc/fsnotify" + "io" + "log" + "os" + "path/filepath" + "time" +) + +type Line struct { + Text string + UnixTime int64 +} + +type Tail struct { + Filename string + Lines chan *Line + + maxlinesize int + file *os.File + reader *bufio.Reader + watcher *fsnotify.Watcher + + stop chan bool + created chan bool +} + +// TailFile channels the lines of a logfile along with timestamp. If +// end is true, channel only newly added lines. If retry is true, tail +// the file name (not descriptor) and retry on file open/read errors. +func TailFile(filename string, maxlinesize int, end bool, retry bool) *Tail { + t := &Tail{ + filename, + make(chan *Line), + maxlinesize, + nil, + nil, + fileCreateWatcher(filename), + make(chan bool), + make(chan bool)} + + go t.tailFileSync(end, retry) + + return t +} + +func (tail *Tail) Stop() { + tail.stop <- true + tail.close() +} + +func (tail *Tail) close() { + close(tail.Lines) + tail.watcher.Close() + if tail.file != nil { + tail.file.Close() + } +} + +func (tail *Tail) reopen(wait bool) { + if tail.file != nil { + tail.file.Close() + } + for { + var err error + tail.file, err = os.Open(tail.Filename) + if err != nil { + if os.IsNotExist(err) && wait { + for { + evt := <-tail.watcher.Event + if evt.Name == tail.Filename { + break + } + } + continue + } + // TODO: don't panic here + panic(fmt.Sprintf("can't open file: %s", err)) + } + return + } + return // unreachable +} + +func (tail *Tail) readLine() ([]byte, error) { + line, isPrefix, err := tail.reader.ReadLine() + + if isPrefix && err == nil { + // line is longer than what we can accept. + // ignore the rest of this line. + for { + _, isPrefix, err := tail.reader.ReadLine() + if !isPrefix || err != nil { + return line, err + } + } + } + return line, err +} + +func (tail *Tail) tailFileSync(end bool, retry bool) { + tail.reopen(retry) + + // Note: seeking to end happens only at the beginning; never + // during subsequent re-opens. + if end { + _, err := tail.file.Seek(0, 2) // seek to end of the file + if err != nil { + // TODO: don't panic here + panic(fmt.Sprintf("seek error: %s", err)) + } + } + + tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize) + + every2Seconds := time.Tick(2 * time.Second) + + for { + line, err := tail.readLine() + + if err != nil && err != io.EOF { + log.Println("Error reading file; skipping this file - ", err) + tail.close() + return + } + + // sleep for 0.1s on inactive files, else we cause too much I/O activity + if err == io.EOF { + time.Sleep(100 * time.Millisecond) + } + + if line != nil { + tail.Lines <- &Line{string(line), getCurrentTime()} + } + + select { + case <-every2Seconds: // periodically stat the file to check for possibly deletion. + if _, err := tail.file.Stat(); os.IsNotExist(err) { + if retry { + log.Printf("File %s has gone away; attempting to reopen it.\n", tail.Filename) + tail.reopen(retry) + tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize) + continue + } else { + log.Printf("File %s has gone away; skipping this file.\n", tail.Filename) + tail.close() + return + } + } + case evt := <-tail.watcher.Event: + if evt.Name == tail.Filename { + log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename) + tail.reopen(retry) + tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize) + continue + } + case <-tail.stop: // stop the tailer if requested + return + default: + } + } +} + +// returns the watcher for file create events +func fileCreateWatcher(filename string) *fsnotify.Watcher { + watcher, err := fsnotify.NewWatcher() + if err != nil { + panic(fmt.Sprintf("fsnotify error: %s", err)) + } + + // watch on parent directory because the file may not exit. + err = watcher.WatchFlags(filepath.Dir(filename), fsnotify.FSN_CREATE) + if err != nil { + panic(fmt.Sprintf("fsnotify error on file: %s", err)) + } + + return watcher +} + +// get current time in unix timestamp +func getCurrentTime() int64 { + return time.Now().UTC().Unix() +}