From 0f67bc352f94a396249167641f00a2043d30bcb9 Mon Sep 17 00:00:00 2001 From: Sridhar Ratnakumar Date: Tue, 28 May 2013 16:34:36 -0700 Subject: [PATCH] make watch.go its own package for the purpose for upcoming changes to the watch functionality. --- tail.go | 7 +- tail_test.go | 11 +-- watch.go | 215 ----------------------------------------------- watch/inotify.go | 107 +++++++++++++++++++++++ watch/polling.go | 104 +++++++++++++++++++++++ watch/watch.go | 19 +++++ 6 files changed, 240 insertions(+), 223 deletions(-) delete mode 100644 watch.go create mode 100644 watch/inotify.go create mode 100644 watch/polling.go create mode 100644 watch/watch.go diff --git a/tail.go b/tail.go index c8647aa..bfbf702 100644 --- a/tail.go +++ b/tail.go @@ -10,6 +10,7 @@ import ( "log" "os" "time" + "github.com/ActiveState/tail/watch" ) type Line struct { @@ -34,7 +35,7 @@ type Tail struct { file *os.File reader *bufio.Reader - watcher FileWatcher + watcher watch.FileWatcher tomb.Tomb // provides: Done, Kill, Dying } @@ -62,9 +63,9 @@ func TailFile(filename string, config Config) (*Tail, error) { Config: config} if t.Poll { - t.watcher = NewPollingFileWatcher(filename) + t.watcher = watch.NewPollingFileWatcher(filename) } else { - t.watcher = NewInotifyFileWatcher(filename) + t.watcher = watch.NewInotifyFileWatcher(filename) } if t.MustExist { diff --git a/tail_test.go b/tail_test.go index cc1af62..f69f519 100644 --- a/tail_test.go +++ b/tail_test.go @@ -11,6 +11,7 @@ import ( "os" "testing" "time" + "./watch" ) func init() { @@ -104,7 +105,7 @@ func _TestReOpen(_t *testing.T, poll bool) { if poll { // Give polling a chance to read the just-written lines (more; // data), before we recreate the file again below. - <-time.After(POLL_DURATION) + <-time.After(watch.POLL_DURATION) } // rename must trigger reopen @@ -114,7 +115,7 @@ func _TestReOpen(_t *testing.T, poll bool) { if poll { // This time, wait a bit before creating the file to test // PollingFileWatcher's BlockUntilExists. - <-time.After(POLL_DURATION) + <-time.After(watch.POLL_DURATION) } t.CreateFile("test.txt", "endofworld") @@ -159,7 +160,7 @@ func _TestReSeek(_t *testing.T, poll bool) { if poll { // Give polling a chance to read the just-written lines (more; // data), before we truncate the file again below. - <-time.After(POLL_DURATION) + <-time.After(watch.POLL_DURATION) } println("truncating..") t.TruncateFile("test.txt", "h311o\nw0r1d\nendofworld\n") @@ -167,7 +168,7 @@ func _TestReSeek(_t *testing.T, poll bool) { if poll { // Give polling a chance to read the just-written lines (more; // data), before we recreate the file again below. - <-time.After(POLL_DURATION) + <-time.After(watch.POLL_DURATION) } // Delete after a reasonable delay, to give tail sufficient time @@ -206,7 +207,7 @@ func NewTailTest(name string, t *testing.T) TailTest { } // Use a smaller poll duration for faster test runs. - POLL_DURATION = 25 * time.Millisecond + watch.POLL_DURATION = 25 * time.Millisecond return tt } diff --git a/watch.go b/watch.go deleted file mode 100644 index 1d8c4c7..0000000 --- a/watch.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. - -package tail - -import ( - "github.com/howeyc/fsnotify" - "os" - "path/filepath" - "sync" - "time" -) - -// FileWatcher monitors file-level events. -type FileWatcher interface { - // BlockUntilExists blocks until the missing file comes into - // existence. If the file already exists, returns immediately. - BlockUntilExists() error - - // ChangeEvents returns a channel of events corresponding to the - // times the file is ready to be read. - ChangeEvents(os.FileInfo) chan bool -} - -// InotifyFileWatcher uses inotify to monitor file changes. -type InotifyFileWatcher struct { - Filename string - Size int64 -} - -func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { - fw := &InotifyFileWatcher{filename, 0} - return fw -} - -func (fw *InotifyFileWatcher) BlockUntilExists() error { - w, err := fsnotify.NewWatcher() - if err != nil { - return err - } - defer w.Close() - - dirname := filepath.Dir(fw.Filename) - - // Watch for new files to be created in the parent directory. - err = w.WatchFlags(dirname, fsnotify.FSN_CREATE) - if err != nil { - return err - } - defer w.RemoveWatch(filepath.Dir(fw.Filename)) - - // Do a real check now as the file might have been created before - // calling `WatchFlags` above. - if _, err = os.Stat(fw.Filename); !os.IsNotExist(err) { - // file exists, or stat returned an error. - return err - } - - for { - evt := <-w.Event - if evt.Name == fw.Filename { - break - } - } - return nil -} - -// ChangeEvents returns a channel that gets updated when the file is ready to be read. -func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool { - w, err := fsnotify.NewWatcher() - if err != nil { - panic(err) - } - err = w.Watch(fw.Filename) - if err != nil { - panic(err) - } - - ch := make(chan bool) - - fw.Size = fi.Size() - - go func() { - defer w.Close() - defer w.RemoveWatch(fw.Filename) - defer close(ch) - - for { - prevSize := fw.Size - - evt := <-w.Event - switch { - case evt.IsDelete(): - fallthrough - - case evt.IsRename(): - return - - case evt.IsModify(): - fi, err := os.Stat(fw.Filename) - if err != nil { - // XXX: no panic here - panic(err) - } - fw.Size = fi.Size() - - if prevSize > 0 && prevSize > fw.Size { - return - } - - // send only if channel is empty. - select { - case ch <- true: - default: - } - } - } - }() - - return ch -} - -// PollingFileWatcher polls the file for changes. -type PollingFileWatcher struct { - Filename string - Size int64 -} - -func NewPollingFileWatcher(filename string) *PollingFileWatcher { - fw := &PollingFileWatcher{filename, 0} - return fw -} - -var POLL_DURATION time.Duration - -func (fw *PollingFileWatcher) BlockUntilExists() error { - for { - if _, err := os.Stat(fw.Filename); err == nil { - return nil - } else if !os.IsNotExist(err) { - return err - } - time.Sleep(POLL_DURATION) - } - panic("unreachable") -} - -func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { - ch := make(chan bool) - stop := make(chan bool) - var once sync.Once - var prevModTime time.Time - - // XXX: use tomb.Tomb to cleanly manage these goroutines. replace - // the panic (below) with tomb's Kill. - - stopAndClose := func() { - go func() { - close(ch) - stop <- true - }() - } - - fw.Size = origFi.Size() - - go func() { - prevSize := fw.Size - for { - select { - case <-stop: - return - default: - } - - time.Sleep(POLL_DURATION) - fi, err := os.Stat(fw.Filename) - if err != nil { - if os.IsNotExist(err) { - once.Do(stopAndClose) - continue - } - /// XXX: do not panic here. - panic(err) - } - - // File got moved/rename within POLL_DURATION? - if !os.SameFile(origFi, fi) { - once.Do(stopAndClose) - continue - } - - // Was the file truncated? - fw.Size = fi.Size() - if prevSize > 0 && prevSize > fw.Size { - once.Do(stopAndClose) - continue - } - - // If the file was changed since last check, notify. - modTime := fi.ModTime() - if modTime != prevModTime { - prevModTime = modTime - select { - case ch <- true: - default: - } - } - } - }() - - return ch -} - -func init() { - POLL_DURATION = 250 * time.Millisecond -} diff --git a/watch/inotify.go b/watch/inotify.go new file mode 100644 index 0000000..b6bdaa0 --- /dev/null +++ b/watch/inotify.go @@ -0,0 +1,107 @@ +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import ( + "github.com/howeyc/fsnotify" + "os" + "path/filepath" +) + +// InotifyFileWatcher uses inotify to monitor file changes. +type InotifyFileWatcher struct { + Filename string + Size int64 +} + +func NewInotifyFileWatcher(filename string) *InotifyFileWatcher { + fw := &InotifyFileWatcher{filename, 0} + return fw +} + +func (fw *InotifyFileWatcher) BlockUntilExists() error { + w, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer w.Close() + + dirname := filepath.Dir(fw.Filename) + + // Watch for new files to be created in the parent directory. + err = w.WatchFlags(dirname, fsnotify.FSN_CREATE) + if err != nil { + return err + } + defer w.RemoveWatch(filepath.Dir(fw.Filename)) + + // Do a real check now as the file might have been created before + // calling `WatchFlags` above. + if _, err = os.Stat(fw.Filename); !os.IsNotExist(err) { + // file exists, or stat returned an error. + return err + } + + for { + evt := <-w.Event + if evt.Name == fw.Filename { + break + } + } + return nil +} + +// ChangeEvents returns a channel that gets updated when the file is ready to be read. +func (fw *InotifyFileWatcher) ChangeEvents(fi os.FileInfo) chan bool { + w, err := fsnotify.NewWatcher() + if err != nil { + panic(err) + } + err = w.Watch(fw.Filename) + if err != nil { + panic(err) + } + + ch := make(chan bool) + + fw.Size = fi.Size() + + go func() { + defer w.Close() + defer w.RemoveWatch(fw.Filename) + defer close(ch) + + for { + prevSize := fw.Size + + evt := <-w.Event + switch { + case evt.IsDelete(): + fallthrough + + case evt.IsRename(): + return + + case evt.IsModify(): + fi, err := os.Stat(fw.Filename) + if err != nil { + // XXX: no panic here + panic(err) + } + fw.Size = fi.Size() + + if prevSize > 0 && prevSize > fw.Size { + return + } + + // send only if channel is empty. + select { + case ch <- true: + default: + } + } + } + }() + + return ch +} diff --git a/watch/polling.go b/watch/polling.go new file mode 100644 index 0000000..3f2f3e1 --- /dev/null +++ b/watch/polling.go @@ -0,0 +1,104 @@ +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import ( + "os" + "sync" + "time" +) + +// PollingFileWatcher polls the file for changes. +type PollingFileWatcher struct { + Filename string + Size int64 +} + +func NewPollingFileWatcher(filename string) *PollingFileWatcher { + fw := &PollingFileWatcher{filename, 0} + return fw +} + +var POLL_DURATION time.Duration + +func (fw *PollingFileWatcher) BlockUntilExists() error { + for { + if _, err := os.Stat(fw.Filename); err == nil { + return nil + } else if !os.IsNotExist(err) { + return err + } + time.Sleep(POLL_DURATION) + } + panic("unreachable") +} + +func (fw *PollingFileWatcher) ChangeEvents(origFi os.FileInfo) chan bool { + ch := make(chan bool) + stop := make(chan bool) + var once sync.Once + var prevModTime time.Time + + // XXX: use tomb.Tomb to cleanly manage these goroutines. replace + // the panic (below) with tomb's Kill. + + stopAndClose := func() { + go func() { + close(ch) + stop <- true + }() + } + + fw.Size = origFi.Size() + + go func() { + prevSize := fw.Size + for { + select { + case <-stop: + return + default: + } + + time.Sleep(POLL_DURATION) + fi, err := os.Stat(fw.Filename) + if err != nil { + if os.IsNotExist(err) { + once.Do(stopAndClose) + continue + } + /// XXX: do not panic here. + panic(err) + } + + // File got moved/rename within POLL_DURATION? + if !os.SameFile(origFi, fi) { + once.Do(stopAndClose) + continue + } + + // Was the file truncated? + fw.Size = fi.Size() + if prevSize > 0 && prevSize > fw.Size { + once.Do(stopAndClose) + continue + } + + // If the file was changed since last check, notify. + modTime := fi.ModTime() + if modTime != prevModTime { + prevModTime = modTime + select { + case ch <- true: + default: + } + } + } + }() + + return ch +} + +func init() { + POLL_DURATION = 250 * time.Millisecond +} diff --git a/watch/watch.go b/watch/watch.go new file mode 100644 index 0000000..afa9f6a --- /dev/null +++ b/watch/watch.go @@ -0,0 +1,19 @@ +// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. + +package watch + +import ( + "os" +) + +// FileWatcher monitors file-level events. +type FileWatcher interface { + // BlockUntilExists blocks until the missing file comes into + // existence. If the file already exists, returns immediately. + BlockUntilExists() error + + // ChangeEvents returns a channel of events corresponding to the + // times the file is ready to be read. + ChangeEvents(os.FileInfo) chan bool +} +