gracefully manage goroutines death using tomb
http://blog.labix.org/2011/10/09/death-of-goroutines-under-control
This commit is contained in:
parent
187dea7196
commit
4bbf3d28cc
46
tail.go
46
tail.go
|
@ -2,8 +2,8 @@ package tail
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
|
"launchpad.net/tomb"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
@ -32,8 +32,7 @@ type Tail struct {
|
||||||
reader *bufio.Reader
|
reader *bufio.Reader
|
||||||
watcher FileWatcher
|
watcher FileWatcher
|
||||||
|
|
||||||
stop chan bool
|
tomb.Tomb // provides: Done, Kill, Dying
|
||||||
created chan bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TailFile channels the lines of a logfile along with timestamp. If
|
// TailFile channels the lines of a logfile along with timestamp. If
|
||||||
|
@ -45,15 +44,14 @@ func TailFile(filename string, config Config) (*Tail, error) {
|
||||||
panic("only 0/-1 values are supported for Location")
|
panic("only 0/-1 values are supported for Location")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.ReOpen && !config.Follow {
|
||||||
|
panic("cannot set ReOpen without Follow")
|
||||||
|
}
|
||||||
|
|
||||||
t := &Tail{
|
t := &Tail{
|
||||||
filename,
|
Filename: filename,
|
||||||
make(chan *Line),
|
Lines: make(chan *Line),
|
||||||
config,
|
Config: config}
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
make(chan bool),
|
|
||||||
make(chan bool)}
|
|
||||||
|
|
||||||
if t.Poll {
|
if t.Poll {
|
||||||
log.Println("Warning: not using inotify; will poll ", filename)
|
log.Println("Warning: not using inotify; will poll ", filename)
|
||||||
|
@ -75,9 +73,9 @@ func TailFile(filename string, config Config) (*Tail, error) {
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tail *Tail) Stop() {
|
func (tail *Tail) Stop() error {
|
||||||
tail.stop <- true
|
tail.Kill(nil)
|
||||||
tail.close()
|
return tail.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tail *Tail) close() {
|
func (tail *Tail) close() {
|
||||||
|
@ -98,11 +96,11 @@ func (tail *Tail) reopen() {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
err := tail.watcher.BlockUntilExists()
|
err := tail.watcher.BlockUntilExists()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: use error channels
|
tail.Killf("failed to detect creation of %s: %s", tail.Filename, err)
|
||||||
log.Fatalf("cannot watch for file creation -- %s", tail.Filename, err)
|
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
tail.Killf("Unable to open file %s: %s", tail.Filename, err)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -125,6 +123,8 @@ func (tail *Tail) readLine() ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tail *Tail) tailFileSync() {
|
func (tail *Tail) tailFileSync() {
|
||||||
|
defer tail.Done()
|
||||||
|
|
||||||
if !tail.MustExist {
|
if !tail.MustExist {
|
||||||
tail.reopen()
|
tail.reopen()
|
||||||
}
|
}
|
||||||
|
@ -136,8 +136,7 @@ func (tail *Tail) tailFileSync() {
|
||||||
if tail.Location == 0 {
|
if tail.Location == 0 {
|
||||||
_, err := tail.file.Seek(0, 2) // seek to end of the file
|
_, err := tail.file.Seek(0, 2) // seek to end of the file
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: don't panic here
|
tail.Killf("Seek error on %s: %s", tail.Filename, err)
|
||||||
panic(fmt.Sprintf("seek error: %s", err))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,8 +151,8 @@ func (tail *Tail) tailFileSync() {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
log.Println("Error reading file; skipping this file - ", err)
|
|
||||||
tail.close()
|
tail.close()
|
||||||
|
tail.Killf("Error reading %s: %s", tail.Filename, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,6 +169,7 @@ func (tail *Tail) tailFileSync() {
|
||||||
if !ok {
|
if !ok {
|
||||||
// file got deleted/renamed
|
// file got deleted/renamed
|
||||||
if tail.ReOpen {
|
if tail.ReOpen {
|
||||||
|
// TODO: no logging in a library
|
||||||
log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename)
|
log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename)
|
||||||
tail.reopen()
|
tail.reopen()
|
||||||
log.Printf("File %s has been reopened.", tail.Filename)
|
log.Printf("File %s has been reopened.", tail.Filename)
|
||||||
|
@ -182,18 +182,18 @@ func (tail *Tail) tailFileSync() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case <-tail.stop:
|
case <-tail.Dying():
|
||||||
// stop the tailer if requested.
|
|
||||||
// FIXME: respect DRY (see below)
|
// FIXME: respect DRY (see below)
|
||||||
|
tail.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop the tailer if requested.
|
|
||||||
select {
|
select {
|
||||||
case <-tail.stop:
|
case <-tail.Dying():
|
||||||
|
tail.close()
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,8 +10,12 @@ func TestMissingFile(t *testing.T) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("MustExist:true is violated")
|
t.Error("MustExist:true is violated")
|
||||||
}
|
}
|
||||||
_, err = TailFile("README.md", Config{Follow: true, MustExist: false})
|
_, err = TailFile("/no/such/file", Config{Follow: true, MustExist: false})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("MustExist:false is violated")
|
t.Error("MustExist:false is violated")
|
||||||
}
|
}
|
||||||
|
_, err = TailFile("README.md", Config{Follow: true, MustExist: true})
|
||||||
|
if err != nil {
|
||||||
|
t.Error("MustExist:true on an existing file is violated")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue