From c5073c7f2639523e80bcdced11ca647a3eade0ec Mon Sep 17 00:00:00 2001 From: Sridhar Ratnakumar Date: Mon, 27 May 2013 15:21:02 -0700 Subject: [PATCH] PollingFileWatcher.ChangeEvents must detect file deletion/rename --- tail.go | 12 ++++++++++-- tail_test.go | 26 +++++++++++++++++++++++--- watch.go | 44 +++++++++++++++++++++++++++++++++++--------- 3 files changed, 68 insertions(+), 14 deletions(-) diff --git a/tail.go b/tail.go index 07f7aed..c68d7b2 100644 --- a/tail.go +++ b/tail.go @@ -173,12 +173,20 @@ func (tail *Tail) tailFileSync() { // `tail.watcher` implementation (inotify or polling). if err == io.EOF { if changes == nil { - changes = tail.watcher.ChangeEvents() + st, err := tail.file.Stat() + if err != nil { + tail.close() + tail.Kill(err) + return + } + changes = tail.watcher.ChangeEvents(st) } select { case _, ok := <-changes: if !ok { + changes = nil // XXX: how to kill changes' goroutine? + // File got deleted/renamed if tail.ReOpen { // TODO: no logging in a library? @@ -191,7 +199,7 @@ func (tail *Tail) tailFileSync() { } log.Printf("Successfully reopened %s", tail.Filename) tail.reader = bufio.NewReader(tail.file) - changes = nil // XXX: how to kill changes' goroutine? + continue } else { log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename) diff --git a/tail_test.go b/tail_test.go index 8b7adca..9f1d4e4 100644 --- a/tail_test.go +++ b/tail_test.go @@ -81,11 +81,18 @@ func TestLocationEnd(_t *testing.T) { } func _TestReOpen(_t *testing.T, poll bool) { - t := NewTailTest("reopen", _t) + var name string + if poll { + name = "reopen-polling" + }else { + name = "reopen-inotify" + } + t := NewTailTest(name, _t) t.CreateFile("test.txt", "hello\nworld\n") tail := t.StartTail( "test.txt", Config{Follow: true, ReOpen: true, Poll: poll, Location: -1}) + go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"}) // deletion must trigger reopen @@ -93,18 +100,26 @@ func _TestReOpen(_t *testing.T, poll bool) { t.RemoveFile("test.txt") <-time.After(100 * time.Millisecond) t.CreateFile("test.txt", "more\ndata\n") + if poll { + <-time.After(POLL_DURATION) + } // rename must trigger reopen <-time.After(100 * time.Millisecond) + println("going to rename") t.RenameFile("test.txt", "test.txt.rotated") <-time.After(100 * time.Millisecond) t.CreateFile("test.txt", "endofworld") + if poll { + <-time.After(POLL_DURATION) + } // Delete after a reasonable delay, to give tail sufficient time // to read all lines. <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") - + + println("Stopping tail") tail.Stop() } @@ -183,7 +198,12 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) { for idx, line := range lines { tailedLine, ok := <-tail.Lines if !ok { - t.Fatalf("tail ended early; expecting more: %v", lines[idx:]) + err := tail.Wait() + if err != nil { + t.Fatal("tail ended early with error: %v", err) + }else{ + t.Fatalf("tail ended early; expecting more: %v", lines[idx:]) + } } if tailedLine == nil { t.Fatalf("tail.Lines returned nil; not possible") diff --git a/watch.go b/watch.go index fba7441..70d7253 100644 --- a/watch.go +++ b/watch.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "time" + "sync" ) // FileWatcher monitors file-level events. @@ -17,7 +18,7 @@ type FileWatcher interface { // ChangeEvents returns a channel of events corresponding to the // times the file is ready to be read. - ChangeEvents() chan bool + ChangeEvents(os.FileInfo) chan bool } // InotifyFileWatcher uses inotify to monitor file changes. @@ -50,7 +51,8 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error { return nil } -func (fw *InotifyFileWatcher) ChangeEvents() chan bool { +// ChangeEvents returns a channel that gets updated when the file is ready to be read. +func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool { w, err := fsnotify.NewWatcher() if err != nil { panic(err) @@ -98,17 +100,31 @@ func NewPollingFileWatcher(filename string) *PollingFileWatcher { return fw } +var POLL_DURATION time.Duration + +// BlockUntilExists blocks until the file comes into existence. If the +// file already exists, then block until it is created again. func (fw *PollingFileWatcher) BlockUntilExists() error { panic("not implemented") return nil } -func (fw *PollingFileWatcher) ChangeEvents() chan bool { +func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { ch := make(chan bool) stop := make(chan bool) + var once sync.Once every2Seconds := time.Tick(2 * time.Second) - var prevModTime time.Time + + // XXX: use tomb.Tomb to cleanly managed these goroutines. + + stopAndClose := func() { + go func() { + close(ch) + stop <- true + }() + } + go func() { for { select { @@ -117,17 +133,24 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool { default: } - time.Sleep(250 * time.Millisecond) + time.Sleep(POLL_DURATION) fi, err := os.Stat(fw.Filename) if err != nil { if os.IsNotExist(err) { - // below goroutine (every2Seconds) will catch up - // eventually and stop us. + once.Do(stopAndClose) continue } + /// XXX: do not panic here. panic(err) } + // File got moved/rename within POLL_DURATION? + if !os.SameFile(origFi, fi) { + once.Do(stopAndClose) + continue + } + + // If the file was changed since last check, notify. modTime := fi.ModTime() if modTime != prevModTime { prevModTime = modTime @@ -145,8 +168,7 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool { case <-every2Seconds: // XXX: not using file descriptor as per contract. if _, err := os.Stat(fw.Filename); os.IsNotExist(err) { - stop <- true - close(ch) + once.Do(stopAndClose) return } } @@ -155,3 +177,7 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool { return ch } + +func init() { + POLL_DURATION = 250 * time.Millisecond +}