diff --git a/CHANGES.md b/CHANGES.md new file mode 100644 index 0000000..9880013 --- /dev/null +++ b/CHANGES.md @@ -0,0 +1,7 @@ +# May, 2013 + +* Recognize deletions/renames when using polling file watcher (PR #1) + +# Feb, 2013 + +* Initial open source release diff --git a/tail.go b/tail.go index 07f7aed..c68d7b2 100644 --- a/tail.go +++ b/tail.go @@ -173,12 +173,20 @@ func (tail *Tail) tailFileSync() { // `tail.watcher` implementation (inotify or polling). if err == io.EOF { if changes == nil { - changes = tail.watcher.ChangeEvents() + st, err := tail.file.Stat() + if err != nil { + tail.close() + tail.Kill(err) + return + } + changes = tail.watcher.ChangeEvents(st) } select { case _, ok := <-changes: if !ok { + changes = nil // XXX: how to kill changes' goroutine? + // File got deleted/renamed if tail.ReOpen { // TODO: no logging in a library? @@ -191,7 +199,7 @@ func (tail *Tail) tailFileSync() { } log.Printf("Successfully reopened %s", tail.Filename) tail.reader = bufio.NewReader(tail.file) - changes = nil // XXX: how to kill changes' goroutine? + continue } else { log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename) diff --git a/tail_test.go b/tail_test.go index c6cf53b..56af1e5 100644 --- a/tail_test.go +++ b/tail_test.go @@ -80,10 +80,19 @@ func TestLocationEnd(_t *testing.T) { tail.Stop() } -func TestReOpen(_t *testing.T) { - t := NewTailTest("reopen", _t) +func _TestReOpen(_t *testing.T, poll bool) { + var name string + if poll { + name = "reopen-polling" + }else { + name = "reopen-inotify" + } + t := NewTailTest(name, _t) t.CreateFile("test.txt", "hello\nworld\n") - tail := t.StartTail("test.txt", Config{Follow: true, ReOpen: true, Location: -1}) + tail := t.StartTail( + "test.txt", + Config{Follow: true, ReOpen: true, Poll: poll, Location: -1}) + go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"}) // deletion must trigger reopen @@ -91,21 +100,41 @@ func TestReOpen(_t *testing.T) { t.RemoveFile("test.txt") <-time.After(100 * time.Millisecond) t.CreateFile("test.txt", "more\ndata\n") + if poll { + // Give polling a chance to read the just-written lines (more; + // data), before we recreate the file again below. + <-time.After(POLL_DURATION) + } // rename must trigger reopen <-time.After(100 * time.Millisecond) t.RenameFile("test.txt", "test.txt.rotated") <-time.After(100 * time.Millisecond) + if poll { + // This time, wait a bit before creating the file to test + // PollingFileWatcher's BlockUntilExists. + <-time.After(POLL_DURATION) + } t.CreateFile("test.txt", "endofworld") // Delete after a reasonable delay, to give tail sufficient time // to read all lines. <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") - + tail.Stop() } +// The use of polling file watcher could affect file rotation +// (detected via renames), so test these explicitly. + +func TestReOpenWithPoll(_t *testing.T) { + _TestReOpen(_t, true) +} + +func TestReOpenWithoutPoll(_t *testing.T) { + _TestReOpen(_t, false) +} // Test library @@ -171,7 +200,12 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) { for idx, line := range lines { tailedLine, ok := <-tail.Lines if !ok { - t.Fatalf("tail ended early; expecting more: %v", lines[idx:]) + err := tail.Wait() + if err != nil { + t.Fatal("tail ended early with error: %v", err) + }else{ + t.Fatalf("tail ended early; expecting more: %v", lines[idx:]) + } } if tailedLine == nil { t.Fatalf("tail.Lines returned nil; not possible") diff --git a/watch.go b/watch.go index fba7441..df99e25 100644 --- a/watch.go +++ b/watch.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "time" + "sync" ) // FileWatcher monitors file-level events. @@ -17,7 +18,7 @@ type FileWatcher interface { // ChangeEvents returns a channel of events corresponding to the // times the file is ready to be read. - ChangeEvents() chan bool + ChangeEvents(os.FileInfo) chan bool } // InotifyFileWatcher uses inotify to monitor file changes. @@ -50,7 +51,8 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error { return nil } -func (fw *InotifyFileWatcher) ChangeEvents() chan bool { +// ChangeEvents returns a channel that gets updated when the file is ready to be read. +func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool { w, err := fsnotify.NewWatcher() if err != nil { panic(err) @@ -98,17 +100,38 @@ func NewPollingFileWatcher(filename string) *PollingFileWatcher { return fw } +var POLL_DURATION time.Duration + +// BlockUntilExists blocks until the file comes into existence. If the +// file already exists, then block until it is created again. func (fw *PollingFileWatcher) BlockUntilExists() error { - panic("not implemented") - return nil + for { + if _, err := os.Stat(fw.Filename); err == nil { + return nil + }else if !os.IsNotExist(err) { + return err + } + time.Sleep(POLL_DURATION) + } + panic("unreachable") } -func (fw *PollingFileWatcher) ChangeEvents() chan bool { +func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { ch := make(chan bool) stop := make(chan bool) - every2Seconds := time.Tick(2 * time.Second) - + var once sync.Once var prevModTime time.Time + + // XXX: use tomb.Tomb to cleanly manage these goroutines. replace + // the panic (below) with tomb's Kill. + + stopAndClose := func() { + go func() { + close(ch) + stop <- true + }() + } + go func() { for { select { @@ -117,17 +140,24 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool { default: } - time.Sleep(250 * time.Millisecond) + time.Sleep(POLL_DURATION) fi, err := os.Stat(fw.Filename) if err != nil { if os.IsNotExist(err) { - // below goroutine (every2Seconds) will catch up - // eventually and stop us. + once.Do(stopAndClose) continue } + /// XXX: do not panic here. panic(err) } + // File got moved/rename within POLL_DURATION? + if !os.SameFile(origFi, fi) { + once.Do(stopAndClose) + continue + } + + // If the file was changed since last check, notify. modTime := fi.ModTime() if modTime != prevModTime { prevModTime = modTime @@ -139,19 +169,9 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool { } }() - go func() { - for { - select { - case <-every2Seconds: - // XXX: not using file descriptor as per contract. - if _, err := os.Stat(fw.Filename); os.IsNotExist(err) { - stop <- true - close(ch) - return - } - } - } - }() - return ch } + +func init() { + POLL_DURATION = 250 * time.Millisecond +}