Merge pull request #30 from ActiveState/odd_line_splitting

Fix odd line splitting
This commit is contained in:
Sridhar Ratnakumar 2014-05-20 17:49:00 -07:00
commit c5d5673bde
3 changed files with 49 additions and 45 deletions

View File

@ -12,7 +12,9 @@ import (
func args2config() (tail.Config, int64) { func args2config() (tail.Config, int64) {
config := tail.Config{Follow: true} config := tail.Config{Follow: true}
n := int64(0) n := int64(0)
maxlinesize := int(0)
flag.Int64Var(&n, "n", 0, "tail from the last Nth location") flag.Int64Var(&n, "n", 0, "tail from the last Nth location")
flag.IntVar(&maxlinesize, "max", 0, "max line size")
flag.BoolVar(&config.Follow, "f", false, "wait for additional data to be appended to the file") flag.BoolVar(&config.Follow, "f", false, "wait for additional data to be appended to the file")
flag.BoolVar(&config.ReOpen, "F", false, "follow, and track file rename/rotation") flag.BoolVar(&config.ReOpen, "F", false, "follow, and track file rename/rotation")
flag.BoolVar(&config.Poll, "p", false, "use polling, instead of inotify") flag.BoolVar(&config.Poll, "p", false, "use polling, instead of inotify")
@ -20,6 +22,7 @@ func args2config() (tail.Config, int64) {
if config.ReOpen { if config.ReOpen {
config.Follow = true config.Follow = true
} }
config.MaxLineSize = maxlinesize
return config, n return config, n
} }

83
tail.go
View File

@ -13,6 +13,7 @@ import (
"launchpad.net/tomb" "launchpad.net/tomb"
"log" "log"
"os" "os"
"strings"
"time" "time"
) )
@ -167,18 +168,18 @@ func (tail *Tail) reopen() error {
return nil return nil
} }
func (tail *Tail) readLine() ([]byte, error) { func (tail *Tail) readLine() (string, error) {
line, isPrefix, err := tail.reader.ReadLine() line, err := tail.reader.ReadString('\n')
if !isPrefix || tail.MaxLineSize > 0 { if err != nil {
// Note ReadString "returns the data read before the error" in
// case of an error, including EOF, so we return it as is. The
// caller is expected to process it if err is EOF.
return line, err return line, err
} }
buf := append([]byte(nil), line...) line = strings.TrimRight(line, "\n")
for isPrefix && err == nil {
line, isPrefix, err = tail.reader.ReadLine() return line, err
buf = append(buf, line...)
}
return buf, err
} }
func (tail *Tail) tailFileSync() { func (tail *Tail) tailFileSync() {
@ -206,36 +207,34 @@ func (tail *Tail) tailFileSync() {
} }
} }
tail.reader = tail.newReader() tail.openReader()
// Read line by line. // Read line by line.
for { for {
line, err := tail.readLine() line, err := tail.readLine()
switch err { // Process `line` even if err is EOF.
case nil: if err == nil || (err == io.EOF && line != "") {
if line != nil { cooloff := !tail.sendLine(line)
cooloff := !tail.sendLine(line) if cooloff {
if cooloff { // Wait a second before seeking till the end of
// Wait a second before seeking till the end of // file when rate limit is reached.
// file when rate limit is reached. msg := fmt.Sprintf(
msg := fmt.Sprintf( "Too much log activity; waiting a second " +
"Too much log activity; waiting a second " + "before resuming tailing")
"before resuming tailing") tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)}
tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)} select {
select { case <-time.After(time.Second):
case <-time.After(time.Second): case <-tail.Dying():
case <-tail.Dying(): return
return }
} err = tail.seekEnd()
err = tail.seekEnd() if err != nil {
if err != nil { tail.Kill(err)
tail.Kill(err) return
return
}
} }
} }
case io.EOF: } else if err == io.EOF {
if !tail.Follow { if !tail.Follow {
return return
} }
@ -249,7 +248,8 @@ func (tail *Tail) tailFileSync() {
} }
return return
} }
default: // non-EOF error } else {
// non-EOF error
tail.Killf("Error reading %s: %s", tail.Filename, err) tail.Killf("Error reading %s: %s", tail.Filename, err)
return return
} }
@ -286,7 +286,7 @@ func (tail *Tail) waitForChanges() error {
return err return err
} }
tail.Logger.Printf("Successfully reopened %s", tail.Filename) tail.Logger.Printf("Successfully reopened %s", tail.Filename)
tail.reader = tail.newReader() tail.openReader()
return nil return nil
} else { } else {
tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename) tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
@ -299,7 +299,7 @@ func (tail *Tail) waitForChanges() error {
return err return err
} }
tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename) tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
tail.reader = tail.newReader() tail.openReader()
return nil return nil
case <-tail.Dying(): case <-tail.Dying():
return ErrStop return ErrStop
@ -307,12 +307,12 @@ func (tail *Tail) waitForChanges() error {
panic("unreachable") panic("unreachable")
} }
func (tail *Tail) newReader() *bufio.Reader { func (tail *Tail) openReader() {
if tail.MaxLineSize > 0 { if tail.MaxLineSize > 0 {
// add 2 to account for newline characters // add 2 to account for newline characters
return bufio.NewReaderSize(tail.file, tail.MaxLineSize+2) tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
} else { } else {
return bufio.NewReader(tail.file) tail.reader = bufio.NewReader(tail.file)
} }
} }
@ -328,14 +328,13 @@ func (tail *Tail) seekEnd() error {
// sendLine sends the line(s) to Lines channel, splitting longer lines // sendLine sends the line(s) to Lines channel, splitting longer lines
// if necessary. Return false if rate limit is reached. // if necessary. Return false if rate limit is reached.
func (tail *Tail) sendLine(line []byte) bool { func (tail *Tail) sendLine(line string) bool {
now := time.Now() now := time.Now()
lines := []string{string(line)} lines := []string{line}
// Split longer lines // Split longer lines
if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize { if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
lines = util.PartitionString( lines = util.PartitionString(line, tail.MaxLineSize)
string(line), tail.MaxLineSize)
} }
for _, line := range lines { for _, line := range lines {

View File

@ -85,7 +85,7 @@ func TestOver4096ByteLine(_t *testing.T) {
func TestOver4096ByteLineWithSetMaxLineSize(_t *testing.T) { func TestOver4096ByteLineWithSetMaxLineSize(_t *testing.T) {
t := NewTailTest("Over4096ByteLineMaxLineSize", _t) t := NewTailTest("Over4096ByteLineMaxLineSize", _t)
testString := strings.Repeat("a", 4097) testString := strings.Repeat("a", 4097)
t.CreateFile("test.txt", "test\r\n"+testString+"\r\nhello\r\nworld\r\n") t.CreateFile("test.txt", "test\n"+testString+"\nhello\nworld\n")
tail := t.StartTail("test.txt", Config{Follow: true, Location: nil, MaxLineSize: 4097}) tail := t.StartTail("test.txt", Config{Follow: true, Location: nil, MaxLineSize: 4097})
go t.VerifyTailOutput(tail, []string{"test", testString, "hello", "world"}) go t.VerifyTailOutput(tail, []string{"test", testString, "hello", "world"})
@ -93,7 +93,7 @@ func TestOver4096ByteLineWithSetMaxLineSize(_t *testing.T) {
// to read all lines. // to read all lines.
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
t.RemoveFile("test.txt") t.RemoveFile("test.txt")
tail.Stop() // tail.Stop()
Cleanup() Cleanup()
} }
@ -415,7 +415,9 @@ func (t TailTest) VerifyTailOutput(tail *Tail, lines []string) {
// Note: not checking .Err as the `lines` argument is designed // Note: not checking .Err as the `lines` argument is designed
// to match error strings as well. // to match error strings as well.
if tailedLine.Text != line { if tailedLine.Text != line {
t.Fatalf("unexpected line/err from tail: expecting ```%s```, but got ```%s```", t.Fatalf(
"unexpected line/err from tail: "+
"expecting <<%s>>>, but got <<<%s>>>",
line, tailedLine.Text) line, tailedLine.Text)
} }
} }