diff --git a/tail.go b/tail.go index a90f117..365164f 100644 --- a/tail.go +++ b/tail.go @@ -305,3 +305,9 @@ func (tail *Tail) sendLine(line []byte) bool { return true } + +// Cleanup removes open inotify watchers, because the Linux kernel does do so +// upon process exit. +func Cleanup() { + watch.Cleanup() +} diff --git a/tail_test.go b/tail_test.go index 9440f95..dea1838 100644 --- a/tail_test.go +++ b/tail_test.go @@ -38,6 +38,7 @@ func TestMustExist(t *testing.T) { t.Error("MustExist:true on an existing file is violated") } tail.Stop() + Cleanup() } func TestStop(t *testing.T) { @@ -48,6 +49,7 @@ func TestStop(t *testing.T) { if tail.Stop() != nil { t.Error("Should be stoped successfully") } + Cleanup() } func TestMaxLineSize(_t *testing.T) { @@ -61,6 +63,7 @@ func TestMaxLineSize(_t *testing.T) { <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") tail.Stop() + Cleanup() } func TestLocationFull(_t *testing.T) { @@ -74,6 +77,7 @@ func TestLocationFull(_t *testing.T) { <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") tail.Stop() + Cleanup() } func TestLocationFullDontFollow(_t *testing.T) { @@ -88,6 +92,7 @@ func TestLocationFullDontFollow(_t *testing.T) { <-time.After(100 * time.Millisecond) tail.Stop() + Cleanup() } func TestLocationEnd(_t *testing.T) { @@ -104,6 +109,7 @@ func TestLocationEnd(_t *testing.T) { <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") tail.Stop() + Cleanup() } func TestLocationMiddle(_t *testing.T) { @@ -121,6 +127,7 @@ func TestLocationMiddle(_t *testing.T) { <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") tail.Stop() + Cleanup() } func _TestReOpen(_t *testing.T, poll bool) { @@ -160,6 +167,7 @@ func _TestReOpen(_t *testing.T, poll bool) { // the reading of data written above. Timings can vary based on // test environment. // tail.Stop() + Cleanup() } // The use of polling file watcher could affect file rotation @@ -202,6 +210,7 @@ func _TestReSeek(_t *testing.T, poll bool) { // the reading of data written above. Timings can vary based on // test environment. // tail.Stop() + Cleanup() } // The use of polling file watcher could affect file rotation @@ -233,6 +242,7 @@ func TestRateLimiting(_t *testing.T) { <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") tail.Stop() + Cleanup() } func TestTell(_t *testing.T) { @@ -266,6 +276,7 @@ func TestTell(_t *testing.T) { } t.RemoveFile("test.txt") tail.Done() + Cleanup() } // Test library diff --git a/watch/inotify.go b/watch/inotify.go index 8acb07e..918b788 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -7,9 +7,12 @@ import ( "github.com/howeyc/fsnotify" "launchpad.net/tomb" "os" +"fmt" "path/filepath" ) +var inotifyTracker *InotifyTracker + // InotifyFileWatcher uses inotify to monitor file changes. type InotifyFileWatcher struct { Filename string @@ -22,11 +25,11 @@ func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { } func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { - w, err := fsnotify.NewWatcher() + w, err := inotifyTracker.NewWatcher() if err != nil { return err } - defer w.Close() + defer inotifyTracker.CloseWatcher(w) dirname := filepath.Dir(fw.Filename) @@ -35,7 +38,6 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { if err != nil { return err } - defer w.RemoveWatch(filepath.Dir(fw.Filename)) // Do a real check now as the file might have been created before // calling `WatchFlags` above. @@ -46,8 +48,10 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { for { select { - case evt := <-w.Event: - if evt.Name == fw.Filename { + case evt, ok := <-w.Event: + if !ok { + return fmt.Errorf("inotify watcher has been closed") + }else if evt.Name == fw.Filename { return nil } case <-t.Dying(): @@ -60,7 +64,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges { changes := NewFileChanges() - w, err := fsnotify.NewWatcher() + w, err := inotifyTracker.NewWatcher() if err != nil { util.Fatal("Error creating fsnotify watcher: %v", err) } @@ -72,17 +76,20 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh fw.Size = fi.Size() go func() { - defer w.Close() - defer w.RemoveWatch(fw.Filename) + defer inotifyTracker.CloseWatcher(w) defer changes.Close() for { prevSize := fw.Size var evt *fsnotify.FileEvent + var ok bool select { - case evt = <-w.Event: + case evt, ok = <-w.Event: + if !ok { + return + } case <-t.Dying(): return } @@ -119,3 +126,11 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh return changes } + +func Cleanup() { + inotifyTracker.CloseAll() +} + +func init() { + inotifyTracker = NewInotifyTracker() +} diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go new file mode 100644 index 0000000..29d6d43 --- /dev/null +++ b/watch/inotify_tracker.go @@ -0,0 +1,49 @@ +package watch + +import ( + "github.com/howeyc/fsnotify" + "log" + "sync" +) + +type InotifyTracker struct { + mux sync.Mutex + watchers map[*fsnotify.Watcher]bool +} + +func NewInotifyTracker() *InotifyTracker { + t := new(InotifyTracker) + t.watchers = make(map[*fsnotify.Watcher]bool) + return t +} + +func (t *InotifyTracker) NewWatcher() (*fsnotify.Watcher, error) { + t.mux.Lock() + defer t.mux.Unlock() + w, err := fsnotify.NewWatcher() + if err == nil { + t.watchers[w] = true + } + return w, err +} + +func (t *InotifyTracker) CloseWatcher(w *fsnotify.Watcher) (err error) { + t.mux.Lock() + defer t.mux.Unlock() + if _, ok := t.watchers[w]; ok { + err = w.Close() + delete(t.watchers, w) + } + return +} + +func (t *InotifyTracker) CloseAll() { + t.mux.Lock() + defer t.mux.Unlock() + for w, _ := range t.watchers { + if err := w.Close(); err != nil { + log.Printf("Error closing watcher: %v", err) + } + delete(t.watchers, w) + } +}