From b155fc13d44fe6ee44a589a3ed4a191e00afa574 Mon Sep 17 00:00:00 2001 From: Andy Ouyang Date: Thu, 30 Jul 2015 12:06:40 -0700 Subject: [PATCH 1/8] Single shared Watcher used to avoid inotify limit --- tail.go | 16 ++--- watch/inotify.go | 24 ++++--- watch/inotify_tracker.go | 136 ++++++++++++++++++++++++++++++--------- 3 files changed, 122 insertions(+), 54 deletions(-) diff --git a/tail.go b/tail.go index a7a1a5b..753158d 100644 --- a/tail.go +++ b/tail.go @@ -62,9 +62,8 @@ type Tail struct { Lines chan *Line Config - file *os.File - reader *bufio.Reader - tracker *watch.InotifyTracker + file *os.File + reader *bufio.Reader watcher watch.FileWatcher changes *watch.FileChanges @@ -102,12 +101,7 @@ func TailFile(filename string, config Config) (*Tail, error) { if t.Poll { t.watcher = watch.NewPollingFileWatcher(filename) } else { - t.tracker = watch.NewInotifyTracker() - w, err := t.tracker.NewWatcher() - if err != nil { - return nil, err - } - t.watcher = watch.NewInotifyFileWatcher(filename, w) + t.watcher = watch.NewInotifyFileWatcher(filename) } if t.MustExist { @@ -390,7 +384,5 @@ func (tail *Tail) sendLine(line string) bool { // meant to be invoked from a process's exit handler. Linux kernel may not // automatically remove inotify watches after the process exits. func (tail *Tail) Cleanup() { - if tail.tracker != nil { - tail.tracker.CloseAll() - } + watch.Cleanup(tail.Filename) } diff --git a/watch/inotify.go b/watch/inotify.go index ee660af..0c95f7f 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -8,6 +8,7 @@ import ( "path/filepath" "github.com/hpcloud/tail/util" + "gopkg.in/fsnotify.v0" "gopkg.in/tomb.v1" ) @@ -16,11 +17,10 @@ import ( type InotifyFileWatcher struct { Filename string Size int64 - w *fsnotify.Watcher } -func NewInotifyFileWatcher(filename string, w *fsnotify.Watcher) *InotifyFileWatcher { - fw := &InotifyFileWatcher{filename, 0, w} +func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { + fw := &InotifyFileWatcher{filename, 0} return fw } @@ -28,11 +28,11 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { dirname := filepath.Dir(fw.Filename) // Watch for new files to be created in the parent directory. - err := fw.w.WatchFlags(dirname, fsnotify.FSN_CREATE) + err := shared.WatchFlags(dirname, fsnotify.FSN_CREATE) if err != nil { return err } - defer fw.w.RemoveWatch(dirname) + defer shared.RemoveWatch(dirname) // Do a real check now as the file might have been created before // calling `WatchFlags` above. @@ -41,9 +41,11 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { return err } + events := shared.Events(fw.Filename) + for { select { - case evt, ok := <-fw.w.Event: + case evt, ok := <-events: if !ok { return fmt.Errorf("inotify watcher has been closed") } else if evt.Name == fw.Filename { @@ -59,17 +61,19 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges { changes := NewFileChanges() - err := fw.w.Watch(fw.Filename) + err := shared.Watch(fw.Filename) if err != nil { - util.Fatal("Error watching %v: %v", fw.Filename, err) + go changes.NotifyDeleted() } fw.Size = fi.Size() go func() { - defer fw.w.RemoveWatch(fw.Filename) + defer shared.RemoveWatch(fw.Filename) defer changes.Close() + events := shared.Events(fw.Filename) + for { prevSize := fw.Size @@ -77,7 +81,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh var ok bool select { - case evt, ok = <-fw.w.Event: + case evt, ok = <-events: if !ok { return } diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index 26ec800..b00b4fc 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -3,49 +3,121 @@ package watch import ( - "gopkg.in/fsnotify.v0" "log" + "os" "sync" + + "github.com/hpcloud/tail/util" + + "gopkg.in/fsnotify.v0" ) type InotifyTracker struct { - mux sync.Mutex - watchers map[*fsnotify.Watcher]bool + mux sync.Mutex + watcher *fsnotify.Watcher + chans map[string]chan *fsnotify.FileEvent + done chan struct{} } -func NewInotifyTracker() *InotifyTracker { - t := new(InotifyTracker) - t.watchers = make(map[*fsnotify.Watcher]bool) - return t -} - -func (t *InotifyTracker) NewWatcher() (*fsnotify.Watcher, error) { - t.mux.Lock() - defer t.mux.Unlock() - w, err := fsnotify.NewWatcher() - if err == nil { - t.watchers[w] = true +var ( + shared = &InotifyTracker{ + mux: sync.Mutex{}, + chans: make(map[string]chan *fsnotify.FileEvent), } - return w, err + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Watch calls fsnotify.Watch for the input filename, creating a new Watcher if the +// previous Watcher was closed. +func (shared *InotifyTracker) Watch(filename string) error { + return shared.WatchFlags(filename, fsnotify.FSN_ALL) } -func (t *InotifyTracker) CloseWatcher(w *fsnotify.Watcher) (err error) { - t.mux.Lock() - defer t.mux.Unlock() - if _, ok := t.watchers[w]; ok { - err = w.Close() - delete(t.watchers, w) - } - return -} +// WatchFlags calls fsnotify.WatchFlags for the input filename and flags, creating +// a new Watcher if the previous Watcher was closed. +func (shared *InotifyTracker) WatchFlags(filename string, flags uint32) error { + shared.mux.Lock() + defer shared.mux.Unlock() -func (t *InotifyTracker) CloseAll() { - t.mux.Lock() - defer t.mux.Unlock() - for w, _ := range t.watchers { - if err := w.Close(); err != nil { - log.Printf("Error closing watcher: %v", err) + // Start up shared struct if necessary + if len(shared.chans) == 0 { + watcher, err := fsnotify.NewWatcher() + if err != nil { + util.Fatal("Error creating Watcher") + } + shared.watcher = watcher + shared.done = make(chan struct{}) + go shared.run() + } + + // Create a channel to which FileEvents for the input filename will be sent + ch := shared.chans[filename] + if ch == nil { + shared.chans[filename] = make(chan *fsnotify.FileEvent) + } + return shared.watcher.WatchFlags(filename, flags) +} + +// RemoveWatch calls fsnotify.RemoveWatch for the input filename and closes the +// corresponding events channel. +func (shared *InotifyTracker) RemoveWatch(filename string) { + shared.mux.Lock() + defer shared.mux.Unlock() + + _, found := shared.chans[filename] + if !found { + return + } + + shared.watcher.RemoveWatch(filename) + delete(shared.chans, filename) + + // If this is the last target to be removed, close the shared Watcher + if len(shared.chans) == 0 { + shared.watcher.Close() + close(shared.done) + } +} + +// Events returns a channel to which FileEvents corresponding to the input filename +// will be sent. This channel will be closed when removeWatch is called on this +// filename. +func (shared *InotifyTracker) Events(filename string) <-chan *fsnotify.FileEvent { + shared.mux.Lock() + defer shared.mux.Unlock() + + return shared.chans[filename] +} + +// Cleanup removes the watch for the input filename and closes the shared Watcher +// if there are no more targets. +func Cleanup(filename string) { + shared.RemoveWatch(filename) +} + +// run starts the goroutine in which the shared struct reads events from its +// Watcher's Event channel and sends the events to the appropriate Tail. +func (shared *InotifyTracker) run() { + for { + select { + case event, open := <-shared.watcher.Event: + if !open { + return + } + // send the FileEvent to the appropriate Tail's channel + ch := shared.chans[event.Name] + if ch != nil { + ch <- event + } + + case err, open := <-shared.watcher.Error: + if !open { + return + } + logger.Printf("Error in Watcher Errors channel: %s", err) + + case <-shared.done: + return } - delete(t.watchers, w) } } From 0b9f044bb364bc4932c81f3d54800872fbabd0b8 Mon Sep 17 00:00:00 2001 From: Andy Ouyang Date: Mon, 3 Aug 2015 14:38:42 -0700 Subject: [PATCH 2/8] Ignoring EINTR (ctrl+Z) signal --- watch/inotify_tracker.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index b00b4fc..b937621 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -6,6 +6,7 @@ import ( "log" "os" "sync" + "syscall" "github.com/hpcloud/tail/util" @@ -113,8 +114,12 @@ func (shared *InotifyTracker) run() { case err, open := <-shared.watcher.Error: if !open { return + } else if err != nil { + sysErr, ok := err.(*os.SyscallError) + if !ok || sysErr.Err != syscall.EINTR { + logger.Printf("Error in Watcher Error channel: %s", err) + } } - logger.Printf("Error in Watcher Errors channel: %s", err) case <-shared.done: return From f053e2cd0c63c413f71a0d179afba1c6e4baabe5 Mon Sep 17 00:00:00 2001 From: Andy Ouyang Date: Tue, 4 Aug 2015 17:42:45 -0700 Subject: [PATCH 3/8] Fixed race conditions in watch/inotify_tracker.go --- watch/inotify_tracker.go | 151 ++++++++++++++++++++++++--------------- 1 file changed, 93 insertions(+), 58 deletions(-) diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index b937621..7f9d5f8 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -17,99 +17,137 @@ type InotifyTracker struct { mux sync.Mutex watcher *fsnotify.Watcher chans map[string]chan *fsnotify.FileEvent - done chan struct{} + watch chan *watchInfo + remove chan string + error chan error +} + +type watchInfo struct { + fname string + flags uint32 } var ( + // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used shared = &InotifyTracker{ - mux: sync.Mutex{}, - chans: make(map[string]chan *fsnotify.FileEvent), + mux: sync.Mutex{}, + chans: make(map[string]chan *fsnotify.FileEvent), + watch: make(chan *watchInfo), + remove: make(chan string), + error: make(chan error), } - logger = log.New(os.Stderr, "", log.LstdFlags) -) -// Watch calls fsnotify.Watch for the input filename, creating a new Watcher if the -// previous Watcher was closed. -func (shared *InotifyTracker) Watch(filename string) error { - return shared.WatchFlags(filename, fsnotify.FSN_ALL) -} - -// WatchFlags calls fsnotify.WatchFlags for the input filename and flags, creating -// a new Watcher if the previous Watcher was closed. -func (shared *InotifyTracker) WatchFlags(filename string, flags uint32) error { - shared.mux.Lock() - defer shared.mux.Unlock() - - // Start up shared struct if necessary - if len(shared.chans) == 0 { - watcher, err := fsnotify.NewWatcher() - if err != nil { - util.Fatal("Error creating Watcher") - } - shared.watcher = watcher - shared.done = make(chan struct{}) + // these are used to ensure the shared InotifyTracker is run exactly once + once = &sync.Once{} + goRun = func() { go shared.run() } - // Create a channel to which FileEvents for the input filename will be sent - ch := shared.chans[filename] - if ch == nil { - shared.chans[filename] = make(chan *fsnotify.FileEvent) - } - return shared.watcher.WatchFlags(filename, flags) + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// WatchFlags signals the run goroutine to begin watching the input filename using +// using all flags. +func (shared *InotifyTracker) Watch(fname string) error { + return shared.WatchFlags(fname, fsnotify.FSN_ALL) } -// RemoveWatch calls fsnotify.RemoveWatch for the input filename and closes the -// corresponding events channel. -func (shared *InotifyTracker) RemoveWatch(filename string) { - shared.mux.Lock() - defer shared.mux.Unlock() +// WatchFlags signals the run goroutine to begin watching the input filename using +// using the input flags. +func (shared *InotifyTracker) WatchFlags(fname string, flags uint32) error { + // start running the shared InotifyTracker if not already running + once.Do(goRun) - _, found := shared.chans[filename] - if !found { - return + shared.watch <- &watchInfo{ + fname: fname, + flags: flags, } + return <-shared.error +} - shared.watcher.RemoveWatch(filename) - delete(shared.chans, filename) +// RemoveWatch signals the run goroutine to remove the watch for the input filename +func (shared *InotifyTracker) RemoveWatch(fname string) { + // start running the shared InotifyTracker if not already running + once.Do(goRun) - // If this is the last target to be removed, close the shared Watcher - if len(shared.chans) == 0 { - shared.watcher.Close() - close(shared.done) - } + shared.remove <- fname } // Events returns a channel to which FileEvents corresponding to the input filename // will be sent. This channel will be closed when removeWatch is called on this // filename. -func (shared *InotifyTracker) Events(filename string) <-chan *fsnotify.FileEvent { +func (shared *InotifyTracker) Events(fname string) chan *fsnotify.FileEvent { shared.mux.Lock() defer shared.mux.Unlock() - return shared.chans[filename] + return shared.chans[fname] } // Cleanup removes the watch for the input filename and closes the shared Watcher // if there are no more targets. -func Cleanup(filename string) { - shared.RemoveWatch(filename) +func Cleanup(fname string) { + shared.RemoveWatch(fname) +} + +// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating +// a new Watcher if the previous Watcher was closed. +func (shared *InotifyTracker) watchFlags(fname string, flags uint32) error { + shared.mux.Lock() + defer shared.mux.Unlock() + + if shared.chans[fname] == nil { + shared.chans[fname] = make(chan *fsnotify.FileEvent) + } + return shared.watcher.WatchFlags(fname, flags) +} + +// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the +// corresponding events channel. +func (shared *InotifyTracker) removeWatch(fname string) { + shared.mux.Lock() + defer shared.mux.Unlock() + + if ch := shared.chans[fname]; ch != nil { + shared.watcher.RemoveWatch(fname) + + delete(shared.chans, fname) + close(ch) + } +} + +// sendEvent sends the input event to the appropriate Tail. +func (shared *InotifyTracker) sendEvent(event *fsnotify.FileEvent) { + ch := shared.Events(event.Name) + if ch != nil { + select { + case ch <- event: + default: + } + } } // run starts the goroutine in which the shared struct reads events from its // Watcher's Event channel and sends the events to the appropriate Tail. func (shared *InotifyTracker) run() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + util.Fatal("failed to create Watcher") + } + shared.watcher = watcher + for { select { + case winfo := <-shared.watch: + shared.error <- shared.watchFlags(winfo.fname, winfo.flags) + + case fname := <-shared.remove: + shared.removeWatch(fname) + case event, open := <-shared.watcher.Event: if !open { return } - // send the FileEvent to the appropriate Tail's channel - ch := shared.chans[event.Name] - if ch != nil { - ch <- event - } + shared.sendEvent(event) case err, open := <-shared.watcher.Error: if !open { @@ -120,9 +158,6 @@ func (shared *InotifyTracker) run() { logger.Printf("Error in Watcher Error channel: %s", err) } } - - case <-shared.done: - return } } } From a5dc0d39baed14b98a2167dd5b1332c9f3127892 Mon Sep 17 00:00:00 2001 From: Andy Ouyang Date: Wed, 5 Aug 2015 14:03:38 -0700 Subject: [PATCH 4/8] Ensure InotifyTracker attempts to send all FileEvents --- watch/inotify_tracker.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index 7f9d5f8..f740ae1 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -17,6 +17,7 @@ type InotifyTracker struct { mux sync.Mutex watcher *fsnotify.Watcher chans map[string]chan *fsnotify.FileEvent + done map[string]chan bool watch chan *watchInfo remove chan string error chan error @@ -32,6 +33,7 @@ var ( shared = &InotifyTracker{ mux: sync.Mutex{}, chans: make(map[string]chan *fsnotify.FileEvent), + done: make(map[string]chan bool), watch: make(chan *watchInfo), remove: make(chan string), error: make(chan error), @@ -70,6 +72,14 @@ func (shared *InotifyTracker) RemoveWatch(fname string) { // start running the shared InotifyTracker if not already running once.Do(goRun) + shared.mux.Lock() + done := shared.done[fname] + if done != nil { + delete(shared.done, fname) + close(done) + } + shared.mux.Unlock() + shared.remove <- fname } @@ -97,6 +107,7 @@ func (shared *InotifyTracker) watchFlags(fname string, flags uint32) error { if shared.chans[fname] == nil { shared.chans[fname] = make(chan *fsnotify.FileEvent) + shared.done[fname] = make(chan bool) } return shared.watcher.WatchFlags(fname, flags) } @@ -117,11 +128,15 @@ func (shared *InotifyTracker) removeWatch(fname string) { // sendEvent sends the input event to the appropriate Tail. func (shared *InotifyTracker) sendEvent(event *fsnotify.FileEvent) { - ch := shared.Events(event.Name) - if ch != nil { + shared.mux.Lock() + ch := shared.chans[event.Name] + done := shared.done[event.Name] + shared.mux.Unlock() + + if ch != nil && done != nil { select { case ch <- event: - default: + case <-done: } } } From d46611791d0b85049f59724cf5eb308d7c8d463b Mon Sep 17 00:00:00 2001 From: Andy Ouyang Date: Wed, 5 Aug 2015 15:48:10 -0700 Subject: [PATCH 5/8] Moved InotifyTracker initialization to constructor function --- watch/inotify.go | 12 ++++++------ watch/inotify_tracker.go | 34 +++++++++++++++++----------------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/watch/inotify.go b/watch/inotify.go index 0c95f7f..aecf172 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -28,11 +28,11 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { dirname := filepath.Dir(fw.Filename) // Watch for new files to be created in the parent directory. - err := shared.WatchFlags(dirname, fsnotify.FSN_CREATE) + err := WatchFlags(dirname, fsnotify.FSN_CREATE) if err != nil { return err } - defer shared.RemoveWatch(dirname) + defer RemoveWatch(dirname) // Do a real check now as the file might have been created before // calling `WatchFlags` above. @@ -41,7 +41,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { return err } - events := shared.Events(fw.Filename) + events := Events(fw.Filename) for { select { @@ -61,7 +61,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges { changes := NewFileChanges() - err := shared.Watch(fw.Filename) + err := Watch(fw.Filename) if err != nil { go changes.NotifyDeleted() } @@ -69,10 +69,10 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh fw.Size = fi.Size() go func() { - defer shared.RemoveWatch(fw.Filename) + defer RemoveWatch(fw.Filename) defer changes.Close() - events := shared.Events(fw.Filename) + events := Events(fw.Filename) for { prevSize := fw.Size diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index f740ae1..ffb8267 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -30,18 +30,19 @@ type watchInfo struct { var ( // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used - shared = &InotifyTracker{ - mux: sync.Mutex{}, - chans: make(map[string]chan *fsnotify.FileEvent), - done: make(map[string]chan bool), - watch: make(chan *watchInfo), - remove: make(chan string), - error: make(chan error), - } + shared *InotifyTracker // these are used to ensure the shared InotifyTracker is run exactly once - once = &sync.Once{} + once = sync.Once{} goRun = func() { + shared = &InotifyTracker{ + mux: sync.Mutex{}, + chans: make(map[string]chan *fsnotify.FileEvent), + done: make(map[string]chan bool), + watch: make(chan *watchInfo), + remove: make(chan string), + error: make(chan error), + } go shared.run() } @@ -50,13 +51,13 @@ var ( // WatchFlags signals the run goroutine to begin watching the input filename using // using all flags. -func (shared *InotifyTracker) Watch(fname string) error { - return shared.WatchFlags(fname, fsnotify.FSN_ALL) +func Watch(fname string) error { + return WatchFlags(fname, fsnotify.FSN_ALL) } // WatchFlags signals the run goroutine to begin watching the input filename using // using the input flags. -func (shared *InotifyTracker) WatchFlags(fname string, flags uint32) error { +func WatchFlags(fname string, flags uint32) error { // start running the shared InotifyTracker if not already running once.Do(goRun) @@ -68,7 +69,7 @@ func (shared *InotifyTracker) WatchFlags(fname string, flags uint32) error { } // RemoveWatch signals the run goroutine to remove the watch for the input filename -func (shared *InotifyTracker) RemoveWatch(fname string) { +func RemoveWatch(fname string) { // start running the shared InotifyTracker if not already running once.Do(goRun) @@ -86,17 +87,16 @@ func (shared *InotifyTracker) RemoveWatch(fname string) { // Events returns a channel to which FileEvents corresponding to the input filename // will be sent. This channel will be closed when removeWatch is called on this // filename. -func (shared *InotifyTracker) Events(fname string) chan *fsnotify.FileEvent { +func Events(fname string) chan *fsnotify.FileEvent { shared.mux.Lock() defer shared.mux.Unlock() return shared.chans[fname] } -// Cleanup removes the watch for the input filename and closes the shared Watcher -// if there are no more targets. +// Cleanup removes the watch for the input filename if necessary. func Cleanup(fname string) { - shared.RemoveWatch(fname) + RemoveWatch(fname) } // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating From 8b4773e24eaaf3c98cb0e516a9d2ec6aaff9e193 Mon Sep 17 00:00:00 2001 From: Alex Liu Date: Thu, 10 Sep 2015 16:47:13 -0700 Subject: [PATCH 6/8] Upgrade to fsnotify.v1 --- .travis.yml | 2 +- watch/inotify.go | 12 ++++++------ watch/inotify_tracker.go | 37 ++++++++++++++----------------------- 3 files changed, 21 insertions(+), 30 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5407eba..57c00e2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,4 +11,4 @@ go: - 1.4.2 install: - - go get gopkg.in/fsnotify.v0 + - go get gopkg.in/fsnotify.v1 diff --git a/watch/inotify.go b/watch/inotify.go index aecf172..0676ac3 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -9,7 +9,7 @@ import ( "github.com/hpcloud/tail/util" - "gopkg.in/fsnotify.v0" + "gopkg.in/fsnotify.v1" "gopkg.in/tomb.v1" ) @@ -28,7 +28,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { dirname := filepath.Dir(fw.Filename) // Watch for new files to be created in the parent directory. - err := WatchFlags(dirname, fsnotify.FSN_CREATE) + err := Watch(dirname) if err != nil { return err } @@ -77,7 +77,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh for { prevSize := fw.Size - var evt *fsnotify.FileEvent + var evt *fsnotify.Event var ok bool select { @@ -90,14 +90,14 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh } switch { - case evt.IsDelete(): + case evt.Op&fsnotify.Remove == fsnotify.Remove: fallthrough - case evt.IsRename(): + case evt.Op&fsnotify.Rename == fsnotify.Rename: changes.NotifyDeleted() return - case evt.IsModify(): + case evt.Op&fsnotify.Write == fsnotify.Write: fi, err := os.Stat(fw.Filename) if err != nil { if os.IsNotExist(err) { diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index ffb8267..d47bc5d 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -10,13 +10,13 @@ import ( "github.com/hpcloud/tail/util" - "gopkg.in/fsnotify.v0" + "gopkg.in/fsnotify.v1" ) type InotifyTracker struct { mux sync.Mutex watcher *fsnotify.Watcher - chans map[string]chan *fsnotify.FileEvent + chans map[string]chan *fsnotify.Event done map[string]chan bool watch chan *watchInfo remove chan string @@ -25,7 +25,6 @@ type InotifyTracker struct { type watchInfo struct { fname string - flags uint32 } var ( @@ -37,7 +36,7 @@ var ( goRun = func() { shared = &InotifyTracker{ mux: sync.Mutex{}, - chans: make(map[string]chan *fsnotify.FileEvent), + chans: make(map[string]chan *fsnotify.Event), done: make(map[string]chan bool), watch: make(chan *watchInfo), remove: make(chan string), @@ -49,21 +48,13 @@ var ( logger = log.New(os.Stderr, "", log.LstdFlags) ) -// WatchFlags signals the run goroutine to begin watching the input filename using -// using all flags. +// Watch signals the run goroutine to begin watching the input filename func Watch(fname string) error { - return WatchFlags(fname, fsnotify.FSN_ALL) -} - -// WatchFlags signals the run goroutine to begin watching the input filename using -// using the input flags. -func WatchFlags(fname string, flags uint32) error { // start running the shared InotifyTracker if not already running once.Do(goRun) shared.watch <- &watchInfo{ fname: fname, - flags: flags, } return <-shared.error } @@ -87,7 +78,7 @@ func RemoveWatch(fname string) { // Events returns a channel to which FileEvents corresponding to the input filename // will be sent. This channel will be closed when removeWatch is called on this // filename. -func Events(fname string) chan *fsnotify.FileEvent { +func Events(fname string) chan *fsnotify.Event { shared.mux.Lock() defer shared.mux.Unlock() @@ -101,15 +92,15 @@ func Cleanup(fname string) { // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating // a new Watcher if the previous Watcher was closed. -func (shared *InotifyTracker) watchFlags(fname string, flags uint32) error { +func (shared *InotifyTracker) addWatch(fname string) error { shared.mux.Lock() defer shared.mux.Unlock() if shared.chans[fname] == nil { - shared.chans[fname] = make(chan *fsnotify.FileEvent) + shared.chans[fname] = make(chan *fsnotify.Event) shared.done[fname] = make(chan bool) } - return shared.watcher.WatchFlags(fname, flags) + return shared.watcher.Add(fname) } // removeWatch calls fsnotify.RemoveWatch for the input filename and closes the @@ -119,7 +110,7 @@ func (shared *InotifyTracker) removeWatch(fname string) { defer shared.mux.Unlock() if ch := shared.chans[fname]; ch != nil { - shared.watcher.RemoveWatch(fname) + shared.watcher.Remove(fname) delete(shared.chans, fname) close(ch) @@ -127,7 +118,7 @@ func (shared *InotifyTracker) removeWatch(fname string) { } // sendEvent sends the input event to the appropriate Tail. -func (shared *InotifyTracker) sendEvent(event *fsnotify.FileEvent) { +func (shared *InotifyTracker) sendEvent(event *fsnotify.Event) { shared.mux.Lock() ch := shared.chans[event.Name] done := shared.done[event.Name] @@ -153,18 +144,18 @@ func (shared *InotifyTracker) run() { for { select { case winfo := <-shared.watch: - shared.error <- shared.watchFlags(winfo.fname, winfo.flags) + shared.error <- shared.addWatch(winfo.fname) case fname := <-shared.remove: shared.removeWatch(fname) - case event, open := <-shared.watcher.Event: + case event, open := <-shared.watcher.Events: if !open { return } - shared.sendEvent(event) + shared.sendEvent(&event) - case err, open := <-shared.watcher.Error: + case err, open := <-shared.watcher.Errors: if !open { return } else if err != nil { From abb1479f04abdbe46d25e9ca2d09a223c4f16a50 Mon Sep 17 00:00:00 2001 From: Alex Liu Date: Thu, 10 Sep 2015 16:56:51 -0700 Subject: [PATCH 7/8] Remove pointer from fsnotify.Event --- watch/inotify.go | 2 +- watch/inotify_tracker.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/watch/inotify.go b/watch/inotify.go index 0676ac3..258e3ef 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -77,7 +77,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh for { prevSize := fw.Size - var evt *fsnotify.Event + var evt fsnotify.Event var ok bool select { diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index d47bc5d..07e7d7a 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -16,7 +16,7 @@ import ( type InotifyTracker struct { mux sync.Mutex watcher *fsnotify.Watcher - chans map[string]chan *fsnotify.Event + chans map[string]chan fsnotify.Event done map[string]chan bool watch chan *watchInfo remove chan string @@ -36,7 +36,7 @@ var ( goRun = func() { shared = &InotifyTracker{ mux: sync.Mutex{}, - chans: make(map[string]chan *fsnotify.Event), + chans: make(map[string]chan fsnotify.Event), done: make(map[string]chan bool), watch: make(chan *watchInfo), remove: make(chan string), @@ -78,7 +78,7 @@ func RemoveWatch(fname string) { // Events returns a channel to which FileEvents corresponding to the input filename // will be sent. This channel will be closed when removeWatch is called on this // filename. -func Events(fname string) chan *fsnotify.Event { +func Events(fname string) chan fsnotify.Event { shared.mux.Lock() defer shared.mux.Unlock() @@ -97,7 +97,7 @@ func (shared *InotifyTracker) addWatch(fname string) error { defer shared.mux.Unlock() if shared.chans[fname] == nil { - shared.chans[fname] = make(chan *fsnotify.Event) + shared.chans[fname] = make(chan fsnotify.Event) shared.done[fname] = make(chan bool) } return shared.watcher.Add(fname) @@ -118,7 +118,7 @@ func (shared *InotifyTracker) removeWatch(fname string) { } // sendEvent sends the input event to the appropriate Tail. -func (shared *InotifyTracker) sendEvent(event *fsnotify.Event) { +func (shared *InotifyTracker) sendEvent(event fsnotify.Event) { shared.mux.Lock() ch := shared.chans[event.Name] done := shared.done[event.Name] @@ -153,7 +153,7 @@ func (shared *InotifyTracker) run() { if !open { return } - shared.sendEvent(&event) + shared.sendEvent(event) case err, open := <-shared.watcher.Errors: if !open { From f69ef84e36f90c536a57dc040186994436a7deb8 Mon Sep 17 00:00:00 2001 From: Benoit Sigoure Date: Tue, 27 Oct 2015 15:45:55 -0700 Subject: [PATCH 8/8] Fix race in the detection of truncation. Before going into ChangeEvents(), the code was calling stat on the file to know where it was at, which is incorrect as stat could return the new file size post truncation. Instead we now ask the file descriptor about our current offset, so we can compare our offset to the file size to try to detect truncation. Truncation detection remains brittle, but this closes an annoying race we frequently run into. --- tail.go | 6 +++--- watch/inotify.go | 4 ++-- watch/polling.go | 9 +++++++-- watch/watch.go | 9 ++++----- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/tail.go b/tail.go index 753158d..5532ac1 100644 --- a/tail.go +++ b/tail.go @@ -290,11 +290,11 @@ func (tail *Tail) tailFileSync() { // reopened if ReOpen is true. Truncated files are always reopened. func (tail *Tail) waitForChanges() error { if tail.changes == nil { - st, err := tail.file.Stat() + pos, err := tail.file.Seek(0, os.SEEK_CUR) if err != nil { return err } - tail.changes = tail.watcher.ChangeEvents(&tail.Tomb, st) + tail.changes = tail.watcher.ChangeEvents(&tail.Tomb, pos) } select { @@ -340,7 +340,7 @@ func (tail *Tail) openReader() { } func (tail *Tail) seekEnd() error { - return tail.seekTo(SeekInfo{Offset: 0, Whence: 2}) + return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END}) } func (tail *Tail) seekTo(pos SeekInfo) error { diff --git a/watch/inotify.go b/watch/inotify.go index 258e3ef..d6635f4 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -58,7 +58,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { panic("unreachable") } -func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges { +func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges { changes := NewFileChanges() err := Watch(fw.Filename) @@ -66,7 +66,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh go changes.NotifyDeleted() } - fw.Size = fi.Size() + fw.Size = pos go func() { defer RemoveWatch(fw.Filename) diff --git a/watch/polling.go b/watch/polling.go index e13e034..5479272 100644 --- a/watch/polling.go +++ b/watch/polling.go @@ -40,14 +40,19 @@ func (fw *PollingFileWatcher) BlockUntilExists(t *tomb.Tomb) error { panic("unreachable") } -func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, origFi os.FileInfo) *FileChanges { +func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges { changes := NewFileChanges() var prevModTime time.Time // XXX: use tomb.Tomb to cleanly manage these goroutines. replace // the fatal (below) with tomb's Kill. - fw.Size = origFi.Size() + fw.Size = pos + origFi, err := os.Stat(fw.Filename) + if err != nil { + changes.NotifyDeleted() + return changes + } go func() { defer changes.Close() diff --git a/watch/watch.go b/watch/watch.go index a3c06e8..fbc08a5 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -2,10 +2,7 @@ package watch -import ( - "gopkg.in/tomb.v1" - "os" -) +import "gopkg.in/tomb.v1" // FileWatcher monitors file-level events. type FileWatcher interface { @@ -16,5 +13,7 @@ type FileWatcher interface { // deletion, renames or truncations. Returned FileChanges group of // channels will be closed, thus become unusable, after a deletion // or truncation event. - ChangeEvents(*tomb.Tomb, os.FileInfo) *FileChanges + // In order to properly report truncations, ChangeEvents requires + // the caller to pass their current offset in the file. + ChangeEvents(*tomb.Tomb, int64) *FileChanges }