copytruncate should work even if ReOpen is False.

at least, `tail -f` (not `tail -F` which is analogous to ReOpen)
reopens truncated files.

this change introduces the FileChanges struct to abstract the change
notifications for file changes, deletions and truncations.
This commit is contained in:
Sridhar Ratnakumar 2013-05-29 14:32:59 -07:00
parent d3c80d385d
commit 499e541b19
5 changed files with 93 additions and 62 deletions

25
tail.go
View File

@ -40,7 +40,7 @@ type Tail struct {
file *os.File file *os.File
reader *bufio.Reader reader *bufio.Reader
watcher watch.FileWatcher watcher watch.FileWatcher
changes chan bool changes *watch.FileChanges
tomb.Tomb // provides: Done, Kill, Dying tomb.Tomb // provides: Done, Kill, Dying
} }
@ -163,7 +163,7 @@ func (tail *Tail) tailFileSync() {
// When EOF is reached, wait for more data to become // When EOF is reached, wait for more data to become
// available. Wait strategy is based on the `tail.watcher` // available. Wait strategy is based on the `tail.watcher`
// implementation (inotify or polling). // implementation (inotify or polling).
err = tail.waitForChanges() err := tail.waitForChanges()
if err != nil { if err != nil {
if err != ErrStop { if err != ErrStop {
tail.Kill(err) tail.Kill(err)
@ -196,16 +196,13 @@ func (tail *Tail) waitForChanges() error {
} }
select { select {
case _, ok := <-tail.changes: case <-tail.changes.Modified:
if !ok { case <-tail.changes.Deleted:
tail.changes = nil tail.changes = nil
// File got deleted/renamed/truncated.
if tail.ReOpen { if tail.ReOpen {
// XXX: no logging in a library? // XXX: we must not log from a library.
log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename) log.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
err := tail.reopen() if err := tail.reopen(); err != nil {
if err != nil {
return err return err
} }
log.Printf("Successfully reopened %s", tail.Filename) log.Printf("Successfully reopened %s", tail.Filename)
@ -215,7 +212,15 @@ func (tail *Tail) waitForChanges() error {
log.Printf("Stopping tail as file no longer exists: %s", tail.Filename) log.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
return ErrStop return ErrStop
} }
case <-tail.changes.Truncated:
// Always reopen truncated files (Follow is true)
log.Printf("Re-opening truncated file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
return err
} }
log.Printf("Successfully reopened truncated %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file)
return nil
case <-tail.Dying(): case <-tail.Dying():
return ErrStop return ErrStop
} }

42
watch/filechanges.go Normal file
View File

@ -0,0 +1,42 @@
package watch
type FileChanges struct {
Modified chan bool // Channel to get notified of modifications
Truncated chan bool // Channel to get notified of truncations
Deleted chan bool // Channel to get notified of deletions/renames
}
func NewFileChanges() *FileChanges {
return &FileChanges{
make(chan bool), make(chan bool), make(chan bool)}
}
func (fc *FileChanges) NotifyModified() {
sendOnlyIfEmpty(fc.Modified)
}
func (fc *FileChanges) NotifyTruncated() {
sendOnlyIfEmpty(fc.Truncated)
}
func (fc *FileChanges) NotifyDeleted() {
sendOnlyIfEmpty(fc.Deleted)
}
func (fc *FileChanges) Close() {
close(fc.Modified)
close(fc.Truncated)
close(fc.Deleted)
}
// sendOnlyIfEmpty sends on a bool channel only if the channel has no
// backlog to be read by other goroutines. This concurrency pattern
// can be used to notify other goroutines if and only if they are
// looking for it (i.e., subsequent notifications can be compressed
// into one).
func sendOnlyIfEmpty(ch chan bool) {
select {
case ch <- true:
default:
}
}

View File

@ -56,7 +56,9 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error {
panic("unreachable") panic("unreachable")
} }
func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool { func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) *FileChanges {
changes := NewFileChanges()
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
panic(err) panic(err)
@ -66,14 +68,12 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo
panic(err) panic(err)
} }
ch := make(chan bool)
fw.Size = fi.Size() fw.Size = fi.Size()
go func() { go func() {
defer w.Close() defer w.Close()
defer w.RemoveWatch(fw.Filename) defer w.RemoveWatch(fw.Filename)
defer close(ch) defer changes.Close()
for { for {
prevSize := fw.Size prevSize := fw.Size
@ -91,6 +91,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo
fallthrough fallthrough
case evt.IsRename(): case evt.IsRename():
changes.NotifyDeleted()
return return
case evt.IsModify(): case evt.IsModify():
@ -102,17 +103,13 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo
fw.Size = fi.Size() fw.Size = fi.Size()
if prevSize > 0 && prevSize > fw.Size { if prevSize > 0 && prevSize > fw.Size {
return changes.NotifyTruncated()
} }else{
changes.NotifyModified()
// send only if channel is empty.
select {
case ch <- true:
default:
} }
} }
} }
}() }()
return ch return changes
} }

