Merge pull request #6 from srid/bug99139_copytruncate
copytruncate support, truncation detection for both inotify and polling file watchers along with test cases, plus a fix for issue #5.
This commit is contained in:
commit
b664e9fc9d
|
@ -1,6 +1,8 @@
|
||||||
# May, 2013
|
# May, 2013
|
||||||
|
|
||||||
* Recognize deletions/renames when using polling file watcher (PR #1)
|
* Recognize deletions/renames when using polling file watcher (PR #1)
|
||||||
|
* Detect file truncation
|
||||||
|
* Fix potential race condition when reopening the file (issue 5)
|
||||||
|
|
||||||
# Feb, 2013
|
# Feb, 2013
|
||||||
|
|
||||||
|
|
17
tail.go
17
tail.go
|
@ -13,8 +13,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Line struct {
|
type Line struct {
|
||||||
Text string
|
Text string
|
||||||
Time time.Time
|
Time time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tail configuration
|
// Tail configuration
|
||||||
|
@ -102,8 +102,7 @@ func (tail *Tail) reopen() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
log.Printf("Waiting for %s to appear...", tail.Filename)
|
log.Printf("Waiting for %s to appear...", tail.Filename)
|
||||||
err := tail.watcher.BlockUntilExists()
|
if err := tail.watcher.BlockUntilExists(); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
|
return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
@ -157,7 +156,7 @@ func (tail *Tail) tailFileSync() {
|
||||||
for _, line := range partitionString(string(line), tail.MaxLineSize) {
|
for _, line := range partitionString(string(line), tail.MaxLineSize) {
|
||||||
tail.Lines <- &Line{line, now}
|
tail.Lines <- &Line{line, now}
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
tail.Lines <- &Line{string(line), now}
|
tail.Lines <- &Line{string(line), now}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -187,10 +186,10 @@ func (tail *Tail) tailFileSync() {
|
||||||
if !ok {
|
if !ok {
|
||||||
changes = nil // XXX: how to kill changes' goroutine?
|
changes = nil // XXX: how to kill changes' goroutine?
|
||||||
|
|
||||||
// File got deleted/renamed
|
// File got deleted/renamed/truncated.
|
||||||
if tail.ReOpen {
|
if tail.ReOpen {
|
||||||
// TODO: no logging in a library?
|
// TODO: no logging in a library?
|
||||||
log.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
|
log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename)
|
||||||
err := tail.reopen()
|
err := tail.reopen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tail.close()
|
tail.close()
|
||||||
|
@ -199,7 +198,7 @@ func (tail *Tail) tailFileSync() {
|
||||||
}
|
}
|
||||||
log.Printf("Successfully reopened %s", tail.Filename)
|
log.Printf("Successfully reopened %s", tail.Filename)
|
||||||
tail.reader = bufio.NewReader(tail.file)
|
tail.reader = bufio.NewReader(tail.file)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename)
|
log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename)
|
||||||
|
@ -230,7 +229,7 @@ func partitionString(s string, chunkSize int) []string {
|
||||||
panic("invalid chunkSize")
|
panic("invalid chunkSize")
|
||||||
}
|
}
|
||||||
length := len(s)
|
length := len(s)
|
||||||
chunks := 1 + length/chunkSize
|
chunks := 1 + length/chunkSize
|
||||||
start := 0
|
start := 0
|
||||||
end := chunkSize
|
end := chunkSize
|
||||||
parts := make([]string, 0, chunks)
|
parts := make([]string, 0, chunks)
|
||||||
|
|
85
tail_test.go
85
tail_test.go
|
@ -14,6 +14,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
// Clear the temporary test directory
|
||||||
err := os.RemoveAll(".test")
|
err := os.RemoveAll(".test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -84,7 +85,7 @@ func _TestReOpen(_t *testing.T, poll bool) {
|
||||||
var name string
|
var name string
|
||||||
if poll {
|
if poll {
|
||||||
name = "reopen-polling"
|
name = "reopen-polling"
|
||||||
}else {
|
} else {
|
||||||
name = "reopen-inotify"
|
name = "reopen-inotify"
|
||||||
}
|
}
|
||||||
t := NewTailTest(name, _t)
|
t := NewTailTest(name, _t)
|
||||||
|
@ -92,7 +93,7 @@ func _TestReOpen(_t *testing.T, poll bool) {
|
||||||
tail := t.StartTail(
|
tail := t.StartTail(
|
||||||
"test.txt",
|
"test.txt",
|
||||||
Config{Follow: true, ReOpen: true, Poll: poll, Location: -1})
|
Config{Follow: true, ReOpen: true, Poll: poll, Location: -1})
|
||||||
|
|
||||||
go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"})
|
go t.VerifyTailOutput(tail, []string{"hello", "world", "more", "data", "endofworld"})
|
||||||
|
|
||||||
// deletion must trigger reopen
|
// deletion must trigger reopen
|
||||||
|
@ -122,18 +123,71 @@ func _TestReOpen(_t *testing.T, poll bool) {
|
||||||
<-time.After(100 * time.Millisecond)
|
<-time.After(100 * time.Millisecond)
|
||||||
t.RemoveFile("test.txt")
|
t.RemoveFile("test.txt")
|
||||||
|
|
||||||
|
println("Stopping (REOPEN)...")
|
||||||
tail.Stop()
|
tail.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// The use of polling file watcher could affect file rotation
|
// The use of polling file watcher could affect file rotation
|
||||||
// (detected via renames), so test these explicitly.
|
// (detected via renames), so test these explicitly.
|
||||||
|
|
||||||
func TestReOpenWithPoll(_t *testing.T) {
|
func TestReOpenInotify(_t *testing.T) {
|
||||||
|
_TestReOpen(_t, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReOpenPolling(_t *testing.T) {
|
||||||
_TestReOpen(_t, true)
|
_TestReOpen(_t, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReOpenWithoutPoll(_t *testing.T) {
|
func _TestReSeek(_t *testing.T, poll bool) {
|
||||||
_TestReOpen(_t, false)
|
var name string
|
||||||
|
if poll {
|
||||||
|
name = "reseek-polling"
|
||||||
|
} else {
|
||||||
|
name = "reseek-inotify"
|
||||||
|
}
|
||||||
|
t := NewTailTest(name, _t)
|
||||||
|
t.CreateFile("test.txt", "a really long string goes here\nhello\nworld\n")
|
||||||
|
tail := t.StartTail(
|
||||||
|
"test.txt",
|
||||||
|
Config{Follow: true, ReOpen: true, Poll: poll, Location: -1})
|
||||||
|
|
||||||
|
go t.VerifyTailOutput(tail, []string{
|
||||||
|
"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld"})
|
||||||
|
|
||||||
|
// truncate now
|
||||||
|
<-time.After(100 * time.Millisecond)
|
||||||
|
if poll {
|
||||||
|
// Give polling a chance to read the just-written lines (more;
|
||||||
|
// data), before we truncate the file again below.
|
||||||
|
<-time.After(POLL_DURATION)
|
||||||
|
}
|
||||||
|
println("truncating..")
|
||||||
|
t.TruncateFile("test.txt", "h311o\nw0r1d\nendofworld\n")
|
||||||
|
// XXX: is this required for this test function?
|
||||||
|
if poll {
|
||||||
|
// Give polling a chance to read the just-written lines (more;
|
||||||
|
// data), before we recreate the file again below.
|
||||||
|
<-time.After(POLL_DURATION)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete after a reasonable delay, to give tail sufficient time
|
||||||
|
// to read all lines.
|
||||||
|
<-time.After(100 * time.Millisecond)
|
||||||
|
t.RemoveFile("test.txt")
|
||||||
|
|
||||||
|
println("Stopping (RESEEK)...")
|
||||||
|
tail.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// The use of polling file watcher could affect file rotation
|
||||||
|
// (detected via renames), so test these explicitly.
|
||||||
|
|
||||||
|
func TestReSeekInotify(_t *testing.T) {
|
||||||
|
_TestReSeek(_t, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReSeekPolling(_t *testing.T) {
|
||||||
|
_TestReSeek(_t, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test library
|
// Test library
|
||||||
|
@ -150,6 +204,10 @@ func NewTailTest(name string, t *testing.T) TailTest {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tt.Fatal(err)
|
tt.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Use a smaller poll duration for faster test runs.
|
||||||
|
POLL_DURATION = 25 * time.Millisecond
|
||||||
|
|
||||||
return tt
|
return tt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,6 +246,18 @@ func (t TailTest) AppendFile(name string, contents string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t TailTest) TruncateFile(name string, contents string) {
|
||||||
|
f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0600)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
_, err = f.WriteString(contents)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t TailTest) StartTail(name string, config Config) *Tail {
|
func (t TailTest) StartTail(name string, config Config) *Tail {
|
||||||
tail, err := TailFile(t.path+"/"+name, config)
|
tail, err := TailFile(t.path+"/"+name, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -203,7 +273,7 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
|
||||||
err := tail.Wait()
|
err := tail.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("tail ended early with error: %v", err)
|
t.Fatal("tail ended early with error: %v", err)
|
||||||
}else{
|
} else {
|
||||||
t.Fatalf("tail ended early; expecting more: %v", lines[idx:])
|
t.Fatalf("tail ended early; expecting more: %v", lines[idx:])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -211,7 +281,8 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
|
||||||
t.Fatalf("tail.Lines returned nil; not possible")
|
t.Fatalf("tail.Lines returned nil; not possible")
|
||||||
}
|
}
|
||||||
if tailedLine.Text != line {
|
if tailedLine.Text != line {
|
||||||
t.Fatalf("mismatch; %s != %s", tailedLine.Text, line)
|
t.Fatalf("mismatch; %s (actual) != %s (expected)",
|
||||||
|
tailedLine.Text, line)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
line, ok := <-tail.Lines
|
line, ok := <-tail.Lines
|
||||||
|
|
64
watch.go
64
watch.go
|
@ -6,14 +6,14 @@ import (
|
||||||
"github.com/howeyc/fsnotify"
|
"github.com/howeyc/fsnotify"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileWatcher monitors file-level events.
|
// FileWatcher monitors file-level events.
|
||||||
type FileWatcher interface {
|
type FileWatcher interface {
|
||||||
// BlockUntilExists blocks until the missing file comes into
|
// BlockUntilExists blocks until the missing file comes into
|
||||||
// existence. If the file already exists, block until it is recreated.
|
// existence. If the file already exists, returns immediately.
|
||||||
BlockUntilExists() error
|
BlockUntilExists() error
|
||||||
|
|
||||||
// ChangeEvents returns a channel of events corresponding to the
|
// ChangeEvents returns a channel of events corresponding to the
|
||||||
|
@ -24,10 +24,11 @@ type FileWatcher interface {
|
||||||
// InotifyFileWatcher uses inotify to monitor file changes.
|
// InotifyFileWatcher uses inotify to monitor file changes.
|
||||||
type InotifyFileWatcher struct {
|
type InotifyFileWatcher struct {
|
||||||
Filename string
|
Filename string
|
||||||
|
Size int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
|
func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
|
||||||
fw := &InotifyFileWatcher{filename}
|
fw := &InotifyFileWatcher{filename, 0}
|
||||||
return fw
|
return fw
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,11 +38,23 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
err = w.WatchFlags(filepath.Dir(fw.Filename), fsnotify.FSN_CREATE)
|
|
||||||
|
dirname := filepath.Dir(fw.Filename)
|
||||||
|
|
||||||
|
// Watch for new files to be created in the parent directory.
|
||||||
|
err = w.WatchFlags(dirname, fsnotify.FSN_CREATE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer w.RemoveWatch(filepath.Dir(fw.Filename))
|
defer w.RemoveWatch(filepath.Dir(fw.Filename))
|
||||||
|
|
||||||
|
// Do a real check now as the file might have been created before
|
||||||
|
// calling `WatchFlags` above.
|
||||||
|
if _, err = os.Stat(fw.Filename); !os.IsNotExist(err) {
|
||||||
|
// file exists, or stat returned an error.
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
evt := <-w.Event
|
evt := <-w.Event
|
||||||
if evt.Name == fw.Filename {
|
if evt.Name == fw.Filename {
|
||||||
|
@ -52,7 +65,7 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChangeEvents returns a channel that gets updated when the file is ready to be read.
|
// ChangeEvents returns a channel that gets updated when the file is ready to be read.
|
||||||
func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool {
|
func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool {
|
||||||
w, err := fsnotify.NewWatcher()
|
w, err := fsnotify.NewWatcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -64,20 +77,36 @@ func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool {
|
||||||
|
|
||||||
ch := make(chan bool)
|
ch := make(chan bool)
|
||||||
|
|
||||||
|
fw.Size = fi.Size()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer w.Close()
|
||||||
|
defer w.RemoveWatch(fw.Filename)
|
||||||
|
defer close(ch)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
prevSize := fw.Size
|
||||||
|
|
||||||
evt := <-w.Event
|
evt := <-w.Event
|
||||||
switch {
|
switch {
|
||||||
case evt.IsDelete():
|
case evt.IsDelete():
|
||||||
fallthrough
|
fallthrough
|
||||||
|
|
||||||
case evt.IsRename():
|
case evt.IsRename():
|
||||||
close(ch)
|
|
||||||
w.RemoveWatch(fw.Filename)
|
|
||||||
w.Close()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
case evt.IsModify():
|
case evt.IsModify():
|
||||||
|
fi, err := os.Stat(fw.Filename)
|
||||||
|
if err != nil {
|
||||||
|
// XXX: no panic here
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
fw.Size = fi.Size()
|
||||||
|
|
||||||
|
if prevSize > 0 && prevSize > fw.Size {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// send only if channel is empty.
|
// send only if channel is empty.
|
||||||
select {
|
select {
|
||||||
case ch <- true:
|
case ch <- true:
|
||||||
|
@ -93,22 +122,21 @@ func (fw *InotifyFileWatcher) ChangeEvents(_ os.FileInfo) chan bool {
|
||||||
// PollingFileWatcher polls the file for changes.
|
// PollingFileWatcher polls the file for changes.
|
||||||
type PollingFileWatcher struct {
|
type PollingFileWatcher struct {
|
||||||
Filename string
|
Filename string
|
||||||
|
Size int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPollingFileWatcher(filename string) *PollingFileWatcher {
|
func NewPollingFileWatcher(filename string) *PollingFileWatcher {
|
||||||
fw := &PollingFileWatcher{filename}
|
fw := &PollingFileWatcher{filename, 0}
|
||||||
return fw
|
return fw
|
||||||
}
|
}
|
||||||
|
|
||||||
var POLL_DURATION time.Duration
|
var POLL_DURATION time.Duration
|
||||||
|
|
||||||
// BlockUntilExists blocks until the file comes into existence. If the
|
|
||||||
// file already exists, then block until it is created again.
|
|
||||||
func (fw *PollingFileWatcher) BlockUntilExists() error {
|
func (fw *PollingFileWatcher) BlockUntilExists() error {
|
||||||
for {
|
for {
|
||||||
if _, err := os.Stat(fw.Filename); err == nil {
|
if _, err := os.Stat(fw.Filename); err == nil {
|
||||||
return nil
|
return nil
|
||||||
}else if !os.IsNotExist(err) {
|
} else if !os.IsNotExist(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
time.Sleep(POLL_DURATION)
|
time.Sleep(POLL_DURATION)
|
||||||
|
@ -131,8 +159,11 @@ func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool {
|
||||||
stop <- true
|
stop <- true
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fw.Size = origFi.Size()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
prevSize := fw.Size
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
|
@ -157,6 +188,13 @@ func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Was the file truncated?
|
||||||
|
fw.Size = fi.Size()
|
||||||
|
if prevSize > 0 && prevSize > fw.Size {
|
||||||
|
once.Do(stopAndClose)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// If the file was changed since last check, notify.
|
// If the file was changed since last check, notify.
|
||||||
modTime := fi.ModTime()
|
modTime := fi.ModTime()
|
||||||
if modTime != prevModTime {
|
if modTime != prevModTime {
|
||||||
|
|
Loading…
Reference in New Issue