diff --git a/.travis.yml b/.travis.yml index 5407eba..57c00e2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,4 +11,4 @@ go: - 1.4.2 install: - - go get gopkg.in/fsnotify.v0 + - go get gopkg.in/fsnotify.v1 diff --git a/tail.go b/tail.go index e7bff9b..6e22d59 100644 --- a/tail.go +++ b/tail.go @@ -74,9 +74,8 @@ type Tail struct { Lines chan *Line Config - file *os.File - reader *bufio.Reader - tracker *watch.InotifyTracker + file *os.File + reader *bufio.Reader watcher watch.FileWatcher changes *watch.FileChanges @@ -114,12 +113,7 @@ func TailFile(filename string, config Config) (*Tail, error) { if t.Poll { t.watcher = watch.NewPollingFileWatcher(filename) } else { - t.tracker = watch.NewInotifyTracker() - w, err := t.tracker.NewWatcher() - if err != nil { - return nil, err - } - t.watcher = watch.NewInotifyFileWatcher(filename, w) + t.watcher = watch.NewInotifyFileWatcher(filename) } if t.MustExist { @@ -308,11 +302,11 @@ func (tail *Tail) tailFileSync() { // reopened if ReOpen is true. Truncated files are always reopened. func (tail *Tail) waitForChanges() error { if tail.changes == nil { - st, err := tail.file.Stat() + pos, err := tail.file.Seek(0, os.SEEK_CUR) if err != nil { return err } - tail.changes = tail.watcher.ChangeEvents(&tail.Tomb, st) + tail.changes = tail.watcher.ChangeEvents(&tail.Tomb, pos) } select { @@ -358,7 +352,7 @@ func (tail *Tail) openReader() { } func (tail *Tail) seekEnd() error { - return tail.seekTo(SeekInfo{Offset: 0, Whence: 2}) + return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END}) } func (tail *Tail) seekTo(pos SeekInfo) error { @@ -402,7 +396,5 @@ func (tail *Tail) sendLine(line string) bool { // meant to be invoked from a process's exit handler. Linux kernel may not // automatically remove inotify watches after the process exits. func (tail *Tail) Cleanup() { - if tail.tracker != nil { - tail.tracker.CloseAll() - } + watch.Cleanup(tail.Filename) } diff --git a/watch/inotify.go b/watch/inotify.go index ee660af..d6635f4 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -8,7 +8,8 @@ import ( "path/filepath" "github.com/hpcloud/tail/util" - "gopkg.in/fsnotify.v0" + + "gopkg.in/fsnotify.v1" "gopkg.in/tomb.v1" ) @@ -16,11 +17,10 @@ import ( type InotifyFileWatcher struct { Filename string Size int64 - w *fsnotify.Watcher } -func NewInotifyFileWatcher(filename string, w *fsnotify.Watcher) *InotifyFileWatcher { - fw := &InotifyFileWatcher{filename, 0, w} +func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { + fw := &InotifyFileWatcher{filename, 0} return fw } @@ -28,11 +28,11 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { dirname := filepath.Dir(fw.Filename) // Watch for new files to be created in the parent directory. - err := fw.w.WatchFlags(dirname, fsnotify.FSN_CREATE) + err := Watch(dirname) if err != nil { return err } - defer fw.w.RemoveWatch(dirname) + defer RemoveWatch(dirname) // Do a real check now as the file might have been created before // calling `WatchFlags` above. @@ -41,9 +41,11 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { return err } + events := Events(fw.Filename) + for { select { - case evt, ok := <-fw.w.Event: + case evt, ok := <-events: if !ok { return fmt.Errorf("inotify watcher has been closed") } else if evt.Name == fw.Filename { @@ -56,28 +58,30 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { panic("unreachable") } -func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges { +func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges { changes := NewFileChanges() - err := fw.w.Watch(fw.Filename) + err := Watch(fw.Filename) if err != nil { - util.Fatal("Error watching %v: %v", fw.Filename, err) + go changes.NotifyDeleted() } - fw.Size = fi.Size() + fw.Size = pos go func() { - defer fw.w.RemoveWatch(fw.Filename) + defer RemoveWatch(fw.Filename) defer changes.Close() + events := Events(fw.Filename) + for { prevSize := fw.Size - var evt *fsnotify.FileEvent + var evt fsnotify.Event var ok bool select { - case evt, ok = <-fw.w.Event: + case evt, ok = <-events: if !ok { return } @@ -86,14 +90,14 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh } switch { - case evt.IsDelete(): + case evt.Op&fsnotify.Remove == fsnotify.Remove: fallthrough - case evt.IsRename(): + case evt.Op&fsnotify.Rename == fsnotify.Rename: changes.NotifyDeleted() return - case evt.IsModify(): + case evt.Op&fsnotify.Write == fsnotify.Write: fi, err := os.Stat(fw.Filename) if err != nil { if os.IsNotExist(err) { diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index 26ec800..07e7d7a 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -3,49 +3,167 @@ package watch import ( - "gopkg.in/fsnotify.v0" "log" + "os" "sync" + "syscall" + + "github.com/hpcloud/tail/util" + + "gopkg.in/fsnotify.v1" ) type InotifyTracker struct { - mux sync.Mutex - watchers map[*fsnotify.Watcher]bool + mux sync.Mutex + watcher *fsnotify.Watcher + chans map[string]chan fsnotify.Event + done map[string]chan bool + watch chan *watchInfo + remove chan string + error chan error } -func NewInotifyTracker() *InotifyTracker { - t := new(InotifyTracker) - t.watchers = make(map[*fsnotify.Watcher]bool) - return t +type watchInfo struct { + fname string } -func (t *InotifyTracker) NewWatcher() (*fsnotify.Watcher, error) { - t.mux.Lock() - defer t.mux.Unlock() - w, err := fsnotify.NewWatcher() - if err == nil { - t.watchers[w] = true - } - return w, err -} +var ( + // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used + shared *InotifyTracker -func (t *InotifyTracker) CloseWatcher(w *fsnotify.Watcher) (err error) { - t.mux.Lock() - defer t.mux.Unlock() - if _, ok := t.watchers[w]; ok { - err = w.Close() - delete(t.watchers, w) - } - return -} - -func (t *InotifyTracker) CloseAll() { - t.mux.Lock() - defer t.mux.Unlock() - for w, _ := range t.watchers { - if err := w.Close(); err != nil { - log.Printf("Error closing watcher: %v", err) + // these are used to ensure the shared InotifyTracker is run exactly once + once = sync.Once{} + goRun = func() { + shared = &InotifyTracker{ + mux: sync.Mutex{}, + chans: make(map[string]chan fsnotify.Event), + done: make(map[string]chan bool), + watch: make(chan *watchInfo), + remove: make(chan string), + error: make(chan error), + } + go shared.run() + } + + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Watch signals the run goroutine to begin watching the input filename +func Watch(fname string) error { + // start running the shared InotifyTracker if not already running + once.Do(goRun) + + shared.watch <- &watchInfo{ + fname: fname, + } + return <-shared.error +} + +// RemoveWatch signals the run goroutine to remove the watch for the input filename +func RemoveWatch(fname string) { + // start running the shared InotifyTracker if not already running + once.Do(goRun) + + shared.mux.Lock() + done := shared.done[fname] + if done != nil { + delete(shared.done, fname) + close(done) + } + shared.mux.Unlock() + + shared.remove <- fname +} + +// Events returns a channel to which FileEvents corresponding to the input filename +// will be sent. This channel will be closed when removeWatch is called on this +// filename. +func Events(fname string) chan fsnotify.Event { + shared.mux.Lock() + defer shared.mux.Unlock() + + return shared.chans[fname] +} + +// Cleanup removes the watch for the input filename if necessary. +func Cleanup(fname string) { + RemoveWatch(fname) +} + +// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating +// a new Watcher if the previous Watcher was closed. +func (shared *InotifyTracker) addWatch(fname string) error { + shared.mux.Lock() + defer shared.mux.Unlock() + + if shared.chans[fname] == nil { + shared.chans[fname] = make(chan fsnotify.Event) + shared.done[fname] = make(chan bool) + } + return shared.watcher.Add(fname) +} + +// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the +// corresponding events channel. +func (shared *InotifyTracker) removeWatch(fname string) { + shared.mux.Lock() + defer shared.mux.Unlock() + + if ch := shared.chans[fname]; ch != nil { + shared.watcher.Remove(fname) + + delete(shared.chans, fname) + close(ch) + } +} + +// sendEvent sends the input event to the appropriate Tail. +func (shared *InotifyTracker) sendEvent(event fsnotify.Event) { + shared.mux.Lock() + ch := shared.chans[event.Name] + done := shared.done[event.Name] + shared.mux.Unlock() + + if ch != nil && done != nil { + select { + case ch <- event: + case <-done: + } + } +} + +// run starts the goroutine in which the shared struct reads events from its +// Watcher's Event channel and sends the events to the appropriate Tail. +func (shared *InotifyTracker) run() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + util.Fatal("failed to create Watcher") + } + shared.watcher = watcher + + for { + select { + case winfo := <-shared.watch: + shared.error <- shared.addWatch(winfo.fname) + + case fname := <-shared.remove: + shared.removeWatch(fname) + + case event, open := <-shared.watcher.Events: + if !open { + return + } + shared.sendEvent(event) + + case err, open := <-shared.watcher.Errors: + if !open { + return + } else if err != nil { + sysErr, ok := err.(*os.SyscallError) + if !ok || sysErr.Err != syscall.EINTR { + logger.Printf("Error in Watcher Error channel: %s", err) + } + } } - delete(t.watchers, w) } } diff --git a/watch/polling.go b/watch/polling.go index e13e034..5479272 100644 --- a/watch/polling.go +++ b/watch/polling.go @@ -40,14 +40,19 @@ func (fw *PollingFileWatcher) BlockUntilExists(t *tomb.Tomb) error { panic("unreachable") } -func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, origFi os.FileInfo) *FileChanges { +func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges { changes := NewFileChanges() var prevModTime time.Time // XXX: use tomb.Tomb to cleanly manage these goroutines. replace // the fatal (below) with tomb's Kill. - fw.Size = origFi.Size() + fw.Size = pos + origFi, err := os.Stat(fw.Filename) + if err != nil { + changes.NotifyDeleted() + return changes + } go func() { defer changes.Close() diff --git a/watch/watch.go b/watch/watch.go index a3c06e8..fbc08a5 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -2,10 +2,7 @@ package watch -import ( - "gopkg.in/tomb.v1" - "os" -) +import "gopkg.in/tomb.v1" // FileWatcher monitors file-level events. type FileWatcher interface { @@ -16,5 +13,7 @@ type FileWatcher interface { // deletion, renames or truncations. Returned FileChanges group of // channels will be closed, thus become unusable, after a deletion // or truncation event. - ChangeEvents(*tomb.Tomb, os.FileInfo) *FileChanges + // In order to properly report truncations, ChangeEvents requires + // the caller to pass their current offset in the file. + ChangeEvents(*tomb.Tomb, int64) *FileChanges }