From b155fc13d44fe6ee44a589a3ed4a191e00afa574 Mon Sep 17 00:00:00 2001 From: Andy Ouyang Date: Thu, 30 Jul 2015 12:06:40 -0700 Subject: [PATCH] Single shared Watcher used to avoid inotify limit --- tail.go | 16 ++--- watch/inotify.go | 24 ++++--- watch/inotify_tracker.go | 136 ++++++++++++++++++++++++++++++--------- 3 files changed, 122 insertions(+), 54 deletions(-) diff --git a/tail.go b/tail.go index a7a1a5b..753158d 100644 --- a/tail.go +++ b/tail.go @@ -62,9 +62,8 @@ type Tail struct { Lines chan *Line Config - file *os.File - reader *bufio.Reader - tracker *watch.InotifyTracker + file *os.File + reader *bufio.Reader watcher watch.FileWatcher changes *watch.FileChanges @@ -102,12 +101,7 @@ func TailFile(filename string, config Config) (*Tail, error) { if t.Poll { t.watcher = watch.NewPollingFileWatcher(filename) } else { - t.tracker = watch.NewInotifyTracker() - w, err := t.tracker.NewWatcher() - if err != nil { - return nil, err - } - t.watcher = watch.NewInotifyFileWatcher(filename, w) + t.watcher = watch.NewInotifyFileWatcher(filename) } if t.MustExist { @@ -390,7 +384,5 @@ func (tail *Tail) sendLine(line string) bool { // meant to be invoked from a process's exit handler. Linux kernel may not // automatically remove inotify watches after the process exits. func (tail *Tail) Cleanup() { - if tail.tracker != nil { - tail.tracker.CloseAll() - } + watch.Cleanup(tail.Filename) } diff --git a/watch/inotify.go b/watch/inotify.go index ee660af..0c95f7f 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -8,6 +8,7 @@ import ( "path/filepath" "github.com/hpcloud/tail/util" + "gopkg.in/fsnotify.v0" "gopkg.in/tomb.v1" ) @@ -16,11 +17,10 @@ import ( type InotifyFileWatcher struct { Filename string Size int64 - w *fsnotify.Watcher } -func NewInotifyFileWatcher(filename string, w *fsnotify.Watcher) *InotifyFileWatcher { - fw := &InotifyFileWatcher{filename, 0, w} +func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { + fw := &InotifyFileWatcher{filename, 0} return fw } @@ -28,11 +28,11 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { dirname := filepath.Dir(fw.Filename) // Watch for new files to be created in the parent directory. - err := fw.w.WatchFlags(dirname, fsnotify.FSN_CREATE) + err := shared.WatchFlags(dirname, fsnotify.FSN_CREATE) if err != nil { return err } - defer fw.w.RemoveWatch(dirname) + defer shared.RemoveWatch(dirname) // Do a real check now as the file might have been created before // calling `WatchFlags` above. @@ -41,9 +41,11 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { return err } + events := shared.Events(fw.Filename) + for { select { - case evt, ok := <-fw.w.Event: + case evt, ok := <-events: if !ok { return fmt.Errorf("inotify watcher has been closed") } else if evt.Name == fw.Filename { @@ -59,17 +61,19 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error { func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileChanges { changes := NewFileChanges() - err := fw.w.Watch(fw.Filename) + err := shared.Watch(fw.Filename) if err != nil { - util.Fatal("Error watching %v: %v", fw.Filename, err) + go changes.NotifyDeleted() } fw.Size = fi.Size() go func() { - defer fw.w.RemoveWatch(fw.Filename) + defer shared.RemoveWatch(fw.Filename) defer changes.Close() + events := shared.Events(fw.Filename) + for { prevSize := fw.Size @@ -77,7 +81,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, fi os.FileInfo) *FileCh var ok bool select { - case evt, ok = <-fw.w.Event: + case evt, ok = <-events: if !ok { return } diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index 26ec800..b00b4fc 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -3,49 +3,121 @@ package watch import ( - "gopkg.in/fsnotify.v0" "log" + "os" "sync" + + "github.com/hpcloud/tail/util" + + "gopkg.in/fsnotify.v0" ) type InotifyTracker struct { - mux sync.Mutex - watchers map[*fsnotify.Watcher]bool + mux sync.Mutex + watcher *fsnotify.Watcher + chans map[string]chan *fsnotify.FileEvent + done chan struct{} } -func NewInotifyTracker() *InotifyTracker { - t := new(InotifyTracker) - t.watchers = make(map[*fsnotify.Watcher]bool) - return t -} - -func (t *InotifyTracker) NewWatcher() (*fsnotify.Watcher, error) { - t.mux.Lock() - defer t.mux.Unlock() - w, err := fsnotify.NewWatcher() - if err == nil { - t.watchers[w] = true +var ( + shared = &InotifyTracker{ + mux: sync.Mutex{}, + chans: make(map[string]chan *fsnotify.FileEvent), } - return w, err + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Watch calls fsnotify.Watch for the input filename, creating a new Watcher if the +// previous Watcher was closed. +func (shared *InotifyTracker) Watch(filename string) error { + return shared.WatchFlags(filename, fsnotify.FSN_ALL) } -func (t *InotifyTracker) CloseWatcher(w *fsnotify.Watcher) (err error) { - t.mux.Lock() - defer t.mux.Unlock() - if _, ok := t.watchers[w]; ok { - err = w.Close() - delete(t.watchers, w) - } - return -} +// WatchFlags calls fsnotify.WatchFlags for the input filename and flags, creating +// a new Watcher if the previous Watcher was closed. +func (shared *InotifyTracker) WatchFlags(filename string, flags uint32) error { + shared.mux.Lock() + defer shared.mux.Unlock() -func (t *InotifyTracker) CloseAll() { - t.mux.Lock() - defer t.mux.Unlock() - for w, _ := range t.watchers { - if err := w.Close(); err != nil { - log.Printf("Error closing watcher: %v", err) + // Start up shared struct if necessary + if len(shared.chans) == 0 { + watcher, err := fsnotify.NewWatcher() + if err != nil { + util.Fatal("Error creating Watcher") + } + shared.watcher = watcher + shared.done = make(chan struct{}) + go shared.run() + } + + // Create a channel to which FileEvents for the input filename will be sent + ch := shared.chans[filename] + if ch == nil { + shared.chans[filename] = make(chan *fsnotify.FileEvent) + } + return shared.watcher.WatchFlags(filename, flags) +} + +// RemoveWatch calls fsnotify.RemoveWatch for the input filename and closes the +// corresponding events channel. +func (shared *InotifyTracker) RemoveWatch(filename string) { + shared.mux.Lock() + defer shared.mux.Unlock() + + _, found := shared.chans[filename] + if !found { + return + } + + shared.watcher.RemoveWatch(filename) + delete(shared.chans, filename) + + // If this is the last target to be removed, close the shared Watcher + if len(shared.chans) == 0 { + shared.watcher.Close() + close(shared.done) + } +} + +// Events returns a channel to which FileEvents corresponding to the input filename +// will be sent. This channel will be closed when removeWatch is called on this +// filename. +func (shared *InotifyTracker) Events(filename string) <-chan *fsnotify.FileEvent { + shared.mux.Lock() + defer shared.mux.Unlock() + + return shared.chans[filename] +} + +// Cleanup removes the watch for the input filename and closes the shared Watcher +// if there are no more targets. +func Cleanup(filename string) { + shared.RemoveWatch(filename) +} + +// run starts the goroutine in which the shared struct reads events from its +// Watcher's Event channel and sends the events to the appropriate Tail. +func (shared *InotifyTracker) run() { + for { + select { + case event, open := <-shared.watcher.Event: + if !open { + return + } + // send the FileEvent to the appropriate Tail's channel + ch := shared.chans[event.Name] + if ch != nil { + ch <- event + } + + case err, open := <-shared.watcher.Error: + if !open { + return + } + logger.Printf("Error in Watcher Errors channel: %s", err) + + case <-shared.done: + return } - delete(t.watchers, w) } }