Merge pull request #7 from srid/copytruncate_reopen_false

Truncated files must be reopened even if ReOpen is false
This commit is contained in:
Sridhar Ratnakumar 2013-05-29 15:26:00 -07:00
commit 4deef2319f
7 changed files with 98 additions and 63 deletions

View File

@ -14,6 +14,7 @@ func args2config() tail.Config {
flag.IntVar(&config.Location, "n", 0, "tail from the last Nth location")
flag.BoolVar(&config.Follow, "f", false, "wait for additional data to be appended to the file")
flag.BoolVar(&config.ReOpen, "F", false, "follow, and track file rename/rotation")
flag.BoolVar(&config.Poll, "p", false, "use polling, instead of inotify")
flag.Parse()
if config.ReOpen {
config.Follow = true

45
tail.go
View File

@ -40,7 +40,7 @@ type Tail struct {
file *os.File
reader *bufio.Reader
watcher watch.FileWatcher
changes chan bool
changes *watch.FileChanges
tomb.Tomb // provides: Done, Kill, Dying
}
@ -163,7 +163,7 @@ func (tail *Tail) tailFileSync() {
// When EOF is reached, wait for more data to become
// available. Wait strategy is based on the `tail.watcher`
// implementation (inotify or polling).
err = tail.waitForChanges()
err := tail.waitForChanges()
if err != nil {
if err != ErrStop {
tail.Kill(err)
@ -196,26 +196,31 @@ func (tail *Tail) waitForChanges() error {
}
select {
case _, ok := <-tail.changes:
if !ok {
tail.changes = nil
// File got deleted/renamed/truncated.
if tail.ReOpen {
// XXX: no logging in a library?
log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename)
err := tail.reopen()
if err != nil {
return err
}
log.Printf("Successfully reopened %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file)
return nil
} else {
log.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
return ErrStop
case <-tail.changes.Modified:
case <-tail.changes.Deleted:
tail.changes = nil
if tail.ReOpen {
// XXX: we must not log from a library.
log.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
return err
}
log.Printf("Successfully reopened %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file)
return nil
} else {
log.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
return ErrStop
}
case <-tail.changes.Truncated:
// Always reopen truncated files (Follow is true)
log.Printf("Re-opening truncated file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
return err
}
log.Printf("Successfully reopened truncated %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file)
return nil
case <-tail.Dying():
return ErrStop
}

View File

@ -143,7 +143,7 @@ func _TestReSeek(_t *testing.T, poll bool) {
t.CreateFile("test.txt", "a really long string goes here\nhello\nworld\n")
tail := t.StartTail(
"test.txt",
Config{Follow: true, ReOpen: true, Poll: poll, Location: -1})
Config{Follow: true, ReOpen: false, Poll: poll, Location: -1})
go t.VerifyTailOutput(tail, []string{
"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld"})

42
watch/filechanges.go Normal file
View File

@ -0,0 +1,42 @@
package watch
type FileChanges struct {
Modified chan bool // Channel to get notified of modifications
Truncated chan bool // Channel to get notified of truncations
Deleted chan bool // Channel to get notified of deletions/renames
}
func NewFileChanges() *FileChanges {
return &FileChanges{
make(chan bool), make(chan bool), make(chan bool)}
}
func (fc *FileChanges) NotifyModified() {
sendOnlyIfEmpty(fc.Modified)
}
func (fc *FileChanges) NotifyTruncated() {
sendOnlyIfEmpty(fc.Truncated)
}
func (fc *FileChanges) NotifyDeleted() {
sendOnlyIfEmpty(fc.Deleted)
}
func (fc *FileChanges) Close() {
close(fc.Modified)
close(fc.Truncated)
close(fc.Deleted)
}
// sendOnlyIfEmpty sends on a bool channel only if the channel has no
// backlog to be read by other goroutines. This concurrency pattern
// can be used to notify other goroutines if and only if they are
// looking for it (i.e., subsequent notifications can be compressed
// into one).
func sendOnlyIfEmpty(ch chan bool) {
select {
case ch <- true:
default:
}
}

View File

@ -56,7 +56,9 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error {
panic("unreachable")
}
func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool {
func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) *FileChanges {
changes := NewFileChanges()
w, err := fsnotify.NewWatcher()
if err != nil {
panic(err)
@ -66,14 +68,12 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo
panic(err)
}
ch := make(chan bool)
fw.Size = fi.Size()
go func() {
defer w.Close()
defer w.RemoveWatch(fw.Filename)
defer close(ch)
defer changes.Close()
for {
prevSize := fw.Size
@ -91,6 +91,7 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo
fallthrough
case evt.IsRename():
changes.NotifyDeleted()
return
case evt.IsModify():
@ -102,17 +103,14 @@ func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan boo
fw.Size = fi.Size()
if prevSize > 0 && prevSize > fw.Size {
return
}
// send only if channel is empty.
select {
case ch <- true:
default:
changes.NotifyTruncated()
}else{
changes.NotifyModified()
}
prevSize = fw.Size
}
}
}()
return ch
return changes
}

View File

@ -5,7 +5,6 @@ package watch
import (
"launchpad.net/tomb"
"os"
"sync"
"time"
)
@ -39,33 +38,23 @@ func (fw *PollingFileWatcher) BlockUntilExists(t tomb.Tomb) error {
panic("unreachable")
}
func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan bool {
ch := make(chan bool)
stop := make(chan bool)
var once sync.Once
func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) *FileChanges {
changes := NewFileChanges()
var prevModTime time.Time
// XXX: use tomb.Tomb to cleanly manage these goroutines. replace
// the panic (below) with tomb's Kill.
stopAndClose := func() {
go func() {
close(ch)
stop <- true
}()
}
fw.Size = origFi.Size()
go func() {
defer changes.Close()
prevSize := fw.Size
for {
select {
case <-stop:
return
case <-t.Dying():
once.Do(stopAndClose)
continue
return
default:
}
@ -73,39 +62,39 @@ func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan
fi, err := os.Stat(fw.Filename)
if err != nil {
if os.IsNotExist(err) {
once.Do(stopAndClose)
continue
// File does not exist (has been deleted).
changes.NotifyDeleted()
return
}
/// XXX: do not panic here.
panic(err)
}
// File got moved/rename within POLL_DURATION?
// File got moved/renamed?
if !os.SameFile(origFi, fi) {
once.Do(stopAndClose)
continue
changes.NotifyDeleted()
return
}
// Was the file truncated?
// File got truncated?
fw.Size = fi.Size()
if prevSize > 0 && prevSize > fw.Size {
once.Do(stopAndClose)
changes.NotifyTruncated()
prevSize = fw.Size
continue
}
prevSize = fw.Size
// If the file was changed since last check, notify.
// File was appended to (changed)?
modTime := fi.ModTime()
if modTime != prevModTime {
prevModTime = modTime
select {
case ch <- true:
default:
}
changes.NotifyModified()
}
}
}()
return ch
return changes
}
func init() {

View File

@ -16,6 +16,6 @@ type FileWatcher interface {
// ChangeEvents returns a channel of events corresponding to the
// times the file is ready to be read. The channel will be closed
// if the file gets deleted, renamed or truncated.
ChangeEvents(tomb.Tomb, os.FileInfo) chan bool
ChangeEvents(tomb.Tomb, os.FileInfo) *FileChanges
}