Bug #95803 - reduce cpu usage due to frequent polling

* use inotify for system log aggregation
* continue using polling for app log aggregation (overlayfs bug)
  * but increase wait time to 250ms

Squashed commit of the following:

commit 8ccd0359e559472be0066ad889ac1772e13ff20b
Author: Sridhar Ratnakumar <sridharr@activestate.com>
Date:   Thu Oct 11 21:28:41 2012 -0700

    complete the polling watcher

commit 0a5d5aa488e96aa247c7c88c25cd5a30219f5344
Author: Sridhar Ratnakumar <sridharr@activestate.com>
Date:   Thu Oct 11 20:50:08 2012 -0700

    hackish, untested, incomplete implementation of pollig filewatcher

    .. to be tested on macbook.

commit c7ac3851452ed23a8b099773cc9c9f23734a89f5
Author: Sridhar Ratnakumar <sridharr@activestate.com>
Date:   Thu Oct 11 15:45:59 2012 -0700

    tail: use inotify instead of polling

    reduces cpu usage and fixes  Bug #95803
This commit is contained in:
Sridhar Ratnakumar 2012-10-11 21:29:06 -07:00
parent 6a5092ff5c
commit 3ff602e781
2 changed files with 191 additions and 67 deletions

127
tail.go
View File

