From f053e2cd0c63c413f71a0d179afba1c6e4baabe5 Mon Sep 17 00:00:00 2001 From: Andy Ouyang Date: Tue, 4 Aug 2015 17:42:45 -0700 Subject: [PATCH] Fixed race conditions in watch/inotify_tracker.go --- watch/inotify_tracker.go | 151 ++++++++++++++++++++++++--------------- 1 file changed, 93 insertions(+), 58 deletions(-) diff --git a/watch/inotify_tracker.go b/watch/inotify_tracker.go index b937621..7f9d5f8 100644 --- a/watch/inotify_tracker.go +++ b/watch/inotify_tracker.go @@ -17,99 +17,137 @@ type InotifyTracker struct { mux sync.Mutex watcher *fsnotify.Watcher chans map[string]chan *fsnotify.FileEvent - done chan struct{} + watch chan *watchInfo + remove chan string + error chan error +} + +type watchInfo struct { + fname string + flags uint32 } var ( + // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used shared = &InotifyTracker{ - mux: sync.Mutex{}, - chans: make(map[string]chan *fsnotify.FileEvent), + mux: sync.Mutex{}, + chans: make(map[string]chan *fsnotify.FileEvent), + watch: make(chan *watchInfo), + remove: make(chan string), + error: make(chan error), } - logger = log.New(os.Stderr, "", log.LstdFlags) -) -// Watch calls fsnotify.Watch for the input filename, creating a new Watcher if the -// previous Watcher was closed. -func (shared *InotifyTracker) Watch(filename string) error { - return shared.WatchFlags(filename, fsnotify.FSN_ALL) -} - -// WatchFlags calls fsnotify.WatchFlags for the input filename and flags, creating -// a new Watcher if the previous Watcher was closed. -func (shared *InotifyTracker) WatchFlags(filename string, flags uint32) error { - shared.mux.Lock() - defer shared.mux.Unlock() - - // Start up shared struct if necessary - if len(shared.chans) == 0 { - watcher, err := fsnotify.NewWatcher() - if err != nil { - util.Fatal("Error creating Watcher") - } - shared.watcher = watcher - shared.done = make(chan struct{}) + // these are used to ensure the shared InotifyTracker is run exactly once + once = &sync.Once{} + goRun = func() { go shared.run() } - // Create a channel to which FileEvents for the input filename will be sent - ch := shared.chans[filename] - if ch == nil { - shared.chans[filename] = make(chan *fsnotify.FileEvent) - } - return shared.watcher.WatchFlags(filename, flags) + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// WatchFlags signals the run goroutine to begin watching the input filename using +// using all flags. +func (shared *InotifyTracker) Watch(fname string) error { + return shared.WatchFlags(fname, fsnotify.FSN_ALL) } -// RemoveWatch calls fsnotify.RemoveWatch for the input filename and closes the -// corresponding events channel. -func (shared *InotifyTracker) RemoveWatch(filename string) { - shared.mux.Lock() - defer shared.mux.Unlock() +// WatchFlags signals the run goroutine to begin watching the input filename using +// using the input flags. +func (shared *InotifyTracker) WatchFlags(fname string, flags uint32) error { + // start running the shared InotifyTracker if not already running + once.Do(goRun) - _, found := shared.chans[filename] - if !found { - return + shared.watch <- &watchInfo{ + fname: fname, + flags: flags, } + return <-shared.error +} - shared.watcher.RemoveWatch(filename) - delete(shared.chans, filename) +// RemoveWatch signals the run goroutine to remove the watch for the input filename +func (shared *InotifyTracker) RemoveWatch(fname string) { + // start running the shared InotifyTracker if not already running + once.Do(goRun) - // If this is the last target to be removed, close the shared Watcher - if len(shared.chans) == 0 { - shared.watcher.Close() - close(shared.done) - } + shared.remove <- fname } // Events returns a channel to which FileEvents corresponding to the input filename // will be sent. This channel will be closed when removeWatch is called on this // filename. -func (shared *InotifyTracker) Events(filename string) <-chan *fsnotify.FileEvent { +func (shared *InotifyTracker) Events(fname string) chan *fsnotify.FileEvent { shared.mux.Lock() defer shared.mux.Unlock() - return shared.chans[filename] + return shared.chans[fname] } // Cleanup removes the watch for the input filename and closes the shared Watcher // if there are no more targets. -func Cleanup(filename string) { - shared.RemoveWatch(filename) +func Cleanup(fname string) { + shared.RemoveWatch(fname) +} + +// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating +// a new Watcher if the previous Watcher was closed. +func (shared *InotifyTracker) watchFlags(fname string, flags uint32) error { + shared.mux.Lock() + defer shared.mux.Unlock() + + if shared.chans[fname] == nil { + shared.chans[fname] = make(chan *fsnotify.FileEvent) + } + return shared.watcher.WatchFlags(fname, flags) +} + +// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the +// corresponding events channel. +func (shared *InotifyTracker) removeWatch(fname string) { + shared.mux.Lock() + defer shared.mux.Unlock() + + if ch := shared.chans[fname]; ch != nil { + shared.watcher.RemoveWatch(fname) + + delete(shared.chans, fname) + close(ch) + } +} + +// sendEvent sends the input event to the appropriate Tail. +func (shared *InotifyTracker) sendEvent(event *fsnotify.FileEvent) { + ch := shared.Events(event.Name) + if ch != nil { + select { + case ch <- event: + default: + } + } } // run starts the goroutine in which the shared struct reads events from its // Watcher's Event channel and sends the events to the appropriate Tail. func (shared *InotifyTracker) run() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + util.Fatal("failed to create Watcher") + } + shared.watcher = watcher + for { select { + case winfo := <-shared.watch: + shared.error <- shared.watchFlags(winfo.fname, winfo.flags) + + case fname := <-shared.remove: + shared.removeWatch(fname) + case event, open := <-shared.watcher.Event: if !open { return } - // send the FileEvent to the appropriate Tail's channel - ch := shared.chans[event.Name] - if ch != nil { - ch <- event - } + shared.sendEvent(event) case err, open := <-shared.watcher.Error: if !open { @@ -120,9 +158,6 @@ func (shared *InotifyTracker) run() { logger.Printf("Error in Watcher Error channel: %s", err) } } - - case <-shared.done: - return } } }