193 lines
3.6 KiB
Go
193 lines
3.6 KiB
Go
package diskfifo
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// writer stores length-prefixed records in the queue's data file.
type writer struct {
	filename string // path of the data file
	pos      int64  // byte offset at which the next record is written
}

// write stores b as one record at offset w.pos: a 4-byte big-endian length
// followed by the payload bytes.
//
// On success w.pos is advanced past the record. On any failure w.pos is left
// unchanged and the error is returned. Payloads larger than 1<<31-1 bytes are
// rejected because reader.read decodes the length as a signed 32-bit integer.
func (w *writer) write(b []byte) (err error) {
	const headerSize = 4
	const maxPayload = 1<<31 - 1
	if len(b) > maxPayload {
		return fmt.Errorf("message of %d bytes exceeds the %d byte record limit", len(b), maxPayload)
	}

	f, err := os.OpenFile(w.filename, os.O_WRONLY, 0644)
	if err != nil {
		return err
	}

	var header [headerSize]byte
	binary.BigEndian.PutUint32(header[:], uint32(len(b)))

	// Assemble header and payload so the record goes out in a single write.
	// bytes.Buffer.Write always returns a nil error.
	var record bytes.Buffer
	record.Grow(headerSize + len(b))
	record.Write(header[:])
	record.Write(b)

	// The file is opened without O_APPEND, so a plain Write would start at
	// offset 0 and clobber earlier records; write at w.pos explicitly.
	if _, err = f.WriteAt(record.Bytes(), w.pos); err != nil {
		_ = f.Close() // the write error is the one worth reporting
		return err
	}
	// A failed Close can mean the data never reached the file.
	if err = f.Close(); err != nil {
		return err
	}

	w.pos += int64(record.Len())
	return nil
}
|
|
|
|
// reader consumes length-prefixed records from the queue's data file.
type reader struct {
	filename string // path of the data file
	pos      int64  // byte offset of the next unread record
}

// read returns the payload of the record that starts at offset r.pos.
//
// A record is a 4-byte big-endian signed length followed by that many payload
// bytes. On success r.pos is advanced past the record. On any failure r.pos is
// left unchanged and a nil slice is returned with the error.
func (r *reader) read() (b []byte, err error) {
	f, err := os.OpenFile(r.filename, os.O_RDONLY, 0644)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// A freshly opened file starts at offset 0; without this seek every call
	// would return the first record again.
	if _, err = f.Seek(r.pos, io.SeekStart); err != nil {
		return nil, err
	}

	var size int32
	if err = binary.Read(f, binary.BigEndian, &size); err != nil {
		return nil, err
	}
	// make panics on a negative length, so report corrupt data as an error.
	if size < 0 {
		return nil, fmt.Errorf("corrupt record at offset %d: negative length %d", r.pos, size)
	}

	b = make([]byte, size)
	if _, err = io.ReadFull(f, b); err != nil {
		return nil, err
	}

	r.pos += int64(size) + 4
	return b, nil
}
|
|
|
|
// metadata is the queue state persisted as JSON in the ".meta" file.
// WPos seeds the writer's position and RPos the reader's position when a
// queue is opened with New. The fields are exported so encoding/json can
// see them.
type metadata struct {
	WPos, RPos int64
}
|
|
|
|
func getMetadata(dir string, name string) (*metadata, error) {
|
|
filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name))
|
|
if _, err := os.Stat(filename); err != nil {
|
|
return &metadata{WPos: 0, RPos: 0}, nil
|
|
}
|
|
|
|
f, err := os.OpenFile(filename, os.O_RDONLY, 0644)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer f.Close()
|
|
|
|
var meta metadata
|
|
if err := json.NewDecoder(f).Decode(&meta); err != nil {
|
|
return nil, err
|
|
}
|
|
return &meta, nil
|
|
}
|
|
|
|
func saveMetadata(dir string, name string, meta *metadata) (err error) {
|
|
filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name))
|
|
f, err := os.OpenFile(filename, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
return json.NewEncoder(f).Encode(&meta)
|
|
}
|
|
|
|
// 先入先出队列
|
|
type FIFOQueue struct {
|
|
dir string
|
|
name string
|
|
|
|
writer *writer
|
|
reader *reader
|
|
|
|
lock *sync.Mutex
|
|
}
|
|
|
|
// 新建一个FIFOQueue
|
|
func New(dir string, name string) (*FIFOQueue, error) {
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return nil, err
|
|
}
|
|
filename := filepath.Join(dir, fmt.Sprintf("%s.dat", name))
|
|
if _, err := os.Stat(filename); err != nil {
|
|
f, err := os.Create(filename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f.Close()
|
|
}
|
|
|
|
meta, err := getMetadata(dir, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &FIFOQueue{
|
|
dir: dir,
|
|
name: name,
|
|
writer: &writer{filename: filename, pos: meta.WPos},
|
|
reader: &reader{filename: filename, pos: meta.RPos},
|
|
lock: &sync.Mutex{},
|
|
}, nil
|
|
}
|
|
|
|
// ErrNotFoundMessage is returned by Pop when every message in the queue has
// already been read.
var ErrNotFoundMessage = errors.New("not found message")
|
|
|
|
// 从队列读取一条数据
|
|
func (q *FIFOQueue) Pop() (b []byte, err error) {
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
|
|
if q.writer.pos == q.reader.pos {
|
|
return nil, ErrNotFoundMessage
|
|
}
|
|
|
|
b, err = q.reader.read()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos})
|
|
|
|
return b, nil
|
|
}
|
|
|
|
// 向队列里写入一条数据
|
|
func (q *FIFOQueue) Push(b []byte) (err error) {
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
|
|
if err = q.writer.write(b); err != nil {
|
|
return
|
|
}
|
|
|
|
saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos})
|
|
return nil
|
|
}
|
|
|
|
// 阻塞方式读取数据,如果队列里没有数据,则等待数据
|
|
func (q *FIFOQueue) BPop() (b []byte, err error) {
|
|
for {
|
|
b, err = q.Pop()
|
|
if err != nil {
|
|
if err != ErrNotFoundMessage {
|
|
return nil, err
|
|
}
|
|
time.Sleep(3 * time.Second)
|
|
continue
|
|
}
|
|
return b, nil
|
|
}
|
|
}
|