Merge pull request #10 from ActiveState/rate_limiting

option to limit the rate of reading lines from files
This commit is contained in:
Sridhar Ratnakumar 2013-07-11 18:35:26 -07:00
commit 1faae64bdb
3 changed files with 87 additions and 12 deletions

18
rate.go Normal file
View File

@ -0,0 +1,18 @@
package tail
// RateMonitor is a naive rate monitor that monitors the number of
// items processed in the current second.
type RateMonitor struct {
second int64
num int64
}
func (r *RateMonitor) Tick(unixTime int64) int64 {
if r.second != unixTime {
r.second = unixTime
r.num = 1
} else {
r.num += 1
}
return r.num
}

58
tail.go
View File

@ -20,16 +20,20 @@ var (
type Line struct { type Line struct {
Text string Text string
Time time.Time Time time.Time
Err error // If non-nil, this is an error from tail, and not a
// log line itself.
} }
// Config is used to specify how a file must be tailed. // Config is used to specify how a file must be tailed.
type Config struct { type Config struct {
Location int // Tail from last N lines (tail -n) Location int // Tail from last N lines (tail -n)
Follow bool // Continue looking for new lines (tail -f) Follow bool // Continue looking for new lines (tail -f)
ReOpen bool // Reopen recreated files (tail -F) ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using inotify Poll bool // Poll for file changes instead of using inotify
MaxLineSize int // If non-zero, split longer lines into multiple lines MaxLineSize int // If non-zero, split longer lines into multiple lines
LimitRate int64 // If non-zero, limit the rate of read log lines
// by this much per second.
} }
type Tail struct { type Tail struct {
@ -41,6 +45,7 @@ type Tail struct {
reader *bufio.Reader reader *bufio.Reader
watcher watch.FileWatcher watcher watch.FileWatcher
changes *watch.FileChanges changes *watch.FileChanges
rateMon *RateMonitor
tomb.Tomb // provides: Done, Kill, Dying tomb.Tomb // provides: Done, Kill, Dying
} }
@ -63,6 +68,8 @@ func TailFile(filename string, config Config) (*Tail, error) {
Lines: make(chan *Line), Lines: make(chan *Line),
Config: config} Config: config}
t.rateMon = new(RateMonitor)
if t.Poll { if t.Poll {
t.watcher = watch.NewPollingFileWatcher(filename) t.watcher = watch.NewPollingFileWatcher(filename)
} else { } else {
@ -153,7 +160,26 @@ func (tail *Tail) tailFileSync() {
switch err { switch err {
case nil: case nil:
if line != nil { if line != nil {
tail.sendLine(line) cooloff := !tail.sendLine(line)
if cooloff {
msg := "Too much activity; entering a cool-off period"
tail.Lines <- &Line{
msg,
time.Now(),
fmt.Errorf(msg)}
// Wait a second before seeking till the end of
// file when rate limit is reached.
select {
case <-time.After(time.Second):
case <-tail.Dying():
return
}
_, err := tail.file.Seek(0, 2) // Seek to fine end
if err != nil {
tail.Killf("Seek error on %s: %s", tail.Filename, err)
return
}
}
} }
case io.EOF: case io.EOF:
if !tail.Follow { if !tail.Follow {
@ -228,21 +254,31 @@ func (tail *Tail) waitForChanges() error {
} }
// sendLine sends the line(s) to Lines channel, splitting longer lines // sendLine sends the line(s) to Lines channel, splitting longer lines
// if necessary. // if necessary. Return false if rate limit is reached.
func (tail *Tail) sendLine(line []byte) { func (tail *Tail) sendLine(line []byte) bool {
now := time.Now() now := time.Now()
nowUnix := now.Unix()
lines := []string{string(line)} lines := []string{string(line)}
// Split longer lins // Split longer lines
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize { if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
lines = partitionString( lines = partitionString(
string(line), tail.MaxLineSize) string(line), tail.MaxLineSize)
} }
for _, line := range lines { for _, line := range lines {
tail.Lines <- &Line{line, now} tail.Lines <- &Line{line, now, nil}
rate := tail.rateMon.Tick(nowUnix)
if tail.LimitRate > 0 && rate > tail.LimitRate {
log.Printf("Rate limit (%v < %v) reached file (%v); entering 1s cooloff period.\n",
tail.LimitRate,
rate,
tail.Filename)
return false
}
} }
return true
} }
// partitionString partitions the string into chunks of given size, // partitionString partitions the string into chunks of given size,

View File

@ -188,6 +188,27 @@ func TestReSeekPolling(_t *testing.T) {
_TestReSeek(_t, true) _TestReSeek(_t, true)
} }
func TestRateLimiting(_t *testing.T) {
t := NewTailTest("rate-limiting", _t)
t.CreateFile("test.txt", "hello\nworld\nagain\n")
config := Config{
Follow: true,
Location: -1,
LimitRate: 2}
tail := t.StartTail("test.txt", config)
// TODO: also verify that tail resumes after the cooloff period.
go t.VerifyTailOutput(
tail, []string{
"hello", "world", "again",
"Too much activity; entering a cool-off period"})
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
tail.Stop()
}
// Test library // Test library
type TailTest struct { type TailTest struct {
@ -285,6 +306,6 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
} }
line, ok := <-tail.Lines line, ok := <-tail.Lines
if ok { if ok {
t.Fatalf("more content from tail: %s", line.Text) t.Fatalf("more content from tail: %+v", line)
} }
} }