fix data race
This commit is contained in:
parent
2bc904e34f
commit
274567a526
12
tail.go
12
tail.go
|
@ -10,6 +10,7 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hpcloud/tail/ratelimiter"
|
"github.com/hpcloud/tail/ratelimiter"
|
||||||
|
@ -82,6 +83,8 @@ type Tail struct {
|
||||||
changes *watch.FileChanges
|
changes *watch.FileChanges
|
||||||
|
|
||||||
tomb.Tomb // provides: Done, Kill, Dying
|
tomb.Tomb // provides: Done, Kill, Dying
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -140,7 +143,9 @@ func (tail *Tail) Tell() (offset int64, err error) {
|
||||||
}
|
}
|
||||||
offset, err = tail.file.Seek(0, os.SEEK_CUR)
|
offset, err = tail.file.Seek(0, os.SEEK_CUR)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
tail.lk.Lock()
|
||||||
offset -= int64(tail.reader.Buffered())
|
offset -= int64(tail.reader.Buffered())
|
||||||
|
tail.lk.Unlock()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -187,7 +192,9 @@ func (tail *Tail) reopen() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tail *Tail) readLine() (string, error) {
|
func (tail *Tail) readLine() (string, error) {
|
||||||
|
tail.lk.Lock()
|
||||||
line, err := tail.reader.ReadString('\n')
|
line, err := tail.reader.ReadString('\n')
|
||||||
|
tail.lk.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Note ReadString "returns the data read before the error" in
|
// Note ReadString "returns the data read before the error" in
|
||||||
// case of an error, including EOF, so we return it as is. The
|
// case of an error, including EOF, so we return it as is. The
|
||||||
|
@ -315,7 +322,10 @@ func (tail *Tail) waitForChanges() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
tail.changes = tail.watcher.ChangeEvents(&tail.Tomb, pos)
|
tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -55,14 +55,13 @@ func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
|
||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges {
|
func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) {
|
||||||
changes := NewFileChanges()
|
|
||||||
|
|
||||||
err := Watch(fw.Filename)
|
err := Watch(fw.Filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
go changes.NotifyDeleted()
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
changes := NewFileChanges()
|
||||||
fw.Size = pos
|
fw.Size = pos
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -116,5 +115,5 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return changes
|
return changes, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,11 +3,12 @@
|
||||||
package watch
|
package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hpcloud/tail/util"
|
|
||||||
"gopkg.in/tomb.v1"
|
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hpcloud/tail/util"
|
||||||
|
"gopkg.in/tomb.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PollingFileWatcher polls the file for changes.
|
// PollingFileWatcher polls the file for changes.
|
||||||
|
@ -40,7 +41,12 @@ func (fw *PollingFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
|
||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges {
|
func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) {
|
||||||
|
origFi, err := os.Stat(fw.Filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
changes := NewFileChanges()
|
changes := NewFileChanges()
|
||||||
var prevModTime time.Time
|
var prevModTime time.Time
|
||||||
|
|
||||||
|
@ -48,11 +54,6 @@ func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges
|
||||||
// the fatal (below) with tomb's Kill.
|
// the fatal (below) with tomb's Kill.
|
||||||
|
|
||||||
fw.Size = pos
|
fw.Size = pos
|
||||||
origFi, err := os.Stat(fw.Filename)
|
|
||||||
if err != nil {
|
|
||||||
changes.NotifyDeleted()
|
|
||||||
return changes
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer changes.Close()
|
defer changes.Close()
|
||||||
|
@ -110,7 +111,7 @@ func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) *FileChanges
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return changes
|
return changes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
@ -15,5 +15,5 @@ type FileWatcher interface {
|
||||||
// or truncation event.
|
// or truncation event.
|
||||||
// In order to properly report truncations, ChangeEvents requires
|
// In order to properly report truncations, ChangeEvents requires
|
||||||
// the caller to pass their current offset in the file.
|
// the caller to pass their current offset in the file.
|
||||||
ChangeEvents(*tomb.Tomb, int64) *FileChanges
|
ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue