Merge branch 'vsco-fix-log-reopen-with-inotify'

This commit is contained in:
Nino Kodabande 2017-07-07 12:43:10 -07:00
commit a927b6857f
2 changed files with 79 additions and 50 deletions

View File

@ -343,13 +343,21 @@ func reOpen(t *testing.T, poll bool) {
"test.txt", "test.txt",
Config{Follow: true, ReOpen: true, Poll: poll}) Config{Follow: true, ReOpen: true, Poll: poll})
content := []string{"hello", "world", "more", "data", "endofworld"} content := []string{"hello", "world", "more", "data", "endofworld"}
go tailTest.ReadLines(tail, content) go tailTest.VerifyTailOutput(tail, content, false)
if poll {
// deletion must trigger reopen // deletion must trigger reopen
<-time.After(delay) <-time.After(delay)
tailTest.RemoveFile("test.txt") tailTest.RemoveFile("test.txt")
<-time.After(delay) <-time.After(delay)
tailTest.CreateFile("test.txt", "more\ndata\n") tailTest.CreateFile("test.txt", "more\ndata\n")
} else {
// In inotify mode, fsnotify is currently unable to deliver notifications
// about deletion of open files, so we are not testing file deletion.
// (see https://github.com/fsnotify/fsnotify/issues/194 for details).
<-time.After(delay)
tailTest.AppendToFile("test.txt", "more\ndata\n")
}
// rename must trigger reopen // rename must trigger reopen
<-time.After(delay) <-time.After(delay)
@ -366,7 +374,34 @@ func reOpen(t *testing.T, poll bool) {
// Do not bother with stopping as it could kill the tomb during // Do not bother with stopping as it could kill the tomb during
// the reading of data written above. Timings can vary based on // the reading of data written above. Timings can vary based on
// test environment. // test environment.
tail.Cleanup() tailTest.Cleanup(tail, false)
}
func TestInotify_WaitForCreateThenMove(t *testing.T) {
tailTest := NewTailTest("wait-for-create-then-reopen", t)
os.Remove(tailTest.path + "/test.txt") // Make sure the file does NOT exist.
tail := tailTest.StartTail(
"test.txt",
Config{Follow: true, ReOpen: true, Poll: false})
content := []string{"hello", "world", "endofworld"}
go tailTest.VerifyTailOutput(tail, content, false)
time.Sleep(50 * time.Millisecond)
tailTest.CreateFile("test.txt", "hello\nworld\n")
time.Sleep(50 * time.Millisecond)
tailTest.RenameFile("test.txt", "test.txt.rotated")
time.Sleep(50 * time.Millisecond)
tailTest.CreateFile("test.txt", "endofworld\n")
time.Sleep(50 * time.Millisecond)
tailTest.RemoveFile("test.txt.rotated")
tailTest.RemoveFile("test.txt")
// Do not bother with stopping as it could kill the tomb during
// the reading of data written above. Timings can vary based on
// test environment.
tailTest.Cleanup(tail, false)
} }
func reSeek(t *testing.T, poll bool) { func reSeek(t *testing.T, poll bool) {
@ -426,6 +461,13 @@ func (t TailTest) CreateFile(name string, contents string) {
} }
} }
func (t TailTest) AppendToFile(name string, contents string) {
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0600|os.ModeAppend)
if err != nil {
t.Fatal(err)
}
}
func (t TailTest) RemoveFile(name string) { func (t TailTest) RemoveFile(name string) {
err := os.Remove(t.path + "/" + name) err := os.Remove(t.path + "/" + name)
if err != nil { if err != nil {

View File

@ -108,28 +108,10 @@ func remove(winfo *watchInfo) error {
delete(shared.done, winfo.fname) delete(shared.done, winfo.fname)
close(done) close(done)
} }
fname := winfo.fname
if winfo.isCreate() {
// Watch for new files to be created in the parent directory.
fname = filepath.Dir(fname)
}
shared.watchNums[fname]--
watchNum := shared.watchNums[fname]
if watchNum == 0 {
delete(shared.watchNums, fname)
}
shared.mux.Unlock() shared.mux.Unlock()
// If we were the last ones to watch this file, unsubscribe from inotify.
// This needs to happen after releasing the lock because fsnotify waits
// synchronously for the kernel to acknowledge the removal of the watch
// for this file, which causes us to deadlock if we still held the lock.
if watchNum == 0 {
return shared.watcher.Remove(fname)
}
shared.remove <- winfo shared.remove <- winfo
return nil return <-shared.error
} }
// Events returns a channel to which FileEvents corresponding to the input filename // Events returns a channel to which FileEvents corresponding to the input filename
@ -155,6 +137,8 @@ func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
if shared.chans[winfo.fname] == nil { if shared.chans[winfo.fname] == nil {
shared.chans[winfo.fname] = make(chan fsnotify.Event) shared.chans[winfo.fname] = make(chan fsnotify.Event)
}
if shared.done[winfo.fname] == nil {
shared.done[winfo.fname] = make(chan bool) shared.done[winfo.fname] = make(chan bool)
} }
@ -164,47 +148,50 @@ func (shared *InotifyTracker) addWatch(winfo *watchInfo) error {
fname = filepath.Dir(fname) fname = filepath.Dir(fname)
} }
var err error
// already in inotify watch // already in inotify watch
if shared.watchNums[fname] > 0 { if shared.watchNums[fname] == 0 {
shared.watchNums[fname]++ err = shared.watcher.Add(fname)
if winfo.isCreate() {
shared.watchNums[winfo.fname]++
} }
return nil
}
err := shared.watcher.Add(fname)
if err == nil { if err == nil {
shared.watchNums[fname]++ shared.watchNums[fname]++
if winfo.isCreate() {
shared.watchNums[winfo.fname]++
}
} }
return err return err
} }
// removeWatch calls fsnotify.RemoveWatch for the input filename and closes the // removeWatch calls fsnotify.RemoveWatch for the input filename and closes the
// corresponding events channel. // corresponding events channel.
func (shared *InotifyTracker) removeWatch(winfo *watchInfo) { func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error {
shared.mux.Lock() shared.mux.Lock()
defer shared.mux.Unlock()
ch := shared.chans[winfo.fname] ch := shared.chans[winfo.fname]
if ch == nil { if ch != nil {
return
}
delete(shared.chans, winfo.fname) delete(shared.chans, winfo.fname)
close(ch) close(ch)
if !winfo.isCreate() {
return
} }
shared.watchNums[winfo.fname]-- fname := winfo.fname
if shared.watchNums[winfo.fname] == 0 { if winfo.isCreate() {
delete(shared.watchNums, winfo.fname) // Watch for new files to be created in the parent directory.
fname = filepath.Dir(fname)
} }
shared.watchNums[fname]--
watchNum := shared.watchNums[fname]
if watchNum == 0 {
delete(shared.watchNums, fname)
}
shared.mux.Unlock()
var err error
// If we were the last ones to watch this file, unsubscribe from inotify.
// This needs to happen after releasing the lock because fsnotify waits
// synchronously for the kernel to acknowledge the removal of the watch
// for this file, which causes us to deadlock if we still held the lock.
if watchNum == 0 {
err = shared.watcher.Remove(fname)
}
return err
} }
// sendEvent sends the input event to the appropriate Tail. // sendEvent sends the input event to the appropriate Tail.
@ -239,7 +226,7 @@ func (shared *InotifyTracker) run() {
shared.error <- shared.addWatch(winfo) shared.error <- shared.addWatch(winfo)
case winfo := <-shared.remove: case winfo := <-shared.remove:
shared.removeWatch(winfo) shared.error <- shared.removeWatch(winfo)
case event, open := <-shared.watcher.Events: case event, open := <-shared.watcher.Events:
if !open { if !open {