From d3c80d385dedc2b1548939519a56285df44c03f7 Mon Sep 17 00:00:00 2001 From: Sridhar Ratnakumar Date: Wed, 29 May 2013 13:57:02 -0700 Subject: [PATCH] refactor code for upcoming changes to tail.go --- tail.go | 149 ++++++++++++++++++++++++++--------------------- watch/inotify.go | 1 - watch/watch.go | 3 +- 3 files changed, 85 insertions(+), 68 deletions(-) diff --git a/tail.go b/tail.go index 9cb2716..74040ac 100644 --- a/tail.go +++ b/tail.go @@ -13,6 +13,10 @@ import ( "time" ) +var ( + ErrStop = fmt.Errorf("tail should now stop") +) + type Line struct { Text string Time time.Time @@ -36,6 +40,7 @@ type Tail struct { file *os.File reader *bufio.Reader watcher watch.FileWatcher + changes chan bool tomb.Tomb // provides: Done, Kill, Dying } @@ -81,6 +86,7 @@ func TailFile(filename string, config Config) (*Tail, error) { return t, nil } +// Stop stops the tailing activity. func (tail *Tail) Stop() error { tail.Kill(nil) return tail.Wait() @@ -122,24 +128,21 @@ func (tail *Tail) readLine() ([]byte, error) { func (tail *Tail) tailFileSync() { defer tail.Done() + defer tail.close() if !tail.MustExist { + // deferred first open. err := tail.reopen() if err != nil { - tail.close() tail.Kill(err) return } } - var changes chan bool - - // Note: seeking to end happens only at the beginning of tail; - // never during subsequent re-opens. + // Seek to requested location on first open of the file. if tail.Location == 0 { - _, err := tail.file.Seek(0, 2) // seek to end of the file + _, err := tail.file.Seek(0, 2) // Seek to the file end if err != nil { - tail.close() tail.Killf("Seek error on %s: %s", tail.Filename, err) return } @@ -147,82 +150,96 @@ func (tail *Tail) tailFileSync() { tail.reader = bufio.NewReader(tail.file) + // Read line by line. for { line, err := tail.readLine() - if err == nil { + switch(err) { + case nil: if line != nil { - now := time.Now() - if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize { - for _, line := range partitionString(string(line), tail.MaxLineSize) { - tail.Lines <- &Line{line, now} - } - } else { - tail.Lines <- &Line{string(line), now} - } + tail.sendLine(line) } - } else { - if err != io.EOF { - tail.close() - tail.Killf("Error reading %s: %s", tail.Filename, err) + case io.EOF: + // When EOF is reached, wait for more data to become + // available. Wait strategy is based on the `tail.watcher` + // implementation (inotify or polling). + err = tail.waitForChanges() + if err != nil { + if err != ErrStop { + tail.Kill(err) + } return } - - // When end of file is reached, wait for more data to - // become available. Wait strategy is based on the - // `tail.watcher` implementation (inotify or polling). - if err == io.EOF { - if changes == nil { - st, err := tail.file.Stat() - if err != nil { - tail.close() - tail.Kill(err) - return - } - changes = tail.watcher.ChangeEvents(tail.Tomb, st) - } - - select { - case _, ok := <-changes: - if !ok { - changes = nil - - // File got deleted/renamed/truncated. - if tail.ReOpen { - // XXX: no logging in a library? - log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename) - err := tail.reopen() - if err != nil { - tail.close() - tail.Kill(err) - return - } - log.Printf("Successfully reopened %s", tail.Filename) - tail.reader = bufio.NewReader(tail.file) - - continue - } else { - log.Printf("Stopping tail as file no longer exists: %s", tail.Filename) - tail.close() - return - } - } - case <-tail.Dying(): - tail.close() - return - } - } + default: // non-EOF error + tail.Killf("Error reading %s: %s", tail.Filename, err) + return } select { case <-tail.Dying(): - tail.close() return default: } } } +// waitForChanges waits until the file has been appended, deleted, +// moved or truncated. When moved, deleted or truncated - the file +// will be re-opened if ReOpen is true. +func (tail *Tail) waitForChanges() error { + if tail.changes == nil { + st, err := tail.file.Stat() + if err != nil { + return err + } + tail.changes = tail.watcher.ChangeEvents(tail.Tomb, st) + } + + select { + case _, ok := <-tail.changes: + if !ok { + tail.changes = nil + + // File got deleted/renamed/truncated. + if tail.ReOpen { + // XXX: no logging in a library? + log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename) + err := tail.reopen() + if err != nil { + return err + } + log.Printf("Successfully reopened %s", tail.Filename) + tail.reader = bufio.NewReader(tail.file) + return nil + } else { + log.Printf("Stopping tail as file no longer exists: %s", tail.Filename) + return ErrStop + } + } + case <-tail.Dying(): + return ErrStop + } + return nil +} + +// sendLine sends the line(s) to Lines channel, splitting longer lines +// if necessary. +func (tail *Tail) sendLine(line []byte) { + now := time.Now() + lines := []string{string(line)} + + // Split longer lins + if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize { + lines = partitionString( + string(line), tail.MaxLineSize) + } + + for _, line := range lines { + tail.Lines <- &Line{line, now} + } + +} + // partitionString partitions the string into chunks of given size, // with the last chunk of variable size. func partitionString(s string, chunkSize int) []string { diff --git a/watch/inotify.go b/watch/inotify.go index cb732c0..2d20b44 100644 --- a/watch/inotify.go +++ b/watch/inotify.go @@ -56,7 +56,6 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error { panic("unreachable") } -// ChangeEvents returns a channel that gets updated when the file is ready to be read. func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool { w, err := fsnotify.NewWatcher() if err != nil { diff --git a/watch/watch.go b/watch/watch.go index 13fca2f..0b4519d 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -14,7 +14,8 @@ type FileWatcher interface { BlockUntilExists(tomb.Tomb) error // ChangeEvents returns a channel of events corresponding to the - // times the file is ready to be read. + // times the file is ready to be read. The channel will be closed + // if the file gets deleted, renamed or truncated. ChangeEvents(tomb.Tomb, os.FileInfo) chan bool }