From 2cddd48e0a3479555b5a83373aeea14af405c3fd Mon Sep 17 00:00:00 2001 From: Sridhar Ratnakumar Date: Wed, 29 May 2013 11:35:27 -0700 Subject: [PATCH] clean up ChangeEvents' goroutines upon tail.Stop --- CHANGES.md | 3 ++- tail.go | 8 ++++---- watch/inotify.go | 14 ++++++++++---- watch/polling.go | 8 +++++--- watch/watch.go | 2 +- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f7579c7..a1654d5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,9 +1,10 @@ # May, 2013 -* Recognize deletions/renames when using polling file watcher (PR #1) +* Detect file deletions/renames in polling file watcher (PR #1) * Detect file truncation * Fix potential race condition when reopening the file (issue 5) * Fix potential blocking of `tail.Stop` (issue 4) +* Fix uncleaned up ChangeEvents goroutines after calling tail.Stop # Feb, 2013 diff --git a/tail.go b/tail.go index a1cd6d0..9cb2716 100644 --- a/tail.go +++ b/tail.go @@ -179,17 +179,17 @@ func (tail *Tail) tailFileSync() { tail.Kill(err) return } - changes = tail.watcher.ChangeEvents(st) + changes = tail.watcher.ChangeEvents(tail.Tomb, st) } select { case _, ok := <-changes: if !ok { - changes = nil // XXX: use tomb to kill changes' goroutine. + changes = nil // File got deleted/renamed/truncated. if tail.ReOpen { - // TODO: no logging in a library? + // XXX: no logging in a library? log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename) err := tail.reopen() if err != nil { @@ -202,7 +202,7 @@ func (tail *Tail) tailFileSync() { continue } else { - log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename) + log.Printf("Stopping tail as file no longer exists: %s", tail.Filename) tail.close() return } diff --git a/watch/inotify.go b/watch/inotify.go index ba80c57..cb732c0 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -3,7 +3,6 @@ package watch import ( - "fmt" "github.com/howeyc/fsnotify" "os" "path/filepath" @@ -51,14 +50,14 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error { return nil } case <-t.Dying(): - return fmt.Errorf("Tomb dying") + return tomb.ErrDying } } panic("unreachable") } // ChangeEvents returns a channel that gets updated when the file is ready to be read. -func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool { +func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool { w, err := fsnotify.NewWatcher() if err != nil { panic(err) @@ -80,7 +79,14 @@ func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool { for { prevSize := fw.Size - evt := <-w.Event + var evt *fsnotify.FileEvent + + select { + case evt = <-w.Event: + case <-t.Dying(): + return + } + switch { case evt.IsDelete(): fallthrough diff --git a/watch/polling.go b/watch/polling.go index 7c694f6..ca1ee62 100644 --- a/watch/polling.go +++ b/watch/polling.go @@ -7,7 +7,6 @@ import ( "os" "sync" "time" - "fmt" ) // PollingFileWatcher polls the file for changes. @@ -34,13 +33,13 @@ func (fw *PollingFileWatcher) BlockUntilExists(t tomb.Tomb) error { case <-time.After(POLL_DURATION): continue case <-t.Dying(): - return fmt.Errorf("Tomb dying") + return tomb.ErrDying } } panic("unreachable") } -func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { +func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan bool { ch := make(chan bool) stop := make(chan bool) var once sync.Once @@ -64,6 +63,9 @@ func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { select { case <-stop: return + case <-t.Dying(): + once.Do(stopAndClose) + continue default: } diff --git a/watch/watch.go b/watch/watch.go index 7caa266..13fca2f 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -15,6 +15,6 @@ type FileWatcher interface { // ChangeEvents returns a channel of events corresponding to the // times the file is ready to be read. - ChangeEvents(os.FileInfo) chan bool + ChangeEvents(tomb.Tomb, os.FileInfo) chan bool }