integrate leakybucket algorithm

This commit is contained in:
Sridhar Ratnakumar 2014-04-29 16:39:57 -07:00
parent a67a74158f
commit 34fb5fd3ef
3 changed files with 23 additions and 43 deletions

20
rate.go
View File

@ -1,20 +0,0 @@
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package tail
// RateMonitor is a naive rate monitor that monitors the number of
// items processed in the current second.
type RateMonitor struct {
second int64
num int64
}
func (r *RateMonitor) Tick(unixTime int64) int64 {
if r.second != unixTime {
r.second = unixTime
r.num = 1
} else {
r.num += 1
}
return r.num
}

31
tail.go
View File

@ -5,6 +5,7 @@ package tail
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/ActiveState/tail/ratelimiter"
"github.com/ActiveState/tail/util" "github.com/ActiveState/tail/util"
"github.com/ActiveState/tail/watch" "github.com/ActiveState/tail/watch"
"io" "io"
@ -39,11 +40,11 @@ type SeekInfo struct {
// Config is used to specify how a file must be tailed. // Config is used to specify how a file must be tailed.
type Config struct { type Config struct {
// File-specifc // File-specifc
Location *SeekInfo // Seek to this location before tailing Location *SeekInfo // Seek to this location before tailing
ReOpen bool // Reopen recreated files (tail -F) ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using inotify Poll bool // Poll for file changes instead of using inotify
LimitRate int64 // Maximum read rate (lines per second) RateLimiter *ratelimiter.LeakyBucket
// Generic IO // Generic IO
Follow bool // Continue looking for new lines (tail -f) Follow bool // Continue looking for new lines (tail -f)
@ -63,7 +64,6 @@ type Tail struct {
reader *bufio.Reader reader *bufio.Reader
watcher watch.FileWatcher watcher watch.FileWatcher
changes *watch.FileChanges changes *watch.FileChanges
rateMon *RateMonitor
tomb.Tomb // provides: Done, Kill, Dying tomb.Tomb // provides: Done, Kill, Dying
} }
@ -95,8 +95,6 @@ func TailFile(filename string, config Config) (*Tail, error) {
t.Logger = log.New(os.Stderr, "", log.LstdFlags) t.Logger = log.New(os.Stderr, "", log.LstdFlags)
} }
t.rateMon = new(RateMonitor)
if t.Poll { if t.Poll {
t.watcher = watch.NewPollingFileWatcher(filename) t.watcher = watch.NewPollingFileWatcher(filename)
} else { } else {
@ -222,9 +220,8 @@ func (tail *Tail) tailFileSync() {
// Wait a second before seeking till the end of // Wait a second before seeking till the end of
// file when rate limit is reached. // file when rate limit is reached.
msg := fmt.Sprintf( msg := fmt.Sprintf(
"Too much log activity (more than %d lines "+ "Too much log activity; waiting a second " +
"per second being written); waiting a second "+ "before resuming tailing")
"before resuming tailing", tail.LimitRate)
tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)} tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)}
select { select {
case <-time.After(time.Second): case <-time.After(time.Second):
@ -333,7 +330,6 @@ func (tail *Tail) seekEnd() error {
// if necessary. Return false if rate limit is reached. // if necessary. Return false if rate limit is reached.
func (tail *Tail) sendLine(line []byte) bool { func (tail *Tail) sendLine(line []byte) bool {
now := time.Now() now := time.Now()
nowUnix := now.Unix()
lines := []string{string(line)} lines := []string{string(line)}
// Split longer lines // Split longer lines
@ -344,11 +340,12 @@ func (tail *Tail) sendLine(line []byte) bool {
for _, line := range lines { for _, line := range lines {
tail.Lines <- &Line{line, now, nil} tail.Lines <- &Line{line, now, nil}
rate := tail.rateMon.Tick(nowUnix) }
if tail.LimitRate > 0 && rate > tail.LimitRate {
tail.Logger.Printf("Rate limit (%v < %v) reached on file (%v); entering 1s cooloff period.\n", if tail.Config.RateLimiter != nil {
tail.LimitRate, ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
rate, if !ok {
tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
tail.Filename) tail.Filename)
return false return false
} }

View File

@ -8,6 +8,7 @@ package tail
import ( import (
"./watch" "./watch"
_ "fmt" _ "fmt"
"github.com/ActiveState/tail/ratelimiter"
"io/ioutil" "io/ioutil"
"os" "os"
"strings" "strings"
@ -261,15 +262,17 @@ func TestRateLimiting(_t *testing.T) {
t := NewTailTest("rate-limiting", _t) t := NewTailTest("rate-limiting", _t)
t.CreateFile("test.txt", "hello\nworld\nagain\nextra\n") t.CreateFile("test.txt", "hello\nworld\nagain\nextra\n")
config := Config{ config := Config{
Follow: true, Follow: true,
LimitRate: 2} RateLimiter: ratelimiter.NewLeakyBucket(2, time.Second)}
expecting := "Too much log activity (more than 2 lines per second being written); waiting a second before resuming tailing" leakybucketFull := "Too much log activity; waiting a second before resuming tailing"
tail := t.StartTail("test.txt", config) tail := t.StartTail("test.txt", config)
// TODO: also verify that tail resumes after the cooloff period. // TODO: also verify that tail resumes after the cooloff period.
go t.VerifyTailOutput( go t.VerifyTailOutput(tail, []string{
tail, "hello", "world", "again",
[]string{"hello", "world", "again", expecting, "more", "data"}) leakybucketFull,
"more", "data",
leakybucketFull})
// Add more data only after reasonable delay. // Add more data only after reasonable delay.
<-time.After(1200 * time.Millisecond) <-time.After(1200 * time.Millisecond)