View File

@ -5,7 +5,6 @@ package watch
import ( import (
"launchpad.net/tomb" "launchpad.net/tomb"
"os" "os"
"sync"
"time" "time"
) )
@ -39,33 +38,23 @@ func (fw *PollingFileWatcher) BlockUntilExists(t tomb.Tomb) error {
panic("unreachable") panic("unreachable")
} }
func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan bool { func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) *FileChanges {
ch := make(chan bool) changes := NewFileChanges()
stop := make(chan bool)
var once sync.Once
var prevModTime time.Time var prevModTime time.Time
// XXX: use tomb.Tomb to cleanly manage these goroutines. replace // XXX: use tomb.Tomb to cleanly manage these goroutines. replace
// the panic (below) with tomb's Kill. // the panic (below) with tomb's Kill.
stopAndClose := func() {
go func() {
close(ch)
stop <- true
}()
}
fw.Size = origFi.Size() fw.Size = origFi.Size()
go func() { go func() {
defer changes.Close()
prevSize := fw.Size prevSize := fw.Size
for { for {
select { select {
case <-stop:
return
case <-t.Dying(): case <-t.Dying():
once.Do(stopAndClose) return
continue
default: default:
} }
@ -73,39 +62,37 @@ func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan
fi, err := os.Stat(fw.Filename) fi, err := os.Stat(fw.Filename)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
once.Do(stopAndClose) // File does not exist (has been deleted).
continue changes.NotifyDeleted()
return
} }
/// XXX: do not panic here. /// XXX: do not panic here.
panic(err) panic(err)
} }
// File got moved/rename within POLL_DURATION? // File got moved/renamed?
if !os.SameFile(origFi, fi) { if !os.SameFile(origFi, fi) {
once.Do(stopAndClose) changes.NotifyDeleted()
continue return
} }
// Was the file truncated? // File got truncated?
fw.Size = fi.Size() fw.Size = fi.Size()
if prevSize > 0 && prevSize > fw.Size { if prevSize > 0 && prevSize > fw.Size {
once.Do(stopAndClose) changes.NotifyTruncated()
continue continue
} }
// If the file was changed since last check, notify. // File was appended to (changed)?
modTime := fi.ModTime() modTime := fi.ModTime()
if modTime != prevModTime { if modTime != prevModTime {
prevModTime = modTime prevModTime = modTime
select { changes.NotifyModified()
case ch <- true:
default:
}
} }
} }
}() }()
return ch return changes
} }
func init() { func init() {

View File

@ -16,6 +16,6 @@ type FileWatcher interface {
// ChangeEvents returns a channel of events corresponding to the // ChangeEvents returns a channel of events corresponding to the
// times the file is ready to be read. The channel will be closed // times the file is ready to be read. The channel will be closed
// if the file gets deleted, renamed or truncated. // if the file gets deleted, renamed or truncated.
ChangeEvents(tomb.Tomb, os.FileInfo) chan bool ChangeEvents(tomb.Tomb, os.FileInfo) *FileChanges
} }