diff --git a/rate.go b/rate.go new file mode 100644 index 0000000..447133e --- /dev/null +++ b/rate.go @@ -0,0 +1,18 @@ +package tail + +// RateMonitor is a naive rate monitor that monitors the number of +// items processed in the current second. +type RateMonitor struct { + second int64 + num int64 +} + +func (r *RateMonitor) Tick(unixTime int64) int64 { + if r.second != unixTime { + r.second = unixTime + r.num = 1 + } else { + r.num += 1 + } + return r.num +} diff --git a/tail.go b/tail.go index 1966c7f..5b49791 100644 --- a/tail.go +++ b/tail.go @@ -20,16 +20,20 @@ var ( type Line struct { Text string Time time.Time + Err error // If non-nil, this is an error from tail, and not a + // log line itself. } // Config is used to specify how a file must be tailed. type Config struct { - Location int // Tail from last N lines (tail -n) - Follow bool // Continue looking for new lines (tail -f) - ReOpen bool // Reopen recreated files (tail -F) - MustExist bool // Fail early if the file does not exist - Poll bool // Poll for file changes instead of using inotify - MaxLineSize int // If non-zero, split longer lines into multiple lines + Location int // Tail from last N lines (tail -n) + Follow bool // Continue looking for new lines (tail -f) + ReOpen bool // Reopen recreated files (tail -F) + MustExist bool // Fail early if the file does not exist + Poll bool // Poll for file changes instead of using inotify + MaxLineSize int // If non-zero, split longer lines into multiple lines + LimitRate int64 // If non-zero, limit the rate of read log lines + // by this much per second. } type Tail struct { @@ -41,6 +45,7 @@ type Tail struct { reader *bufio.Reader watcher watch.FileWatcher changes *watch.FileChanges + rateMon *RateMonitor tomb.Tomb // provides: Done, Kill, Dying } @@ -63,6 +68,8 @@ func TailFile(filename string, config Config) (*Tail, error) { Lines: make(chan *Line), Config: config} + t.rateMon = new(RateMonitor) + if t.Poll { t.watcher = watch.NewPollingFileWatcher(filename) } else { @@ -153,7 +160,26 @@ func (tail *Tail) tailFileSync() { switch err { case nil: if line != nil { - tail.sendLine(line) + cooloff := !tail.sendLine(line) + if cooloff { + msg := "Too much activity; entering a cool-off period" + tail.Lines <- &Line{ + msg, + time.Now(), + fmt.Errorf(msg)} + // Wait a second before seeking till the end of + // file when rate limit is reached. + select { + case <-time.After(time.Second): + case <-tail.Dying(): + return + } + _, err := tail.file.Seek(0, 2) // Seek to fine end + if err != nil { + tail.Killf("Seek error on %s: %s", tail.Filename, err) + return + } + } } case io.EOF: if !tail.Follow { @@ -228,21 +254,31 @@ func (tail *Tail) waitForChanges() error { } // sendLine sends the line(s) to Lines channel, splitting longer lines -// if necessary. -func (tail *Tail) sendLine(line []byte) { +// if necessary. Return false if rate limit is reached. +func (tail *Tail) sendLine(line []byte) bool { now := time.Now() + nowUnix := now.Unix() lines := []string{string(line)} - // Split longer lins + // Split longer lines if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize { lines = partitionString( string(line), tail.MaxLineSize) } for _, line := range lines { - tail.Lines <- &Line{line, now} + tail.Lines <- &Line{line, now, nil} + rate := tail.rateMon.Tick(nowUnix) + if tail.LimitRate > 0 && rate > tail.LimitRate { + log.Printf("Rate limit (%v < %v) reached file (%v); entering 1s cooloff period.\n", + tail.LimitRate, + rate, + tail.Filename) + return false + } } + return true } // partitionString partitions the string into chunks of given size, diff --git a/tail_test.go b/tail_test.go index 888d180..3bb9d20 100644 --- a/tail_test.go +++ b/tail_test.go @@ -188,6 +188,27 @@ func TestReSeekPolling(_t *testing.T) { _TestReSeek(_t, true) } +func TestRateLimiting(_t *testing.T) { + t := NewTailTest("rate-limiting", _t) + t.CreateFile("test.txt", "hello\nworld\nagain\n") + config := Config{ + Follow: true, + Location: -1, + LimitRate: 2} + tail := t.StartTail("test.txt", config) + // TODO: also verify that tail resumes after the cooloff period. + go t.VerifyTailOutput( + tail, []string{ + "hello", "world", "again", + "Too much activity; entering a cool-off period"}) + + // Delete after a reasonable delay, to give tail sufficient time + // to read all lines. + <-time.After(100 * time.Millisecond) + t.RemoveFile("test.txt") + tail.Stop() +} + // Test library type TailTest struct { @@ -285,6 +306,6 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) { } line, ok := <-tail.Lines if ok { - t.Fatalf("more content from tail: %s", line.Text) + t.Fatalf("more content from tail: %+v", line) } }