diff --git a/tail.go b/tail.go index 74040ac..68df120 100644 --- a/tail.go +++ b/tail.go @@ -40,7 +40,7 @@ type Tail struct { file *os.File reader *bufio.Reader watcher watch.FileWatcher - changes chan bool + changes *watch.FileChanges tomb.Tomb // provides: Done, Kill, Dying } @@ -163,7 +163,7 @@ func (tail *Tail) tailFileSync() { // When EOF is reached, wait for more data to become // available. Wait strategy is based on the `tail.watcher` // implementation (inotify or polling). - err = tail.waitForChanges() + err := tail.waitForChanges() if err != nil { if err != ErrStop { tail.Kill(err) @@ -196,26 +196,31 @@ func (tail *Tail) waitForChanges() error { } select { - case _, ok := <-tail.changes: - if !ok { - tail.changes = nil - - // File got deleted/renamed/truncated. - if tail.ReOpen { - // XXX: no logging in a library? - log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename) - err := tail.reopen() - if err != nil { - return err - } - log.Printf("Successfully reopened %s", tail.Filename) - tail.reader = bufio.NewReader(tail.file) - return nil - } else { - log.Printf("Stopping tail as file no longer exists: %s", tail.Filename) - return ErrStop + case <-tail.changes.Modified: + case <-tail.changes.Deleted: + tail.changes = nil + if tail.ReOpen { + // XXX: we must not log from a library. + log.Printf("Re-opening moved/deleted file %s ...", tail.Filename) + if err := tail.reopen(); err != nil { + return err } + log.Printf("Successfully reopened %s", tail.Filename) + tail.reader = bufio.NewReader(tail.file) + return nil + } else { + log.Printf("Stopping tail as file no longer exists: %s", tail.Filename) + return ErrStop } + case <-tail.changes.Truncated: + // Always reopen truncated files (Follow is true) + log.Printf("Re-opening truncated file %s ...", tail.Filename) + if err := tail.reopen(); err != nil { + return err + } + log.Printf("Successfully reopened truncated %s", tail.Filename) + tail.reader = bufio.NewReader(tail.file) + return nil case <-tail.Dying(): return ErrStop } diff --git a/watch/filechanges.go b/watch/filechanges.go new file mode 100644 index 0000000..d28c189 --- /dev/null +++ b/watch/filechanges.go @@ -0,0 +1,42 @@ +package watch + +type FileChanges struct { + Modified chan bool // Channel to get notified of modifications + Truncated chan bool // Channel to get notified of truncations + Deleted chan bool // Channel to get notified of deletions/renames +} + +func NewFileChanges() *FileChanges { + return &FileChanges{ + make(chan bool), make(chan bool), make(chan bool)} +} + +func (fc *FileChanges) NotifyModified() { + sendOnlyIfEmpty(fc.Modified) +} + +func (fc *FileChanges) NotifyTruncated() { + sendOnlyIfEmpty(fc.Truncated) +} + +func (fc *FileChanges) NotifyDeleted() { + sendOnlyIfEmpty(fc.Deleted) +} + +func (fc *FileChanges) Close() { + close(fc.Modified) + close(fc.Truncated) + close(fc.Deleted) +} + +// sendOnlyIfEmpty sends on a bool channel only if the channel has no +// backlog to be read by other goroutines. This concurrency pattern +// can be used to notify other goroutines if and only if they are +// looking for it (i.e., subsequent notifications can be compressed +// into one). +func sendOnlyIfEmpty(ch chan bool) { + select { + case ch <- true: + default: + } +} \ No newline at end of file diff --git a/watch/inotify.go b/watch/inotify.go index 2d20b44..b4e6544 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -56,7 +56,9 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error { panic("unreachable") } -func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool { +func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) *FileChanges { + changes := NewFileChanges() + w, err := fsnotify.NewWatcher() if err != nil { panic(err) @@ -66,14 +68,12 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo panic(err) } - ch := make(chan bool) - fw.Size = fi.Size() go func() { defer w.Close() defer w.RemoveWatch(fw.Filename) - defer close(ch) + defer changes.Close() for { prevSize := fw.Size @@ -91,6 +91,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo fallthrough case evt.IsRename(): + changes.NotifyDeleted() return case evt.IsModify(): @@ -102,17 +103,13 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo fw.Size = fi.Size() if prevSize > 0 && prevSize > fw.Size { - return - } - - // send only if channel is empty. - select { - case ch <- true: - default: + changes.NotifyTruncated() + }else{ + changes.NotifyModified() } } } }() - return ch + return changes } diff --git a/watch/polling.go b/watch/polling.go index ca1ee62..773c82a 100644 --- a/watch/polling.go +++ b/watch/polling.go @@ -5,7 +5,6 @@ package watch import ( "launchpad.net/tomb" "os" - "sync" "time" ) @@ -39,33 +38,23 @@ func (fw *PollingFileWatcher) BlockUntilExists(t tomb.Tomb) error { panic("unreachable") } -func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan bool { - ch := make(chan bool) - stop := make(chan bool) - var once sync.Once +func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) *FileChanges { + changes := NewFileChanges() var prevModTime time.Time // XXX: use tomb.Tomb to cleanly manage these goroutines. replace // the panic (below) with tomb's Kill. - stopAndClose := func() { - go func() { - close(ch) - stop <- true - }() - } - fw.Size = origFi.Size() go func() { + defer changes.Close() + prevSize := fw.Size for { select { - case <-stop: - return case <-t.Dying(): - once.Do(stopAndClose) - continue + return default: } @@ -73,39 +62,37 @@ func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan fi, err := os.Stat(fw.Filename) if err != nil { if os.IsNotExist(err) { - once.Do(stopAndClose) - continue + // File does not exist (has been deleted). + changes.NotifyDeleted() + return } /// XXX: do not panic here. panic(err) } - // File got moved/rename within POLL_DURATION? + // File got moved/renamed? if !os.SameFile(origFi, fi) { - once.Do(stopAndClose) - continue + changes.NotifyDeleted() + return } - // Was the file truncated? + // File got truncated? fw.Size = fi.Size() if prevSize > 0 && prevSize > fw.Size { - once.Do(stopAndClose) + changes.NotifyTruncated() continue } - // If the file was changed since last check, notify. + // File was appended to (changed)? modTime := fi.ModTime() if modTime != prevModTime { prevModTime = modTime - select { - case ch <- true: - default: - } + changes.NotifyModified() } } }() - return ch + return changes } func init() { diff --git a/watch/watch.go b/watch/watch.go index 0b4519d..7840b8c 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -16,6 +16,6 @@ type FileWatcher interface { // ChangeEvents returns a channel of events corresponding to the // times the file is ready to be read. The channel will be closed // if the file gets deleted, renamed or truncated. - ChangeEvents(tomb.Tomb, os.FileInfo) chan bool + ChangeEvents(tomb.Tomb, os.FileInfo) *FileChanges }