clean up ChangeEvents' goroutines upon tail.Stop

This commit is contained in:
Sridhar Ratnakumar 2013-05-29 11:35:27 -07:00
parent 7599e3efb9
commit 2cddd48e0a
5 changed files with 22 additions and 13 deletions

View File

@ -1,9 +1,10 @@
# May, 2013 # May, 2013
* Recognize deletions/renames when using polling file watcher (PR #1) * Detect file deletions/renames in polling file watcher (PR #1)
* Detect file truncation * Detect file truncation
* Fix potential race condition when reopening the file (issue 5) * Fix potential race condition when reopening the file (issue 5)
* Fix potential blocking of `tail.Stop` (issue 4) * Fix potential blocking of `tail.Stop` (issue 4)
* Fix uncleaned up ChangeEvents goroutines after calling tail.Stop
# Feb, 2013 # Feb, 2013

View File

@ -179,17 +179,17 @@ func (tail *Tail) tailFileSync() {
tail.Kill(err) tail.Kill(err)
return return
} }
changes = tail.watcher.ChangeEvents(st) changes = tail.watcher.ChangeEvents(tail.Tomb, st)
} }
select { select {
case _, ok := <-changes: case _, ok := <-changes:
if !ok { if !ok {
changes = nil // XXX: use tomb to kill changes' goroutine. changes = nil
// File got deleted/renamed/truncated. // File got deleted/renamed/truncated.
if tail.ReOpen { if tail.ReOpen {
// TODO: no logging in a library? // XXX: no logging in a library?
log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename) log.Printf("Re-opening moved/deleted/truncated file %s ...", tail.Filename)
err := tail.reopen() err := tail.reopen()
if err != nil { if err != nil {
@ -202,7 +202,7 @@ func (tail *Tail) tailFileSync() {
continue continue
} else { } else {
log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename) log.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
tail.close() tail.close()
return return
} }

View File

@ -3,7 +3,6 @@
package watch package watch
import ( import (
"fmt"
"github.com/howeyc/fsnotify" "github.com/howeyc/fsnotify"
"os" "os"
"path/filepath" "path/filepath"
@ -51,14 +50,14 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t tomb.Tomb) error {
return nil return nil
} }
case <-t.Dying(): case <-t.Dying():
return fmt.Errorf("Tomb dying") return tomb.ErrDying
} }
} }
panic("unreachable") panic("unreachable")
} }
// ChangeEvents returns a channel that gets updated when the file is ready to be read. // ChangeEvents returns a channel that gets updated when the file is ready to be read.
func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool { func (fw *InotifyFileWatcher) ChangeEvents(t tomb.Tomb, fi os.FileInfo) chan bool {
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
panic(err) panic(err)
@ -80,7 +79,14 @@ func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool {
for { for {
prevSize := fw.Size prevSize := fw.Size
evt := <-w.Event var evt *fsnotify.FileEvent
select {
case evt = <-w.Event:
case <-t.Dying():
return
}
switch { switch {
case evt.IsDelete(): case evt.IsDelete():
fallthrough fallthrough

View File

@ -7,7 +7,6 @@ import (
"os" "os"
"sync" "sync"
"time" "time"
"fmt"
) )
// PollingFileWatcher polls the file for changes. // PollingFileWatcher polls the file for changes.
@ -34,13 +33,13 @@ func (fw *PollingFileWatcher) BlockUntilExists(t tomb.Tomb) error {
case <-time.After(POLL_DURATION): case <-time.After(POLL_DURATION):
continue continue
case <-t.Dying(): case <-t.Dying():
return fmt.Errorf("Tomb dying") return tomb.ErrDying
} }
} }
panic("unreachable") panic("unreachable")
} }
func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { func (fw *PollingFileWatcher) ChangeEvents(t tomb.Tomb, origFi os.FileInfo) chan bool {
ch := make(chan bool) ch := make(chan bool)
stop := make(chan bool) stop := make(chan bool)
var once sync.Once var once sync.Once
@ -64,6 +63,9 @@ func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool {
select { select {
case <-stop: case <-stop:
return return
case <-t.Dying():
once.Do(stopAndClose)
continue
default: default:
} }

View File

@ -15,6 +15,6 @@ type FileWatcher interface {
// ChangeEvents returns a channel of events corresponding to the // ChangeEvents returns a channel of events corresponding to the
// times the file is ready to be read. // times the file is ready to be read.
ChangeEvents(os.FileInfo) chan bool ChangeEvents(tomb.Tomb, os.FileInfo) chan bool
} }