introduce tail command; fix tomb blocking after Kill

This commit is contained in:
Sridhar Ratnakumar 2012-10-12 17:14:35 -07:00
parent 4bbf3d28cc
commit 048bbf8933
3 changed files with 58 additions and 22 deletions

View File

@ -3,16 +3,34 @@ package main
import ( import (
"fmt" "fmt"
"logyard/tail" "logyard/tail"
"flag"
) )
var samplefile = "/Users/sridharr/Library/Logs/PyPM/1.3/PyPM.log" var samplefile = "/tmp/test"
func args2config() tail.Config {
config := tail.Config{Follow: true}
flag.IntVar(&config.Location, "n", 0, "tail from the last Nth location")
flag.BoolVar(&config.Follow, "f", false, "wait for additional data to be appended to the file")
flag.BoolVar(&config.ReOpen, "F", false, "follow, and track file rename/rotation")
flag.Parse()
if config.ReOpen {
config.Follow = true
}
return config
}
func main() { func main() {
t, err := tail.TailFile(samplefile, 1000, true, true) t, err := tail.TailFile(samplefile, args2config())
if err != nil { if err != nil {
panic(err) fmt.Println(err)
return
} }
for line := range t.Lines { for line := range t.Lines {
fmt.Println(line.Text) fmt.Println(line.Text)
} }
err = t.Wait()
if err != nil {
fmt.Println(err)
}
} }

52
tail.go
View File

@ -1,6 +1,7 @@
package tail package tail
import ( import (
"fmt"
"bufio" "bufio"
"io" "io"
"launchpad.net/tomb" "launchpad.net/tomb"
@ -41,11 +42,15 @@ type Tail struct {
// func TailFile(filename string, maxlinesize int, end bool, retry bool, useinotify bool) (*Tail, error) { // func TailFile(filename string, maxlinesize int, end bool, retry bool, useinotify bool) (*Tail, error) {
func TailFile(filename string, config Config) (*Tail, error) { func TailFile(filename string, config Config) (*Tail, error) {
if !(config.Location == 0 || config.Location == -1) { if !(config.Location == 0 || config.Location == -1) {
panic("only 0/-1 values are supported for Location") panic("only 0/-1 values are supported for Location.")
} }
if config.ReOpen && !config.Follow { if config.ReOpen && !config.Follow {
panic("cannot set ReOpen without Follow") panic("cannot set ReOpen without Follow.")
}
if !config.Follow {
panic("Follow=false is not supported.")
} }
t := &Tail{ t := &Tail{
@ -85,7 +90,7 @@ func (tail *Tail) close() {
} }
} }
func (tail *Tail) reopen() { func (tail *Tail) reopen() error {
if tail.file != nil { if tail.file != nil {
tail.file.Close() tail.file.Close()
} }
@ -94,16 +99,19 @@ func (tail *Tail) reopen() {
tail.file, err = os.Open(tail.Filename) tail.file, err = os.Open(tail.Filename)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
log.Printf("Waiting for the file to appear...")
err := tail.watcher.BlockUntilExists() err := tail.watcher.BlockUntilExists()
log.Println(err)
if err != nil { if err != nil {
tail.Killf("failed to detect creation of %s: %s", tail.Filename, err) return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
} }
continue continue
} }
tail.Killf("Unable to open file %s: %s", tail.Filename, err) return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
} }
break break
} }
return nil
} }
func (tail *Tail) readLine() ([]byte, error) { func (tail *Tail) readLine() ([]byte, error) {
@ -126,17 +134,24 @@ func (tail *Tail) tailFileSync() {
defer tail.Done() defer tail.Done()
if !tail.MustExist { if !tail.MustExist {
tail.reopen() err := tail.reopen()
if err != nil {
tail.close()
tail.Kill(err)
return
}
} }
var changes chan bool var changes chan bool
// Note: seeking to end happens only at the beginning; never // Note: seeking to end happens only at the beginning of tail;
// during subsequent re-opens. // never during subsequent re-opens.
if tail.Location == 0 { if tail.Location == 0 {
_, err := tail.file.Seek(0, 2) // seek to end of the file _, err := tail.file.Seek(0, 2) // seek to end of the file
if err != nil { if err != nil {
tail.close()
tail.Killf("Seek error on %s: %s", tail.Filename, err) tail.Killf("Seek error on %s: %s", tail.Filename, err)
return
} }
} }
@ -167,27 +182,30 @@ func (tail *Tail) tailFileSync() {
select { select {
case _, ok := <-changes: case _, ok := <-changes:
if !ok { if !ok {
// file got deleted/renamed // File got deleted/renamed
if tail.ReOpen { if tail.ReOpen {
// TODO: no logging in a library // TODO: no logging in a library?
log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename) log.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
tail.reopen() err := tail.reopen()
log.Printf("File %s has been reopened.", tail.Filename) if err != nil {
tail.close()
tail.Kill(err)
return
}
log.Printf("Successfully reopened %s", tail.Filename)
tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize) tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize)
changes = nil changes = nil
continue continue
} else { } else {
log.Printf("File %s has gone away; skipping this file.\n", tail.Filename) log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename)
tail.close() tail.close()
return return
} }
} }
case <-tail.Dying(): case <-tail.Dying():
// FIXME: respect DRY (see below)
tail.close() tail.close()
return return
} }
} }
} }
@ -200,7 +218,7 @@ func (tail *Tail) tailFileSync() {
} }
} }
// get current time in unix timestamp // getCurrentTime returns the current time as UNIX timestamp
func getCurrentTime() int64 { func getCurrentTime() int64 {
return time.Now().UTC().Unix() return time.Now().UTC().Unix()
} }

View File

@ -30,13 +30,13 @@ func (fw *InotifyFileWatcher) BlockUntilExists() error {
if err != nil { if err != nil {
return err return err
} }
defer w.Close()
err = w.WatchFlags(filepath.Dir(fw.Filename), fsnotify.FSN_CREATE) err = w.WatchFlags(filepath.Dir(fw.Filename), fsnotify.FSN_CREATE)
if err != nil { if err != nil {
return err return err
} }
defer w.RemoveWatch(filepath.Dir(fw.Filename))
<-w.Event <-w.Event
w.RemoveWatch(filepath.Dir(fw.Filename))
w.Close()
return nil return nil
} }