diff --git a/CHANGES.md b/CHANGES.md index 9880013..95c5f6f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # May, 2013 * Recognize deletions/renames when using polling file watcher (PR #1) +* Detect file truncation +* Fix potential race condition when reopening the file (issue 5) # Feb, 2013 diff --git a/tail.go b/tail.go index c68d7b2..c8647aa 100644 --- a/tail.go +++ b/tail.go @@ -13,8 +13,8 @@ import ( ) type Line struct { - Text string - Time time.Time + Text string + Time time.Time } // Tail configuration @@ -102,8 +102,7 @@ func (tail *Tail) reopen() error { if err != nil { if os.IsNotExist(err) { log.Printf("Waiting for %s to appear...", tail.Filename) - err := tail.watcher.BlockUntilExists() - if err != nil { + if err := tail.watcher.BlockUntilExists(); err != nil { return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err) } continue @@ -157,7 +156,7 @@ func (tail *Tail) tailFileSync() { for _, line := range partitionString(string(line), tail.MaxLineSize) { tail.Lines <- &Line{line, now} } - }else{ + } else { tail.Lines <- &Line{string(line), now} } } @@ -187,10 +186,10 @@ func (tail *Tail) tailFileSync() { if !ok { changes = nil // XXX: how to kill changes' goroutine? - // File got deleted/renamed + // File got deleted/renamed/truncated. if tail.ReOpen { // TODO: no logging in a library? - log.Printf("Re-opening moved/deleted file %s ...", tail.Filename) + log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename) err := tail.reopen() if err != nil { tail.close() @@ -199,7 +198,7 @@ func (tail *Tail) tailFileSync() { } log.Printf("Successfully reopened %s", tail.Filename) tail.reader = bufio.NewReader(tail.file) - + continue } else { log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename) @@ -230,7 +229,7 @@ func partitionString(s string, chunkSize int) []string { panic("invalid chunkSize") } length := len(s) - chunks := 1 + length/chunkSize + chunks := 1 + length/chunkSize start := 0 end := chunkSize parts := make([]string, 0, chunks) diff --git a/tail_test.go b/tail_test.go index 56af1e5..cc1af62 100644 --- a/tail_test.go +++ b/tail_test.go @@ -14,6 +14,7 @@ import ( ) func init() { + // Clear the temporary test directory err := os.RemoveAll(".test") if err != nil { panic(err) @@ -84,7 +85,7 @@ func _TestReOpen(_t *testing.T, poll bool) { var name string if poll { name = "reopen-polling" - }else { + } else { name = "reopen-inotify" } t := NewTailTest(name, _t) @@ -92,7 +93,7 @@ func _TestReOpen(_t *testing.T, poll bool) { tail := t.StartTail( "test.txt", Config{Follow: true, ReOpen: true, Poll: poll, Location: -1}) - + go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"}) // deletion must trigger reopen @@ -122,18 +123,71 @@ func _TestReOpen(_t *testing.T, poll bool) { <-time.After(100 * time.Millisecond) t.RemoveFile("test.txt") + println("Stopping (REOPEN)...") tail.Stop() } // The use of polling file watcher could affect file rotation // (detected via renames), so test these explicitly. -func TestReOpenWithPoll(_t *testing.T) { +func TestReOpenInotify(_t *testing.T) { + _TestReOpen(_t, false) +} + +func TestReOpenPolling(_t *testing.T) { _TestReOpen(_t, true) } -func TestReOpenWithoutPoll(_t *testing.T) { - _TestReOpen(_t, false) +func _TestReSeek(_t *testing.T, poll bool) { + var name string + if poll { + name = "reseek-polling" + } else { + name = "reseek-inotify" + } + t := NewTailTest(name, _t) + t.CreateFile("test.txt", "a really long string goes here\nhello\nworld\n") + tail := t.StartTail( + "test.txt", + Config{Follow: true, ReOpen: true, Poll: poll, Location: -1}) + + go t.VerifyTailOutput(tail, []string{ + "a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld"}) + + // truncate now + <-time.After(100 * time.Millisecond) + if poll { + // Give polling a chance to read the just-written lines (more; + // data), before we truncate the file again below. + <-time.After(POLL_DURATION) + } + println("truncating..") + t.TruncateFile("test.txt", "h311o\nw0r1d\nendofworld\n") + // XXX: is this required for this test function? + if poll { + // Give polling a chance to read the just-written lines (more; + // data), before we recreate the file again below. + <-time.After(POLL_DURATION) + } + + // Delete after a reasonable delay, to give tail sufficient time + // to read all lines. + <-time.After(100 * time.Millisecond) + t.RemoveFile("test.txt") + + println("Stopping (RESEEK)...") + tail.Stop() +} + +// The use of polling file watcher could affect file rotation +// (detected via renames), so test these explicitly. + +func TestReSeekInotify(_t *testing.T) { + _TestReSeek(_t, false) +} + +func TestReSeekPolling(_t *testing.T) { + _TestReSeek(_t, true) } // Test library @@ -150,6 +204,10 @@ func NewTailTest(name string, t *testing.T) TailTest { if err != nil { tt.Fatal(err) } + + // Use a smaller poll duration for faster test runs. + POLL_DURATION = 25 * time.Millisecond + return tt } @@ -188,6 +246,18 @@ func (t TailTest) AppendFile(name string, contents string) { } } +func (t TailTest) TruncateFile(name string, contents string) { + f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + t.Fatal(err) + } + defer f.Close() + _, err = f.WriteString(contents) + if err != nil { + t.Fatal(err) + } +} + func (t TailTest) StartTail(name string, config Config) *Tail { tail, err := TailFile(t.path+"/"+name, config) if err != nil { @@ -203,7 +273,7 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) { err := tail.Wait() if err != nil { t.Fatal("tail ended early with error: %v", err) - }else{ + } else { t.Fatalf("tail ended early; expecting more: %v", lines[idx:]) } } @@ -211,7 +281,8 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) { t.Fatalf("tail.Lines returned nil; not possible") } if tailedLine.Text != line { - t.Fatalf("mismatch; %s != %s", tailedLine.Text, line) + t.Fatalf("mismatch; %s (actual) != %s (expected)", + tailedLine.Text, line) } } line, ok := <-tail.Lines diff --git a/watch.go b/watch.go index df99e25..1d8c4c7 100644 --- a/watch.go +++ b/watch.go @@ -6,14 +6,14 @@ import ( "github.com/howeyc/fsnotify" "os" "path/filepath" - "time" "sync" + "time" ) // FileWatcher monitors file-level events. type FileWatcher interface { // BlockUntilExists blocks until the missing file comes into - // existence. If the file already exists, block until it is recreated. + // existence. If the file already exists, returns immediately. BlockUntilExists() error // ChangeEvents returns a channel of events corresponding to the @@ -24,10 +24,11 @@ type FileWatcher interface { // InotifyFileWatcher uses inotify to monitor file changes. type InotifyFileWatcher struct { Filename string + Size int64 } func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { - fw := &InotifyFileWatcher{filename} + fw := &InotifyFileWatcher{filename, 0} return fw } @@ -37,11 +38,23 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error { return err } defer w.Close() - err = w.WatchFlags(filepath.Dir(fw.Filename), fsnotify.FSN_CREATE) + + dirname := filepath.Dir(fw.Filename) + + // Watch for new files to be created in the parent directory. + err = w.WatchFlags(dirname, fsnotify.FSN_CREATE) if err != nil { return err } defer w.RemoveWatch(filepath.Dir(fw.Filename)) + + // Do a real check now as the file might have been created before + // calling `WatchFlags` above. + if _, err = os.Stat(fw.Filename); !os.IsNotExist(err) { + // file exists, or stat returned an error. + return err + } + for { evt := <-w.Event if evt.Name == fw.Filename { @@ -52,7 +65,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error { } // ChangeEvents returns a channel that gets updated when the file is ready to be read. -func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool { +func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool { w, err := fsnotify.NewWatcher() if err != nil { panic(err) @@ -64,20 +77,36 @@ func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool { ch := make(chan bool) + fw.Size = fi.Size() + go func() { + defer w.Close() + defer w.RemoveWatch(fw.Filename) + defer close(ch) + for { + prevSize := fw.Size + evt := <-w.Event switch { case evt.IsDelete(): fallthrough case evt.IsRename(): - close(ch) - w.RemoveWatch(fw.Filename) - w.Close() return case evt.IsModify(): + fi, err := os.Stat(fw.Filename) + if err != nil { + // XXX: no panic here + panic(err) + } + fw.Size = fi.Size() + + if prevSize > 0 && prevSize > fw.Size { + return + } + // send only if channel is empty. select { case ch <- true: @@ -93,22 +122,21 @@ func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool { // PollingFileWatcher polls the file for changes. type PollingFileWatcher struct { Filename string + Size int64 } func NewPollingFileWatcher(filename string) *PollingFileWatcher { - fw := &PollingFileWatcher{filename} + fw := &PollingFileWatcher{filename, 0} return fw } var POLL_DURATION time.Duration -// BlockUntilExists blocks until the file comes into existence. If the -// file already exists, then block until it is created again. func (fw *PollingFileWatcher) BlockUntilExists() error { for { if _, err := os.Stat(fw.Filename); err == nil { return nil - }else if !os.IsNotExist(err) { + } else if !os.IsNotExist(err) { return err } time.Sleep(POLL_DURATION) @@ -131,8 +159,11 @@ func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { stop <- true }() } - + + fw.Size = origFi.Size() + go func() { + prevSize := fw.Size for { select { case <-stop: @@ -157,6 +188,13 @@ func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { continue } + // Was the file truncated? + fw.Size = fi.Size() + if prevSize > 0 && prevSize > fw.Size { + once.Do(stopAndClose) + continue + } + // If the file was changed since last check, notify. modTime := fi.ModTime() if modTime != prevModTime {