PollingFileWatcher.ChangeEvents must detect file deletion/rename

This commit is contained in:
Sridhar Ratnakumar 2013-05-27 15:21:02 -07:00
parent e895d422e1
commit c5073c7f26
3 changed files with 68 additions and 14 deletions

12
tail.go
View File

@ -173,12 +173,20 @@ func (tail *Tail) tailFileSync() {
// `tail.watcher` implementation (inotify or polling). // `tail.watcher` implementation (inotify or polling).
if err == io.EOF { if err == io.EOF {
if changes == nil { if changes == nil {
changes = tail.watcher.ChangeEvents() st, err := tail.file.Stat()
if err != nil {
tail.close()
tail.Kill(err)
return
}
changes = tail.watcher.ChangeEvents(st)
} }
select { select {
case _, ok := <-changes: case _, ok := <-changes:
if !ok { if !ok {
changes = nil // XXX: how to kill changes' goroutine?
// File got deleted/renamed // File got deleted/renamed
if tail.ReOpen { if tail.ReOpen {
// TODO: no logging in a library? // TODO: no logging in a library?
@ -191,7 +199,7 @@ func (tail *Tail) tailFileSync() {
} }
log.Printf("Successfully reopened %s", tail.Filename) log.Printf("Successfully reopened %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file) tail.reader = bufio.NewReader(tail.file)
changes = nil // XXX: how to kill changes' goroutine?
continue continue
} else { } else {
log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename) log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename)

View File

@ -81,11 +81,18 @@ func TestLocationEnd(_t *testing.T) {
} }
func _TestReOpen(_t *testing.T, poll bool) { func _TestReOpen(_t *testing.T, poll bool) {
t := NewTailTest("reopen", _t) var name string
if poll {
name = "reopen-polling"
}else {
name = "reopen-inotify"
}
t := NewTailTest(name, _t)
t.CreateFile("test.txt", "hello\nworld\n") t.CreateFile("test.txt", "hello\nworld\n")
tail := t.StartTail( tail := t.StartTail(
"test.txt", "test.txt",
Config{Follow: true, ReOpen: true, Poll: poll, Location: -1}) Config{Follow: true, ReOpen: true, Poll: poll, Location: -1})
go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"}) go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"})
// deletion must trigger reopen // deletion must trigger reopen
@ -93,18 +100,26 @@ func _TestReOpen(_t *testing.T, poll bool) {
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.CreateFile("test.txt", "more\ndata\n") t.CreateFile("test.txt", "more\ndata\n")
if poll {
<-time.After(POLL_DURATION)
}
// rename must trigger reopen // rename must trigger reopen
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
println("going to rename")
t.RenameFile("test.txt", "test.txt.rotated") t.RenameFile("test.txt", "test.txt.rotated")
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.CreateFile("test.txt", "endofworld") t.CreateFile("test.txt", "endofworld")
if poll {
<-time.After(POLL_DURATION)
}
// Delete after a reasonable delay, to give tail sufficient time // Delete after a reasonable delay, to give tail sufficient time
// to read all lines. // to read all lines.
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
println("Stopping tail")
tail.Stop() tail.Stop()
} }
@ -183,7 +198,12 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
for idx, line := range lines { for idx, line := range lines {
tailedLine, ok := <-tail.Lines tailedLine, ok := <-tail.Lines
if !ok { if !ok {
t.Fatalf("tail ended early; expecting more: %v", lines[idx:]) err := tail.Wait()
if err != nil {
t.Fatal("tail ended early with error: %v", err)
}else{
t.Fatalf("tail ended early; expecting more: %v", lines[idx:])
}
} }
if tailedLine == nil { if tailedLine == nil {
t.Fatalf("tail.Lines returned nil; not possible") t.Fatalf("tail.Lines returned nil; not possible")

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
"sync"
) )
// FileWatcher monitors file-level events. // FileWatcher monitors file-level events.
@ -17,7 +18,7 @@ type FileWatcher interface {
// ChangeEvents returns a channel of events corresponding to the // ChangeEvents returns a channel of events corresponding to the
// times the file is ready to be read. // times the file is ready to be read.
ChangeEvents() chan bool ChangeEvents(os.FileInfo) chan bool
} }
// InotifyFileWatcher uses inotify to monitor file changes. // InotifyFileWatcher uses inotify to monitor file changes.
@ -50,7 +51,8 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error {
return nil return nil
} }
func (fw *InotifyFileWatcher) ChangeEvents() chan bool { // ChangeEvents returns a channel that gets updated when the file is ready to be read.
func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool {
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
panic(err) panic(err)
@ -98,17 +100,31 @@ func NewPollingFileWatcher(filename string) *PollingFileWatcher {
return fw return fw
} }
var POLL_DURATION time.Duration
// BlockUntilExists blocks until the file comes into existence. If the
// file already exists, then block until it is created again.
func (fw *PollingFileWatcher) BlockUntilExists() error { func (fw *PollingFileWatcher) BlockUntilExists() error {
panic("not implemented") panic("not implemented")
return nil return nil
} }
func (fw *PollingFileWatcher) ChangeEvents() chan bool { func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool {
ch := make(chan bool) ch := make(chan bool)
stop := make(chan bool) stop := make(chan bool)
var once sync.Once
every2Seconds := time.Tick(2 * time.Second) every2Seconds := time.Tick(2 * time.Second)
var prevModTime time.Time var prevModTime time.Time
// XXX: use tomb.Tomb to cleanly managed these goroutines.
stopAndClose := func() {
go func() {
close(ch)
stop <- true
}()
}
go func() { go func() {
for { for {
select { select {
@ -117,17 +133,24 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool {
default: default:
} }
time.Sleep(250 * time.Millisecond) time.Sleep(POLL_DURATION)
fi, err := os.Stat(fw.Filename) fi, err := os.Stat(fw.Filename)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
// below goroutine (every2Seconds) will catch up once.Do(stopAndClose)
// eventually and stop us.
continue continue
} }
/// XXX: do not panic here.
panic(err) panic(err)
} }
// File got moved/rename within POLL_DURATION?
if !os.SameFile(origFi, fi) {
once.Do(stopAndClose)
continue
}
// If the file was changed since last check, notify.
modTime := fi.ModTime() modTime := fi.ModTime()
if modTime != prevModTime { if modTime != prevModTime {
prevModTime = modTime prevModTime = modTime
@ -145,8 +168,7 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool {
case <-every2Seconds: case <-every2Seconds:
// XXX: not using file descriptor as per contract. // XXX: not using file descriptor as per contract.
if _, err := os.Stat(fw.Filename); os.IsNotExist(err) { if _, err := os.Stat(fw.Filename); os.IsNotExist(err) {
stop <- true once.Do(stopAndClose)
close(ch)
return return
} }
} }
@ -155,3 +177,7 @@ func (fw *PollingFileWatcher) ChangeEvents() chan bool {
return ch return ch
} }
func init() {
POLL_DURATION = 250 * time.Millisecond
}