Merge pull request #20 from ActiveState/inotify_cleanup

add Cleanup function to close open inotify watches
This commit is contained in:
Sridhar Ratnakumar 2013-11-13 17:43:11 -08:00
commit f3593827b3
8 changed files with 109 additions and 14 deletions

View File

@ -1,3 +1,16 @@
# Nov, 2013
* add Cleanup to remove leaky inotify watches (PR #20)
# Aug, 2013
* redesigned Location field (PR #12)
* add tail.Tell (PR #14)
# July, 2013
* Rate limiting (PR #10)
# May, 2013 # May, 2013
* Detect file deletions/renames in polling file watcher (PR #1) * Detect file deletions/renames in polling file watcher (PR #1)

View File

@ -4,7 +4,7 @@ test: *.go
go test -v go test -v
fmt: fmt:
go fmt . gofmt -w .
# Run the test in an isolated environment. # Run the test in an isolated environment.
fulltest: fulltest:

View File

@ -3,9 +3,9 @@
package main package main
import ( import (
"flag"
"fmt" "fmt"
"github.com/ActiveState/tail" "github.com/ActiveState/tail"
"flag"
"os" "os"
) )

View File

@ -305,3 +305,10 @@ func (tail *Tail) sendLine(line []byte) bool {
return true return true
} }
// Cleanup removes inotify watches added by the tail package. This function is
// meant to be invoked from a process's exit handler. Linux kernel will not
// automatically remove inotify watches after the process exits.
func Cleanup() {
watch.Cleanup()
}

View File

@ -38,6 +38,7 @@ func TestMustExist(t *testing.T) {
t.Error("MustExist:true on an existing file is violated") t.Error("MustExist:true on an existing file is violated")
} }
tail.Stop() tail.Stop()
Cleanup()
} }
func TestStop(t *testing.T) { func TestStop(t *testing.T) {
@ -48,6 +49,7 @@ func TestStop(t *testing.T) {
if tail.Stop() != nil { if tail.Stop() != nil {
t.Error("Should be stoped successfully") t.Error("Should be stoped successfully")
} }
Cleanup()
} }
func TestMaxLineSize(_t *testing.T) { func TestMaxLineSize(_t *testing.T) {
@ -61,6 +63,7 @@ func TestMaxLineSize(_t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
tail.Stop() tail.Stop()
Cleanup()
} }
func TestLocationFull(_t *testing.T) { func TestLocationFull(_t *testing.T) {
@ -74,6 +77,7 @@ func TestLocationFull(_t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
tail.Stop() tail.Stop()
Cleanup()
} }
func TestLocationFullDontFollow(_t *testing.T) { func TestLocationFullDontFollow(_t *testing.T) {
@ -88,6 +92,7 @@ func TestLocationFullDontFollow(_t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
tail.Stop() tail.Stop()
Cleanup()
} }
func TestLocationEnd(_t *testing.T) { func TestLocationEnd(_t *testing.T) {
@ -104,6 +109,7 @@ func TestLocationEnd(_t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
tail.Stop() tail.Stop()
Cleanup()
} }
func TestLocationMiddle(_t *testing.T) { func TestLocationMiddle(_t *testing.T) {
@ -121,6 +127,7 @@ func TestLocationMiddle(_t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
tail.Stop() tail.Stop()
Cleanup()
} }
func _TestReOpen(_t *testing.T, poll bool) { func _TestReOpen(_t *testing.T, poll bool) {
@ -160,6 +167,7 @@ func _TestReOpen(_t *testing.T, poll bool) {
// the reading of data written above. Timings can vary based on // the reading of data written above. Timings can vary based on
// test environment. // test environment.
// tail.Stop() // tail.Stop()
Cleanup()
} }
// The use of polling file watcher could affect file rotation // The use of polling file watcher could affect file rotation
@ -202,6 +210,7 @@ func _TestReSeek(_t *testing.T, poll bool) {
// the reading of data written above. Timings can vary based on // the reading of data written above. Timings can vary based on
// test environment. // test environment.
// tail.Stop() // tail.Stop()
Cleanup()
} }
// The use of polling file watcher could affect file rotation // The use of polling file watcher could affect file rotation
@ -233,6 +242,7 @@ func TestRateLimiting(_t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
tail.Stop() tail.Stop()
Cleanup()
} }
func TestTell(_t *testing.T) { func TestTell(_t *testing.T) {
@ -266,6 +276,7 @@ func TestTell(_t *testing.T) {
} }
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
tail.Done() tail.Done()
Cleanup()
} }
// Test library // Test library

View File

@ -1,9 +1,9 @@
package watch package watch
type FileChanges struct { type FileChanges struct {
Modified chan bool // Channel to get notified of modifications Modified chan bool // Channel to get notified of modifications
Truncated chan bool // Channel to get notified of truncations Truncated chan bool // Channel to get notified of truncations
Deleted chan bool // Channel to get notified of deletions/renames Deleted chan bool // Channel to get notified of deletions/renames
} }
func NewFileChanges() *FileChanges { func NewFileChanges() *FileChanges {

View File

@ -3,6 +3,7 @@
package watch package watch
import ( import (
"fmt"
"github.com/ActiveState/tail/util" "github.com/ActiveState/tail/util"
"github.com/howeyc/fsnotify" "github.com/howeyc/fsnotify"
"launchpad.net/tomb" "launchpad.net/tomb"
@ -10,6 +11,8 @@ import (
"path/filepath" "path/filepath"
) )
var inotifyTracker *InotifyTracker
// InotifyFileWatcher uses inotify to monitor file changes. // InotifyFileWatcher uses inotify to monitor file changes.
type InotifyFileWatcher struct { type InotifyFileWatcher struct {
Filename string Filename string
@ -22,11 +25,11 @@ func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
} }
func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
w, err := fsnotify.NewWatcher() w, err := inotifyTracker.NewWatcher()
if err != nil { if err != nil {
return err return err
} }
defer w.Close() defer inotifyTracker.CloseWatcher(w)
dirname := filepath.Dir(fw.Filename) dirname := filepath.Dir(fw.Filename)
@ -35,7 +38,6 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
if err != nil { if err != nil {
return err return err
} }
defer w.RemoveWatch(filepath.Dir(fw.Filename))
// Do a real check now as the file might have been created before // Do a real check now as the file might have been created before
// calling `WatchFlags` above. // calling `WatchFlags` above.
@ -46,8 +48,10 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
for { for {
select { select {
case evt := <-w.Event: case evt, ok := <-w.Event:
if evt.Name == fw.Filename { if !ok {
return fmt.Errorf("inotify watcher has been closed")
} else if evt.Name == fw.Filename {
return nil return nil
} }
case <-t.Dying(): case <-t.Dying():
@ -60,7 +64,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges { func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges {
changes := NewFileChanges() changes := NewFileChanges()
w, err := fsnotify.NewWatcher() w, err := inotifyTracker.NewWatcher()
if err != nil { if err != nil {
util.Fatal("Error creating fsnotify watcher: %v", err) util.Fatal("Error creating fsnotify watcher: %v", err)
} }
@ -72,17 +76,20 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh
fw.Size = fi.Size() fw.Size = fi.Size()
go func() { go func() {
defer w.Close() defer inotifyTracker.CloseWatcher(w)
defer w.RemoveWatch(fw.Filename)
defer changes.Close() defer changes.Close()
for { for {
prevSize := fw.Size prevSize := fw.Size
var evt *fsnotify.FileEvent var evt *fsnotify.FileEvent
var ok bool
select { select {
case evt = <-w.Event: case evt, ok = <-w.Event:
if !ok {
return
}
case <-t.Dying(): case <-t.Dying():
return return
} }
@ -119,3 +126,11 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh
return changes return changes
} }
func Cleanup() {
inotifyTracker.CloseAll()
}
func init() {
inotifyTracker = NewInotifyTracker()
}

49
watch/inotify_tracker.go Normal file
View File

@ -0,0 +1,49 @@
package watch
import (
"github.com/howeyc/fsnotify"
"log"
"sync"
)
type InotifyTracker struct {
mux sync.Mutex
watchers map[*fsnotify.Watcher]bool
}
func NewInotifyTracker() *InotifyTracker {
t := new(InotifyTracker)
t.watchers = make(map[*fsnotify.Watcher]bool)
return t
}
func (t *InotifyTracker) NewWatcher() (*fsnotify.Watcher, error) {
t.mux.Lock()
defer t.mux.Unlock()
w, err := fsnotify.NewWatcher()
if err == nil {
t.watchers[w] = true
}
return w, err
}
func (t *InotifyTracker) CloseWatcher(w *fsnotify.Watcher) (err error) {
t.mux.Lock()
defer t.mux.Unlock()
if _, ok := t.watchers[w]; ok {
err = w.Close()
delete(t.watchers, w)
}
return
}
func (t *InotifyTracker) CloseAll() {
t.mux.Lock()
defer t.mux.Unlock()
for w, _ := range t.watchers {
if err := w.Close(); err != nil {
log.Printf("Error closing watcher: %v", err)
}
delete(t.watchers, w)
}
}