From 34fb5fd3efa2bfda92b0a0911f77318e9c140562 Mon Sep 17 00:00:00 2001 From: Sridhar Ratnakumar Date: Tue, 29 Apr 2014 16:39:57 -0700 Subject: [PATCH] integrate leakybucket algorithm --- rate.go | 20 -------------------- tail.go | 31 ++++++++++++++----------------- tail_test.go | 15 +++++++++------ 3 files changed, 23 insertions(+), 43 deletions(-) delete mode 100644 rate.go diff --git a/rate.go b/rate.go deleted file mode 100644 index ab639d0..0000000 --- a/rate.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. - -package tail - -// RateMonitor is a naive rate monitor that monitors the number of -// items processed in the current second. -type RateMonitor struct { - second int64 - num int64 -} - -func (r *RateMonitor) Tick(unixTime int64) int64 { - if r.second != unixTime { - r.second = unixTime - r.num = 1 - } else { - r.num += 1 - } - return r.num -} diff --git a/tail.go b/tail.go index 482ca88..babc269 100644 --- a/tail.go +++ b/tail.go @@ -5,6 +5,7 @@ package tail import ( "bufio" "fmt" + "github.com/ActiveState/tail/ratelimiter" "github.com/ActiveState/tail/util" "github.com/ActiveState/tail/watch" "io" @@ -39,11 +40,11 @@ type SeekInfo struct { // Config is used to specify how a file must be tailed. type Config struct { // File-specifc - Location *SeekInfo // Seek to this location before tailing - ReOpen bool // Reopen recreated files (tail -F) - MustExist bool // Fail early if the file does not exist - Poll bool // Poll for file changes instead of using inotify - LimitRate int64 // Maximum read rate (lines per second) + Location *SeekInfo // Seek to this location before tailing + ReOpen bool // Reopen recreated files (tail -F) + MustExist bool // Fail early if the file does not exist + Poll bool // Poll for file changes instead of using inotify + RateLimiter *ratelimiter.LeakyBucket // Generic IO Follow bool // Continue looking for new lines (tail -f) @@ -63,7 +64,6 @@ type Tail struct { reader *bufio.Reader watcher watch.FileWatcher changes *watch.FileChanges - rateMon *RateMonitor tomb.Tomb // provides: Done, Kill, Dying } @@ -95,8 +95,6 @@ func TailFile(filename string, config Config) (*Tail, error) { t.Logger = log.New(os.Stderr, "", log.LstdFlags) } - t.rateMon = new(RateMonitor) - if t.Poll { t.watcher = watch.NewPollingFileWatcher(filename) } else { @@ -222,9 +220,8 @@ func (tail *Tail) tailFileSync() { // Wait a second before seeking till the end of // file when rate limit is reached. msg := fmt.Sprintf( - "Too much log activity (more than %d lines "+ - "per second being written); waiting a second "+ - "before resuming tailing", tail.LimitRate) + "Too much log activity; waiting a second " + + "before resuming tailing") tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)} select { case <-time.After(time.Second): @@ -333,7 +330,6 @@ func (tail *Tail) seekEnd() error { // if necessary. Return false if rate limit is reached. func (tail *Tail) sendLine(line []byte) bool { now := time.Now() - nowUnix := now.Unix() lines := []string{string(line)} // Split longer lines @@ -344,11 +340,12 @@ func (tail *Tail) sendLine(line []byte) bool { for _, line := range lines { tail.Lines <- &Line{line, now, nil} - rate := tail.rateMon.Tick(nowUnix) - if tail.LimitRate > 0 && rate > tail.LimitRate { - tail.Logger.Printf("Rate limit (%v < %v) reached on file (%v); entering 1s cooloff period.\n", - tail.LimitRate, - rate, + } + + if tail.Config.RateLimiter != nil { + ok := tail.Config.RateLimiter.Pour(uint16(len(lines))) + if !ok { + tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n", tail.Filename) return false } diff --git a/tail_test.go b/tail_test.go index 58b340a..c6ad712 100644 --- a/tail_test.go +++ b/tail_test.go @@ -8,6 +8,7 @@ package tail import ( "./watch" _ "fmt" + "github.com/ActiveState/tail/ratelimiter" "io/ioutil" "os" "strings" @@ -261,15 +262,17 @@ func TestRateLimiting(_t *testing.T) { t := NewTailTest("rate-limiting", _t) t.CreateFile("test.txt", "hello\nworld\nagain\nextra\n") config := Config{ - Follow: true, - LimitRate: 2} - expecting := "Too much log activity (more than 2 lines per second being written); waiting a second before resuming tailing" + Follow: true, + RateLimiter: ratelimiter.NewLeakyBucket(2, time.Second)} + leakybucketFull := "Too much log activity; waiting a second before resuming tailing" tail := t.StartTail("test.txt", config) // TODO: also verify that tail resumes after the cooloff period. - go t.VerifyTailOutput( - tail, - []string{"hello", "world", "again", expecting, "more", "data"}) + go t.VerifyTailOutput(tail, []string{ + "hello", "world", "again", + leakybucketFull, + "more", "data", + leakybucketFull}) // Add more data only after reasonable delay. <-time.After(1200 * time.Millisecond)