@ -3,11 +3,9 @@ package tail
import (
"bufio"
"fmt"
"github.com/howeyc/fsnotify"
"io"
"log"
"os"
"path/filepath"
"time"
)
@ -20,10 +18,11 @@ type Tail struct {
Filename string
Lines chan *Line
useinotify bool
maxlinesize int
file *os.File
reader *bufio.Reader
watcher *fsnotify.Watcher
watcher FileWatcher
stop chan bool
created chan bool
@ -32,21 +31,25 @@ type Tail struct {
// TailFile channels the lines of a logfile along with timestamp. If
// end is true, channel only newly added lines. If retry is true, tail
// the file name (not descriptor) and retry on file open/read errors.
func TailFile(filename string, maxlinesize int, end bool, retry bool) (*Tail, error) {
watcher, err := fileCreateWatcher(filename)
if err != nil {
return nil, err
}
func TailFile(filename string, maxlinesize int, end bool, retry bool, useinotify bool) (*Tail, error) {
t := &Tail{
filename,
make(chan *Line),
useinotify,
maxlinesize,
nil,
nil,
watcher,
nil,
make(chan bool),
make(chan bool)}
if !useinotify {
log.Println("Warning: not using inotify; will poll ", filename)
t.watcher = NewPollingFileWatcher(filename)
} else {
t.watcher = NewInotifyFileWatcher(filename)
}
go t.tailFileSync(end, retry)
return t, nil
@ -59,7 +62,6 @@ func (tail *Tail) Stop() {
func (tail *Tail) close() {
close(tail.Lines)
tail.watcher.Close()
if tail.file != nil {
tail.file.Close()
}
@ -74,16 +76,15 @@ func (tail *Tail) reopen(wait bool) {
tail.file, err = os.Open(tail.Filename)
if err != nil {
if os.IsNotExist(err) && wait {
for {
evt := <-tail.watcher.Event
if evt.Name == tail.Filename {
break
}
log.Println("blocking until exists")
err := tail.watcher.BlockUntilExists()
if err != nil {
panic(err)
}
log.Println("exists now")
continue
}
// TODO: don't panic here
panic(fmt.Sprintf("can't open file: %s", err))
log.Println(fmt.Sprintf("Unable to reopen file (%s): %s", tail.Filename, err))
}
return
}
@ -109,6 +110,8 @@ func (tail *Tail) readLine() ([]byte, error) {
func (tail *Tail) tailFileSync(end bool, retry bool) {
tail.reopen(retry)
var changes chan bool
// Note: seeking to end happens only at the beginning; never
// during subsequent re-opens.
if end {
@ -121,70 +124,60 @@ func (tail *Tail) tailFileSync(end bool, retry bool) {
tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize)
every2Seconds := time.Tick(2 * time.Second)
for {
line, err := tail.readLine()
if err != nil && err != io.EOF {
log.Println("Error reading file; skipping this file - ", err)
tail.close()
return
}
if err == nil {
if line != nil {
tail.Lines <- &Line{string(line), getCurrentTime()}
}
} else {
if err != io.EOF {
log.Println("Error reading file; skipping this file - ", err)
tail.close()
return
}
// sleep for 0.1s on inactive files, else we cause too much I/O activity
if err == io.EOF {
time.Sleep(100 * time.Millisecond)
}
// When end of file is reached, wait for more data to
// become available. Wait strategy is based on the
// `tail.watcher` implementation (inotify or polling).
if err == io.EOF {
if changes == nil {
changes = tail.watcher.ChangeEvents()
}
if line != nil {
tail.Lines <- &Line{string(line), getCurrentTime()}
}
//log.Println("WAITING ", tail.Filename)
_, ok := <-changes
//log.Println("RECEIVED ", tail.Filename)
select {
case <-every2Seconds: // periodically stat the file to check for possibly deletion.
if _, err := tail.file.Stat(); os.IsNotExist(err) {
if retry {
log.Printf("File %s has gone away; attempting to reopen it.\n", tail.Filename)
tail.reopen(retry)
tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize)
continue
} else {
log.Printf("File %s has gone away; skipping this file.\n", tail.Filename)
tail.close()
return
if !ok {
// file got deleted/renamed
if retry {
log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename)
tail.reopen(retry)
log.Printf("File %s has been reopened.", tail.Filename)
tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize)
changes = nil
continue
} else {
log.Printf("File %s has gone away; skipping this file.\n", tail.Filename)
tail.close()
return
}
}
}
case evt := <-tail.watcher.Event:
if evt.Name == tail.Filename {
log.Printf("File %s has been moved (logrotation?); reopening..", tail.Filename)
tail.reopen(retry)
tail.reader = bufio.NewReaderSize(tail.file, tail.maxlinesize)
continue
}
case <-tail.stop: // stop the tailer if requested
}
// stop the tailer if requested.
// FIXME: won't happen promptly; http://bugs.activestate.com/show_bug.cgi?id=95718#c3
select {
case <-tail.stop:
return
default:
}
}
}
// returns the watcher for file create events
func fileCreateWatcher(filename string) (*fsnotify.Watcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
// watch on parent directory because the file may not exit.
err = watcher.WatchFlags(filepath.Dir(filename), fsnotify.FSN_CREATE)
if err != nil {
return nil, err
}
return watcher, nil
}
// get current time in unix timestamp
func getCurrentTime() int64 {
return time.Now().UTC().Unix()

131
watch.go Normal file
View File

@ -0,0 +1,131 @@
// TODO: avoid creating two instances of the fsnotify.Watcher struct
package tail
import (
"github.com/howeyc/fsnotify"
"os"
"path/filepath"
"time"
)
type FileWatcher interface {
BlockUntilExists() error
ChangeEvents() chan bool
}
// FileWatcher monitors file-level events
type InotifyFileWatcher struct {
Filename string
}
func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
fw := &InotifyFileWatcher{filename}
return fw
}
// BlockUntilExists blocks until the file comes into existence. If the
// file already exists, then block until it is created again.
func (fw *InotifyFileWatcher) BlockUntilExists() error {
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
err = w.WatchFlags(filepath.Dir(fw.Filename), fsnotify.FSN_CREATE)
if err != nil {
return err
}
<-w.Event
w.RemoveWatch(filepath.Dir(fw.Filename))
// XXX: how to free up w's goroutines without relying on the gc?
return nil
}
// ChangeEvents returns a channel that gets updated when the file is ready to be read.
func (fw *InotifyFileWatcher) ChangeEvents() chan bool {
w, err := fsnotify.NewWatcher()
if err != nil {
panic(err)
}
err = w.Watch(fw.Filename)
if err != nil {
panic(err)
}
ch := make(chan bool)
go func() {
for {
evt := <-w.Event
switch {
case evt.IsDelete():
fallthrough
case evt.IsRename():
close(ch)
w.RemoveWatch(fw.Filename)
return
case evt.IsModify():
// send only if channel is empty.
select {
case ch <- true:
default:
}
}
}
}()
return ch
}
// FileWatcher monitors file-level events
type PollingFileWatcher struct {
Filename string
}
func NewPollingFileWatcher(filename string) *PollingFileWatcher {
fw := &PollingFileWatcher{filename}
return fw
}
// BlockUntilExists blocks until the file comes into existence. If the
// file already exists, then block until it is created again.
func (fw *PollingFileWatcher) BlockUntilExists() error {
panic("not implemented")
return nil
}
// ChangeEvents returns a channel that gets updated when the file is ready to be read.
func (fw *PollingFileWatcher) ChangeEvents() chan bool {
ch := make(chan bool)
stop := make(chan bool)
every2Seconds := time.Tick(2 * time.Second)
go func() {
for {
time.Sleep(250 * time.Millisecond)
select {
case ch <- true:
case <-stop:
return
default:
}
}
}()
go func() {
for {
select {
case <-every2Seconds:
// NOTE: not using file descriptor.
if _, err := os.Stat(fw.Filename); os.IsNotExist(err) {
stop <- true
close(ch)
return
}
}
}
}()
return ch
}