diff --git a/tail.go b/tail.go index 519a051..c698c15 100644 --- a/tail.go +++ b/tail.go @@ -3,11 +3,9 @@ package tail import ( "bufio" "fmt" - "github.com/howeyc/fsnotify" "io" "log" "os" - "path/filepath" "time" ) @@ -20,10 +18,11 @@ type Tail struct { Filename string Lines chan *Line + useinotify bool maxlinesize int file *os.File reader *bufio.Reader - watcher *fsnotify.Watcher + watcher FileWatcher stop chan bool created chan bool @@ -32,21 +31,25 @@ type Tail struct { // TailFile channels the lines of a logfile along with timestamp. If // end is true, channel only newly added lines. If retry is true, tail // the file name (not descriptor) and retry on file open/read errors. -func TailFile(filename string, maxlinesize int, end bool, retry bool) (*Tail, error) { - watcher, err := fileCreateWatcher(filename) - if err != nil { - return nil, err - } +func TailFile(filename string, maxlinesize int, end bool, retry bool, useinotify bool) (*Tail, error) { t := &Tail{ filename, make(chan *Line), + useinotify, maxlinesize, nil, nil, - watcher, + nil, make(chan bool), make(chan bool)} + if !useinotify { + log.Println("Warning: not using inotify; will poll ", filename) + t.watcher = NewPollingFileWatcher(filename) + } else { + t.watcher = NewInotifyFileWatcher(filename) + } + go t.tailFileSync(end, retry) return t, nil @@ -59,7 +62,6 @@ func (tail *Tail) Stop() { func (tail *Tail) close() { close(tail.Lines) - tail.watcher.Close() if tail.file != nil { tail.file.Close() } @@ -74,16 +76,15 @@ func (tail *Tail) reopen(wait bool) { tail.file, err = os.Open(tail.Filename) if err != nil { if os.IsNotExist(err) && wait { - for { - evt := <-tail.watcher.Event - if evt.Name == tail.Filename { - break - } + log.Println("blocking until exists") + err := tail.watcher.BlockUntilExists() + if err != nil { + panic(err) } + log.Println("exists now") continue } - // TODO: don't panic here - panic(fmt.Sprintf("can't open file: %s", err)) + log.Println(fmt.Sprintf("Unable to reopen file (%s): %s", tail.Filename, err)) } return } @@ -109,6 +110,8 @@ func (tail *Tail) readLine() ([]byte, error) { func (tail *Tail) tailFileSync(end bool, retry bool) { tail.reopen(retry) + var changes chan bool + // Note: seeking to end happens only at the beginning; never // during subsequent re-opens. if end { @@ -121,70 +124,60 @@ func (tail *Tail) tailFileSync(end bool, retry bool) { tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize) - every2Seconds := time.Tick(2 * time.Second) - for { line, err := tail.readLine() - if err != nil && err != io.EOF { - log.Println("Error reading file; skipping this file - ", err) - tail.close() - return - } + if err == nil { + if line != nil { + tail.Lines <- &Line{string(line), getCurrentTime()} + } + } else { + if err != io.EOF { + log.Println("Error reading file; skipping this file - ", err) + tail.close() + return + } - // sleep for 0.1s on inactive files, else we cause too much I/O activity - if err == io.EOF { - time.Sleep(100 * time.Millisecond) - } + // When end of file is reached, wait for more data to + // become available. Wait strategy is based on the + // `tail.watcher` implementation (inotify or polling). + if err == io.EOF { + if changes == nil { + changes = tail.watcher.ChangeEvents() + } - if line != nil { - tail.Lines <- &Line{string(line), getCurrentTime()} - } + //log.Println("WAITING ", tail.Filename) + _, ok := <-changes + //log.Println("RECEIVED ", tail.Filename) - select { - case <-every2Seconds: // periodically stat the file to check for possibly deletion. - if _, err := tail.file.Stat(); os.IsNotExist(err) { - if retry { - log.Printf("File %s has gone away; attempting to reopen it.\n", tail.Filename) - tail.reopen(retry) - tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize) - continue - } else { - log.Printf("File %s has gone away; skipping this file.\n", tail.Filename) - tail.close() - return + if !ok { + // file got deleted/renamed + if retry { + log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename) + tail.reopen(retry) + log.Printf("File %s has been reopened.", tail.Filename) + tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize) + changes = nil + continue + } else { + log.Printf("File %s has gone away; skipping this file.\n", tail.Filename) + tail.close() + return + } } } - case evt := <-tail.watcher.Event: - if evt.Name == tail.Filename { - log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename) - tail.reopen(retry) - tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize) - continue - } - case <-tail.stop: // stop the tailer if requested + } + + // stop the tailer if requested. + // FIXME: won't happen promptly; http://bugs.activestate.com/show_bug.cgi?id=95718#c3 + select { + case <-tail.stop: return default: } } } -// returns the watcher for file create events -func fileCreateWatcher(filename string) (*fsnotify.Watcher, error) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, err - } - - // watch on parent directory because the file may not exit. - err = watcher.WatchFlags(filepath.Dir(filename), fsnotify.FSN_CREATE) - if err != nil { - return nil, err - } - - return watcher, nil -} - // get current time in unix timestamp func getCurrentTime() int64 { return time.Now().UTC().Unix() diff --git a/watch.go b/watch.go new file mode 100644 index 0000000..24ec164 --- /dev/null +++ b/watch.go @@ -0,0 +1,131 @@ +// TODO: avoid creating two instances of the fsnotify.Watcher struct +package tail + +import ( + "github.com/howeyc/fsnotify" + "os" + "path/filepath" + "time" +) + +type FileWatcher interface { + BlockUntilExists() error + ChangeEvents() chan bool +} + +// FileWatcher monitors file-level events +type InotifyFileWatcher struct { + Filename string +} + +func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { + fw := &InotifyFileWatcher{filename} + return fw +} + +// BlockUntilExists blocks until the file comes into existence. If the +// file already exists, then block until it is created again. +func (fw *InotifyFileWatcher) BlockUntilExists() error { + w, err := fsnotify.NewWatcher() + if err != nil { + return err + } + err = w.WatchFlags(filepath.Dir(fw.Filename), fsnotify.FSN_CREATE) + if err != nil { + return err + } + <-w.Event + w.RemoveWatch(filepath.Dir(fw.Filename)) + // XXX: how to free up w's goroutines without relying on the gc? + return nil +} + +// ChangeEvents returns a channel that gets updated when the file is ready to be read. +func (fw *InotifyFileWatcher) ChangeEvents() chan bool { + w, err := fsnotify.NewWatcher() + if err != nil { + panic(err) + } + err = w.Watch(fw.Filename) + if err != nil { + panic(err) + } + + ch := make(chan bool) + + go func() { + for { + evt := <-w.Event + switch { + case evt.IsDelete(): + fallthrough + + case evt.IsRename(): + close(ch) + w.RemoveWatch(fw.Filename) + return + + case evt.IsModify(): + // send only if channel is empty. + select { + case ch <- true: + default: + } + } + } + }() + + return ch +} + +// FileWatcher monitors file-level events +type PollingFileWatcher struct { + Filename string +} + +func NewPollingFileWatcher(filename string) *PollingFileWatcher { + fw := &PollingFileWatcher{filename} + return fw +} + +// BlockUntilExists blocks until the file comes into existence. If the +// file already exists, then block until it is created again. +func (fw *PollingFileWatcher) BlockUntilExists() error { + panic("not implemented") + return nil +} + +// ChangeEvents returns a channel that gets updated when the file is ready to be read. +func (fw *PollingFileWatcher) ChangeEvents() chan bool { + ch := make(chan bool) + stop := make(chan bool) + every2Seconds := time.Tick(2 * time.Second) + + go func() { + for { + time.Sleep(250 * time.Millisecond) + select { + case ch <- true: + case <-stop: + return + default: + } + } + }() + + go func() { + for { + select { + case <-every2Seconds: + // NOTE: not using file descriptor. + if _, err := os.Stat(fw.Filename); os.IsNotExist(err) { + stop <- true + close(ch) + return + } + } + } + }() + + return ch +}