fix tail.Stop blocking when called in midst of re-opening

this is what caused the test to hang occasionally. after this fix,
test completes in 2mins all the time.

closes issue #4
This commit is contained in:
Sridhar Ratnakumar 2013-05-28 19:33:52 -07:00
parent 0f67bc352f
commit 7599e3efb9
6 changed files with 42 additions and 44 deletions

View File

@ -3,6 +3,7 @@
* Recognize deletions/renames when using polling file watcher (PR #1) * Recognize deletions/renames when using polling file watcher (PR #1)
* Detect file truncation * Detect file truncation
* Fix potential race condition when reopening the file (issue 5) * Fix potential race condition when reopening the file (issue 5)
* Fix potential blocking of `tail.Stop` (issue 4)
# Feb, 2013 # Feb, 2013

View File

@ -5,12 +5,12 @@ package tail
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/ActiveState/tail/watch"
"io" "io"
"launchpad.net/tomb" "launchpad.net/tomb"
"log" "log"
"os" "os"
"time" "time"
"github.com/ActiveState/tail/watch"
) )
type Line struct { type Line struct {
@ -103,7 +103,7 @@ func (tail *Tail) reopen() error {
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
log.Printf("Waiting for %s to appear...", tail.Filename) log.Printf("Waiting for %s to appear...", tail.Filename)
if err := tail.watcher.BlockUntilExists(); err != nil { if err := tail.watcher.BlockUntilExists(tail.Tomb); err != nil {
return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err) return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
} }
continue continue
@ -185,7 +185,7 @@ func (tail *Tail) tailFileSync() {
select { select {
case _, ok := <-changes: case _, ok := <-changes:
if !ok { if !ok {
changes = nil // XXX: how to kill changes' goroutine? changes = nil // XXX: use tomb to kill changes' goroutine.
// File got deleted/renamed/truncated. // File got deleted/renamed/truncated.
if tail.ReOpen { if tail.ReOpen {

View File

@ -6,12 +6,12 @@
package tail package tail
import ( import (
"./watch"
_ "fmt" _ "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
"time" "time"
"./watch"
) )
func init() { func init() {
@ -102,30 +102,23 @@ func _TestReOpen(_t *testing.T, poll bool) {
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.CreateFile("test.txt", "more\ndata\n") t.CreateFile("test.txt", "more\ndata\n")
if poll {
// Give polling a chance to read the just-written lines (more;
// data), before we recreate the file again below.
<-time.After(watch.POLL_DURATION)
}
// rename must trigger reopen // rename must trigger reopen
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RenameFile("test.txt", "test.txt.rotated") t.RenameFile("test.txt", "test.txt.rotated")
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
if poll {
// This time, wait a bit before creating the file to test
// PollingFileWatcher's BlockUntilExists.
<-time.After(watch.POLL_DURATION)
}
t.CreateFile("test.txt", "endofworld") t.CreateFile("test.txt", "endofworld")
// Delete after a reasonable delay, to give tail sufficient time // Delete after a reasonable delay, to give tail sufficient time
// to read all lines. // to read all lines.
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
<-time.After(100 * time.Millisecond)
println("Stopping (REOPEN)...") // Do not bother with stopping as it could kill the tomb during
tail.Stop() // the reading of data written above. Timings can vary based on
// test environment.
// tail.Stop()
} }
// The use of polling file watcher could affect file rotation // The use of polling file watcher could affect file rotation
@ -157,27 +150,17 @@ func _TestReSeek(_t *testing.T, poll bool) {
// truncate now // truncate now
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
if poll {
// Give polling a chance to read the just-written lines (more;
// data), before we truncate the file again below.
<-time.After(watch.POLL_DURATION)
}
println("truncating..")
t.TruncateFile("test.txt", "h311o\nw0r1d\nendofworld\n") t.TruncateFile("test.txt", "h311o\nw0r1d\nendofworld\n")
// XXX: is this required for this test function?
if poll {
// Give polling a chance to read the just-written lines (more;
// data), before we recreate the file again below.
<-time.After(watch.POLL_DURATION)
}
// Delete after a reasonable delay, to give tail sufficient time // Delete after a reasonable delay, to give tail sufficient time
// to read all lines. // to read all lines.
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
println("Stopping (RESEEK)...") // Do not bother with stopping as it could kill the tomb during
tail.Stop() // the reading of data written above. Timings can vary based on
// test environment.
// tail.Stop()
} }
// The use of polling file watcher could affect file rotation // The use of polling file watcher could affect file rotation
@ -206,8 +189,9 @@ func NewTailTest(name string, t *testing.T) TailTest {
tt.Fatal(err) tt.Fatal(err)
} }
// Use a smaller poll duration for faster test runs. // Use a smaller poll duration for faster test runs. Keep it below
watch.POLL_DURATION = 25 * time.Millisecond // 100ms (which value is used as common delays for tests)
watch.POLL_DURATION = 5 * time.Millisecond
return tt return tt
} }
@ -271,12 +255,11 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
for idx, line := range lines { for idx, line := range lines {
tailedLine, ok := <-tail.Lines tailedLine, ok := <-tail.Lines
if !ok { if !ok {
err := tail.Wait() err := tail.Err()
if err != nil { if err != nil {
t.Fatal("tail ended early with error: %v", err) t.Errorf("tail ended with error: %v", err)
} else {
t.Fatalf("tail ended early; expecting more: %v", lines[idx:])
} }
t.Fatalf("tail ended early; expecting more: %v", lines[idx:])
} }
if tailedLine == nil { if tailedLine == nil {
t.Fatalf("tail.Lines returned nil; not possible") t.Fatalf("tail.Lines returned nil; not possible")

View File

@ -3,9 +3,11 @@
package watch package watch
import ( import (
"fmt"
"github.com/howeyc/fsnotify" "github.com/howeyc/fsnotify"
"os" "os"
"path/filepath" "path/filepath"
"launchpad.net/tomb"
) )
// InotifyFileWatcher uses inotify to monitor file changes. // InotifyFileWatcher uses inotify to monitor file changes.
@ -19,7 +21,7 @@ func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
return fw return fw
} }
func (fw *InotifyFileWatcher) BlockUntilExists() error { func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error {
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
return err return err
@ -43,13 +45,17 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error {
} }
for { for {
evt := <-w.Event select {
case evt := <-w.Event:
if evt.Name == fw.Filename { if evt.Name == fw.Filename {
break
}
}
return nil return nil
} }
case <-t.Dying():
return fmt.Errorf("Tomb dying")
}
}
panic("unreachable")
}
// ChangeEvents returns a channel that gets updated when the file is ready to be read. // ChangeEvents returns a channel that gets updated when the file is ready to be read.
func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool { func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool {

View File

@ -3,9 +3,11 @@
package watch package watch
import ( import (
"launchpad.net/tomb"
"os" "os"
"sync" "sync"
"time" "time"
"fmt"
) )
// PollingFileWatcher polls the file for changes. // PollingFileWatcher polls the file for changes.
@ -21,14 +23,19 @@ func NewPollingFileWatcher(filename string) *PollingFileWatcher {
var POLL_DURATION time.Duration var POLL_DURATION time.Duration
func (fw *PollingFileWatcher) BlockUntilExists() error { func (fw *PollingFileWatcher) BlockUntilExists(t tomb.Tomb) error {
for { for {
if _, err := os.Stat(fw.Filename); err == nil { if _, err := os.Stat(fw.Filename); err == nil {
return nil return nil
} else if !os.IsNotExist(err) { } else if !os.IsNotExist(err) {
return err return err
} }
time.Sleep(POLL_DURATION) select {
case <-time.After(POLL_DURATION):
continue
case <-t.Dying():
return fmt.Errorf("Tomb dying")
}
} }
panic("unreachable") panic("unreachable")
} }

View File

@ -4,13 +4,14 @@ package watch
import ( import (
"os" "os"
"launchpad.net/tomb"
) )
// FileWatcher monitors file-level events. // FileWatcher monitors file-level events.
type FileWatcher interface { type FileWatcher interface {
// BlockUntilExists blocks until the missing file comes into // BlockUntilExists blocks until the missing file comes into
// existence. If the file already exists, returns immediately. // existence. If the file already exists, returns immediately.
BlockUntilExists() error BlockUntilExists(tomb.Tomb) error
// ChangeEvents returns a channel of events corresponding to the // ChangeEvents returns a channel of events corresponding to the
// times the file is ready to be read. // times the file is ready to be read.