option to limit the rate of reading lines from files

This commit is contained in:
Sridhar Ratnakumar 2013-07-11 17:28:18 -07:00
parent f461ddc97d
commit b2509e165e
3 changed files with 87 additions and 12 deletions

18
rate.go Normal file
View File

@ -0,0 +1,18 @@
package tail
// RateMonitor is a naive rate monitor that monitors the number of
// items processed in the current second.
type RateMonitor struct {
second int64
num int64
}
func (r *RateMonitor) Tick(unixTime int64) int64 {
if r.second != unixTime {
r.second = unixTime
r.num = 1
} else {
r.num += 1
}
return r.num
}

46
tail.go
View File

@ -20,6 +20,8 @@ var (
type Line struct {
Text string
Time time.Time
Err error // If non-nil, this is an error from tail, and not a
// log line itself.
}
// Config is used to specify how a file must be tailed.
@ -30,6 +32,8 @@ type Config struct {
MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using inotify
MaxLineSize int // If non-zero, split longer lines into multiple lines
LimitRate int64 // If non-zero, limit the rate of read log lines
// by this much per second.
}
type Tail struct {
@ -41,6 +45,7 @@ type Tail struct {
reader *bufio.Reader
watcher watch.FileWatcher
changes *watch.FileChanges
rateMon *RateMonitor
tomb.Tomb // provides: Done, Kill, Dying
}
@ -63,6 +68,8 @@ func TailFile(filename string, config Config) (*Tail, error) {
Lines: make(chan *Line),
Config: config}
t.rateMon = new(RateMonitor)
if t.Poll {
t.watcher = watch.NewPollingFileWatcher(filename)
} else {
@ -153,7 +160,26 @@ func (tail *Tail) tailFileSync() {
switch err {
case nil:
if line != nil {
tail.sendLine(line)
cooloff := !tail.sendLine(line)
if cooloff {
msg := "Too much activity; entering a cool-off period"
tail.Lines <- &Line{
msg,
time.Now(),
fmt.Errorf(msg)}
// Wait a second before seeking till the end of
// file when rate limit is reached.
select {
case <-time.After(time.Second):
case <-tail.Dying():
return
}
_, err := tail.file.Seek(0, 2) // Seek to fine end
if err != nil {
tail.Killf("Seek error on %s: %s", tail.Filename, err)
return
}
}
}
case io.EOF:
if !tail.Follow {
@ -228,21 +254,31 @@ func (tail *Tail) waitForChanges() error {
}
// sendLine sends the line(s) to Lines channel, splitting longer lines
// if necessary.
func (tail *Tail) sendLine(line []byte) {
// if necessary. Return false if rate limit is reached.
func (tail *Tail) sendLine(line []byte) bool {
now := time.Now()
nowUnix := now.Unix()
lines := []string{string(line)}
// Split longer lins
// Split longer lines
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
lines = partitionString(
string(line), tail.MaxLineSize)
}
for _, line := range lines {
tail.Lines <- &Line{line, now}
tail.Lines <- &Line{line, now, nil}
rate := tail.rateMon.Tick(nowUnix)
if tail.LimitRate > 0 && rate > tail.LimitRate {
log.Printf("Rate limit (%v < %v) reached file (%v); entering 1s cooloff period.\n",
tail.LimitRate,
rate,
tail.Filename)
return false
}
}
return true
}
// partitionString partitions the string into chunks of given size,

View File

@ -188,6 +188,27 @@ func TestReSeekPolling(_t *testing.T) {
_TestReSeek(_t, true)
}
func TestRateLimiting(_t *testing.T) {
t := NewTailTest("rate-limiting", _t)
t.CreateFile("test.txt", "hello\nworld\nagain\n")
config := Config{
Follow: true,
Location: -1,
LimitRate: 2}
tail := t.StartTail("test.txt", config)
// TODO: also verify that tail resumes after the cooloff period.
go t.VerifyTailOutput(
tail, []string{
"hello", "world", "again",
"Too much activity; entering a cool-off period"})
// Delete after a reasonable delay, to give tail sufficient time
// to read all lines.
<-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt")
tail.Stop()
}
// Test library
type TailTest struct {
@ -285,6 +306,6 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
}
line, ok := <-tail.Lines
if ok {
t.Fatalf("more content from tail: %s", line.Text)
t.Fatalf("more content from tail: %+v", line)
}
}