refactor code for upcoming changes to tail.go

This commit is contained in:
Sridhar Ratnakumar 2013-05-29 13:57:02 -07:00
parent 2cddd48e0a
commit d3c80d385d
3 changed files with 85 additions and 68 deletions

149
tail.go
View File

@ -13,6 +13,10 @@ import (
"time" "time"
) )
var (
ErrStop = fmt.Errorf("tail should now stop")
)
type Line struct { type Line struct {
Text string Text string
Time time.Time Time time.Time
@ -36,6 +40,7 @@ type Tail struct {
file *os.File file *os.File
reader *bufio.Reader reader *bufio.Reader
watcher watch.FileWatcher watcher watch.FileWatcher
changes chan bool
tomb.Tomb // provides: Done, Kill, Dying tomb.Tomb // provides: Done, Kill, Dying
} }
@ -81,6 +86,7 @@ func TailFile(filename string, config Config) (*Tail, error) {
return t, nil return t, nil
} }
// Stop stops the tailing activity.
func (tail *Tail) Stop() error { func (tail *Tail) Stop() error {
tail.Kill(nil) tail.Kill(nil)
return tail.Wait() return tail.Wait()
@ -122,24 +128,21 @@ func (tail *Tail) readLine() ([]byte, error) {
func (tail *Tail) tailFileSync() { func (tail *Tail) tailFileSync() {
defer tail.Done() defer tail.Done()
defer tail.close()
if !tail.MustExist { if !tail.MustExist {
// deferred first open.
err := tail.reopen() err := tail.reopen()
if err != nil { if err != nil {
tail.close()
tail.Kill(err) tail.Kill(err)
return return
} }
} }
var changes chan bool // Seek to requested location on first open of the file.
// Note: seeking to end happens only at the beginning of tail;
// never during subsequent re-opens.
if tail.Location == 0 { if tail.Location == 0 {
_, err := tail.file.Seek(0, 2) // seek to end of the file _, err := tail.file.Seek(0, 2) // Seek to the file end
if err != nil { if err != nil {
tail.close()
tail.Killf("Seek error on %s: %s", tail.Filename, err) tail.Killf("Seek error on %s: %s", tail.Filename, err)
return return
} }
@ -147,82 +150,96 @@ func (tail *Tail) tailFileSync() {
tail.reader = bufio.NewReader(tail.file) tail.reader = bufio.NewReader(tail.file)
// Read line by line.
for { for {
line, err := tail.readLine() line, err := tail.readLine()
if err == nil { switch(err) {
case nil:
if line != nil { if line != nil {
now := time.Now() tail.sendLine(line)
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
for _, line := range partitionString(string(line), tail.MaxLineSize) {
tail.Lines <- &Line{line, now}
}
} else {
tail.Lines <- &Line{string(line), now}
}
} }
} else { case io.EOF:
if err != io.EOF { // When EOF is reached, wait for more data to become
tail.close() // available. Wait strategy is based on the `tail.watcher`
tail.Killf("Error reading %s: %s", tail.Filename, err) // implementation (inotify or polling).
err = tail.waitForChanges()
if err != nil {
if err != ErrStop {
tail.Kill(err)
}
return return
} }
default: // non-EOF error
// When end of file is reached, wait for more data to tail.Killf("Error reading %s: %s", tail.Filename, err)
// become available. Wait strategy is based on the return
// `tail.watcher` implementation (inotify or polling).
if err == io.EOF {
if changes == nil {
st, err := tail.file.Stat()
if err != nil {
tail.close()
tail.Kill(err)
return
}
changes = tail.watcher.ChangeEvents(tail.Tomb, st)
}
select {
case _, ok := <-changes:
if !ok {
changes = nil
// File got deleted/renamed/truncated.
if tail.ReOpen {
// XXX: no logging in a library?
log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename)
err := tail.reopen()
if err != nil {
tail.close()
tail.Kill(err)
return
}
log.Printf("Successfully reopened %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file)
continue
} else {
log.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
tail.close()
return
}
}
case <-tail.Dying():
tail.close()
return
}
}
} }
select { select {
case <-tail.Dying(): case <-tail.Dying():
tail.close()
return return
default: default:
} }
} }
} }
// waitForChanges waits until the file has been appended, deleted,
// moved or truncated. When moved, deleted or truncated - the file
// will be re-opened if ReOpen is true.
func (tail *Tail) waitForChanges() error {
if tail.changes == nil {
st, err := tail.file.Stat()
if err != nil {
return err
}
tail.changes = tail.watcher.ChangeEvents(tail.Tomb, st)
}
select {
case _, ok := <-tail.changes:
if !ok {
tail.changes = nil
// File got deleted/renamed/truncated.
if tail.ReOpen {
// XXX: no logging in a library?
log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename)
err := tail.reopen()
if err != nil {
return err
}
log.Printf("Successfully reopened %s", tail.Filename)
tail.reader = bufio.NewReader(tail.file)
return nil
} else {
log.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
return ErrStop
}
}
case <-tail.Dying():
return ErrStop
}
return nil
}
// sendLine sends the line(s) to Lines channel, splitting longer lines
// if necessary.
func (tail *Tail) sendLine(line []byte) {
now := time.Now()
lines := []string{string(line)}
// Split longer lins
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
lines = partitionString(
string(line), tail.MaxLineSize)
}
for _, line := range lines {
tail.Lines <- &Line{line, now}
}
}
// partitionString partitions the string into chunks of given size, // partitionString partitions the string into chunks of given size,
// with the last chunk of variable size. // with the last chunk of variable size.
func partitionString(s string, chunkSize int) []string { func partitionString(s string, chunkSize int) []string {

View File

@ -56,7 +56,6 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error {
panic("unreachable") panic("unreachable")
} }
// ChangeEvents returns a channel that gets updated when the file is ready to be read.
func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool { func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool {
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {

View File

@ -14,7 +14,8 @@ type FileWatcher interface {
BlockUntilExists(tomb.Tomb) error BlockUntilExists(tomb.Tomb) error
// ChangeEvents returns a channel of events corresponding to the // ChangeEvents returns a channel of events corresponding to the
// times the file is ready to be read. // times the file is ready to be read. The channel will be closed
// if the file gets deleted, renamed or truncated.
ChangeEvents(tomb.Tomb, os.FileInfo) chan bool ChangeEvents(tomb.Tomb, os.FileInfo) chan bool
} }