// Copyright (c) 2015 HPE Software Inc. All rights reserved. // Copyright (c) 2013 ActiveState Software Inc. All rights reserved. package watch import ( "log" "os" "path/filepath" "sync" "syscall" "git.lovezsh.com/go-kit/tail/util" "github.com/fsnotify/fsnotify" ) type InotifyTracker struct { mux sync.Mutex watcher *fsnotify.Watcher chans map[string]chan fsnotify.Event done map[string]chan bool watchNums map[string]int watch chan *watchInfo remove chan *watchInfo error chan error } type watchInfo struct { op fsnotify.Op fname string } func (this *watchInfo) isCreate() bool { return this.op == fsnotify.Create } var ( // globally shared InotifyTracker; ensures only one fsnotify.Watcher is used shared *InotifyTracker // these are used to ensure the shared InotifyTracker is run exactly once once = sync.Once{} goRun = func() { shared = &InotifyTracker{ mux: sync.Mutex{}, chans: make(map[string]chan fsnotify.Event), done: make(map[string]chan bool), watchNums: make(map[string]int), watch: make(chan *watchInfo), remove: make(chan *watchInfo), error: make(chan error), } go shared.run() } logger = log.New(os.Stderr, "", log.LstdFlags) ) // Watch signals the run goroutine to begin watching the input filename func Watch(fname string) error { return watch(&watchInfo{ fname: fname, }) } // Watch create signals the run goroutine to begin watching the input filename // if call the WatchCreate function, don't call the Cleanup, call the RemoveWatchCreate func WatchCreate(fname string) error { return watch(&watchInfo{ op: fsnotify.Create, fname: fname, }) } func watch(winfo *watchInfo) error { // start running the shared InotifyTracker if not already running once.Do(goRun) winfo.fname = filepath.Clean(winfo.fname) shared.watch <- winfo return <-shared.error } // RemoveWatch signals the run goroutine to remove the watch for the input filename func RemoveWatch(fname string) error { return remove(&watchInfo{ fname: fname, }) } // RemoveWatch create signals the run goroutine to remove the watch for the input filename func RemoveWatchCreate(fname string) error { return remove(&watchInfo{ op: fsnotify.Create, fname: fname, }) } func remove(winfo *watchInfo) error { // start running the shared InotifyTracker if not already running once.Do(goRun) winfo.fname = filepath.Clean(winfo.fname) shared.mux.Lock() done := shared.done[winfo.fname] if done != nil { delete(shared.done, winfo.fname) close(done) } shared.mux.Unlock() shared.remove <- winfo return <-shared.error } // Events returns a channel to which FileEvents corresponding to the input filename // will be sent. This channel will be closed when removeWatch is called on this // filename. func Events(fname string) <-chan fsnotify.Event { shared.mux.Lock() defer shared.mux.Unlock() return shared.chans[fname] } // Cleanup removes the watch for the input filename if necessary. func Cleanup(fname string) error { return RemoveWatch(fname) } // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating // a new Watcher if the previous Watcher was closed. func (shared *InotifyTracker) addWatch(winfo *watchInfo) error { shared.mux.Lock() defer shared.mux.Unlock() if shared.chans[winfo.fname] == nil { shared.chans[winfo.fname] = make(chan fsnotify.Event) } if shared.done[winfo.fname] == nil { shared.done[winfo.fname] = make(chan bool) } fname := winfo.fname if winfo.isCreate() { // Watch for new files to be created in the parent directory. fname = filepath.Dir(fname) } var err error // already in inotify watch if shared.watchNums[fname] == 0 { err = shared.watcher.Add(fname) } if err == nil { shared.watchNums[fname]++ } return err } // removeWatch calls fsnotify.RemoveWatch for the input filename and closes the // corresponding events channel. func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error { shared.mux.Lock() ch := shared.chans[winfo.fname] if ch != nil { delete(shared.chans, winfo.fname) close(ch) } fname := winfo.fname if winfo.isCreate() { // Watch for new files to be created in the parent directory. fname = filepath.Dir(fname) } shared.watchNums[fname]-- watchNum := shared.watchNums[fname] if watchNum == 0 { delete(shared.watchNums, fname) } shared.mux.Unlock() var err error // If we were the last ones to watch this file, unsubscribe from inotify. // This needs to happen after releasing the lock because fsnotify waits // synchronously for the kernel to acknowledge the removal of the watch // for this file, which causes us to deadlock if we still held the lock. if watchNum == 0 { err = shared.watcher.Remove(fname) } return err } // sendEvent sends the input event to the appropriate Tail. func (shared *InotifyTracker) sendEvent(event fsnotify.Event) { name := filepath.Clean(event.Name) shared.mux.Lock() ch := shared.chans[name] done := shared.done[name] shared.mux.Unlock() if ch != nil && done != nil { select { case ch <- event: case <-done: } } } // run starts the goroutine in which the shared struct reads events from its // Watcher's Event channel and sends the events to the appropriate Tail. func (shared *InotifyTracker) run() { watcher, err := fsnotify.NewWatcher() if err != nil { util.Fatal("failed to create Watcher") } shared.watcher = watcher for { select { case winfo := <-shared.watch: shared.error <- shared.addWatch(winfo) case winfo := <-shared.remove: shared.error <- shared.removeWatch(winfo) case event, open := <-shared.watcher.Events: if !open { return } shared.sendEvent(event) case err, open := <-shared.watcher.Errors: if !open { return } else if err != nil { sysErr, ok := err.(*os.SyscallError) if !ok || sysErr.Err != syscall.EINTR { logger.Printf("Error in Watcher Error channel: %s", err) } } } } }