Merge pull request #3 from srid/bug99126_rotation_when_polling

Make tail work with file rotation when using polling file watcher
This commit is contained in:
Sridhar Ratnakumar 2013-05-28 11:07:12 -07:00
commit ad34bda357
4 changed files with 100 additions and 31 deletions

7
CHANGES.md Normal file
View File

@ -0,0 +1,7 @@
# May, 2013
* Recognize deletions/renames when using polling file watcher (PR #1)
# Feb, 2013
* Initial open source release

12
tail.go
View File

@ -173,12 +173,20 @@ func (tail *Tail) tailFileSync() {
// `tail.watcher` implementation (inotify or polling). // `tail.watcher` implementation (inotify or polling).
if err == io.EOF { if err == io.EOF {
if changes == nil { if changes == nil {
changes = tail.watcher.ChangeEvents() st, err := tail.file.Stat()
if err != nil {
tail.close()
tail.Kill(err)
return
}
changes = tail.watcher.ChangeEvents(st)
} }
select { select {
case _, ok := <-changes: case _, ok := <-changes:
if !ok { if !ok {
changes = nil // XXX: how to kill changes' goroutine?
// File got deleted/renamed // File got deleted/renamed
if tail.ReOpen { if tail.ReOpen {
// TODO: no logging in a library? // TODO: no logging in a library?
@ -191,7 +199,7 @@ func (tail *Tail) tailFileSync() {
} }
log.Printf("Successfully reopened %s", tail.Filename) log.Printf("Successfully reopened %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file) tail.reader = bufio.NewReader(tail.file)
changes = nil // XXX: how to kill changes' goroutine?
continue continue
} else { } else {
log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename) log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename)

View File

@ -80,10 +80,19 @@ func TestLocationEnd(_t *testing.T) {
tail.Stop() tail.Stop()
} }
func TestReOpen(_t *testing.T) { func _TestReOpen(_t *testing.T, poll bool) {
t := NewTailTest("reopen", _t) var name string
if poll {
name = "reopen-polling"
}else {
name = "reopen-inotify"
}
t := NewTailTest(name, _t)
t.CreateFile("test.txt", "hello\nworld\n") t.CreateFile("test.txt", "hello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: true, ReOpen: true, Location: -1}) tail := t.StartTail(
"test.txt",
Config{Follow: true, ReOpen: true, Poll: poll, Location: -1})
go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"}) go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"})
// deletion must trigger reopen // deletion must trigger reopen
@ -91,11 +100,21 @@ func TestReOpen(_t *testing.T) {
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.CreateFile("test.txt", "more\ndata\n") t.CreateFile("test.txt", "more\ndata\n")
if poll {
// Give polling a chance to read the just-written lines (more;
// data), before we recreate the file again below.
<-time.After(POLL_DURATION)
}
// rename must trigger reopen // rename must trigger reopen
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RenameFile("test.txt", "test.txt.rotated") t.RenameFile("test.txt", "test.txt.rotated")
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
if poll {
// This time, wait a bit before creating the file to test
// PollingFileWatcher's BlockUntilExists.
<-time.After(POLL_DURATION)
}
t.CreateFile("test.txt", "endofworld") t.CreateFile("test.txt", "endofworld")
// Delete after a reasonable delay, to give tail sufficient time // Delete after a reasonable delay, to give tail sufficient time
@ -106,6 +125,16 @@ func TestReOpen(_t *testing.T) {
tail.Stop() tail.Stop()
} }
// The use of polling file watcher could affect file rotation
// (detected via renames), so test these explicitly.
func TestReOpenWithPoll(_t *testing.T) {
_TestReOpen(_t, true)
}
func TestReOpenWithoutPoll(_t *testing.T) {
_TestReOpen(_t, false)
}
// Test library // Test library
@ -171,7 +200,12 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
for idx, line := range lines { for idx, line := range lines {
tailedLine, ok := <-tail.Lines tailedLine, ok := <-tail.Lines
if !ok { if !ok {
t.Fatalf("tail ended early; expecting more: %v", lines[idx:]) err := tail.Wait()
if err != nil {
t.Fatal("tail ended early with error: %v", err)
}else{
t.Fatalf("tail ended early; expecting more: %v", lines[idx:])
}
} }
if tailedLine == nil { if tailedLine == nil {
t.Fatalf("tail.Lines returned nil; not possible") t.Fatalf("tail.Lines returned nil; not possible")

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
"sync"
) )
// FileWatcher monitors file-level events. // FileWatcher monitors file-level events.
@ -17,7 +18,7 @@ type FileWatcher interface {
// ChangeEvents returns a channel of events corresponding to the // ChangeEvents returns a channel of events corresponding to the
// times the file is ready to be read. // times the file is ready to be read.
ChangeEvents() chan bool ChangeEvents(os.FileInfo) chan bool
} }
// InotifyFileWatcher uses inotify to monitor file changes. // InotifyFileWatcher uses inotify to monitor file changes.
@ -50,7 +51,8 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error {
return nil return nil
} }
func (fw *InotifyFileWatcher) ChangeEvents() chan bool { // ChangeEvents returns a channel that gets updated when the file is ready to be read.
func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool {
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
panic(err) panic(err)
@ -98,17 +100,38 @@ func NewPollingFileWatcher(filename string) *PollingFileWatcher {
return fw return fw
} }
var POLL_DURATION time.Duration
// BlockUntilExists blocks until the file comes into existence. If the
// file already exists, then block until it is created again.
func (fw *PollingFileWatcher) BlockUntilExists() error { func (fw *PollingFileWatcher) BlockUntilExists() error {
panic("not implemented") for {
return nil if _, err := os.Stat(fw.Filename); err == nil {
return nil
}else if !os.IsNotExist(err) {
return err
}
time.Sleep(POLL_DURATION)
}
panic("unreachable")
} }
func (fw *PollingFileWatcher) ChangeEvents() chan bool { func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool {
ch := make(chan bool) ch := make(chan bool)
stop := make(chan bool) stop := make(chan bool)
every2Seconds := time.Tick(2 * time.Second) var once sync.Once
var prevModTime time.Time var prevModTime time.Time
// XXX: use tomb.Tomb to cleanly manage these goroutines. replace
// the panic (below) with tomb's Kill.
stopAndClose := func() {
go func() {
close(ch)
stop <- true
}()
}
go func() { go func() {
for { for {
select { select {
@ -117,17 +140,24 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool {
default: default:
} }
time.Sleep(250 * time.Millisecond) time.Sleep(POLL_DURATION)
fi, err := os.Stat(fw.Filename) fi, err := os.Stat(fw.Filename)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
// below goroutine (every2Seconds) will catch up once.Do(stopAndClose)
// eventually and stop us.
continue continue
} }
/// XXX: do not panic here.
panic(err) panic(err)
} }
// File got moved/rename within POLL_DURATION?
if !os.SameFile(origFi, fi) {
once.Do(stopAndClose)
continue
}
// If the file was changed since last check, notify.
modTime := fi.ModTime() modTime := fi.ModTime()
if modTime != prevModTime { if modTime != prevModTime {
prevModTime = modTime prevModTime = modTime
@ -139,19 +169,9 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool {
} }
}() }()
go func() {
for {
select {
case <-every2Seconds:
// XXX: not using file descriptor as per contract.
if _, err := os.Stat(fw.Filename); os.IsNotExist(err) {
stop <- true
close(ch)
return
}
}
}
}()
return ch return ch
} }
func init() {
POLL_DURATION = 250 * time.Millisecond
}