diskfifo/fifoqueue.go

193 lines
3.6 KiB
Go
Raw Permalink Normal View History

2024-03-21 19:49:34 +08:00
package diskfifo
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
)
// writer stores length-prefixed records in the queue's data file.
type writer struct {
	filename string // path of the data file; it must already exist
	pos      int64  // byte offset at which the next record will be written
}

// write stores b as a single record at offset w.pos. A record is a 4-byte
// big-endian length followed by the payload.
//
// w.pos is advanced past the record only after the bytes have been written
// and the file has been closed without error, so a failed call leaves the
// offset untouched.
func (w *writer) write(b []byte) (err error) {
	// The length prefix is an int32; a larger payload would wrap around and
	// corrupt the framing for every later record.
	if len(b) > 1<<31-1 {
		return fmt.Errorf("record of %d bytes exceeds the 4-byte length prefix", len(b))
	}

	f, err := os.OpenFile(w.filename, os.O_WRONLY, 0644)
	if err != nil {
		return err
	}

	// Assemble the whole record first so it reaches the file in one write.
	var buf bytes.Buffer
	buf.Grow(4 + len(b))
	if err = binary.Write(&buf, binary.BigEndian, int32(len(b))); err == nil {
		_, err = buf.Write(b)
	}
	if err == nil {
		// The file is reopened on every call, so its own offset starts at 0;
		// write at the tracked position instead of overwriting the head.
		_, err = f.WriteAt(buf.Bytes(), w.pos)
	}
	if err != nil {
		_ = f.Close() // the earlier failure is the one worth reporting
		return err
	}
	if err = f.Close(); err != nil {
		return err
	}

	w.pos += int64(buf.Len())
	return nil
}
// reader consumes length-prefixed records from the queue's data file.
type reader struct {
	filename string // path of the data file
	pos      int64  // byte offset of the next unread record
}

// read returns the payload of the record that starts at offset r.pos and
// advances r.pos past it. A record is a 4-byte big-endian length followed by
// the payload.
//
// On any error (including io.EOF when no record starts at r.pos) the returned
// slice is nil and r.pos is left unchanged.
func (r *reader) read() (b []byte, err error) {
	f, err := os.Open(r.filename)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// The file is reopened on every call, so its offset starts at 0; move to
	// the tracked position or the first record would be returned every time.
	if _, err = f.Seek(r.pos, io.SeekStart); err != nil {
		return nil, err
	}

	var size int32
	if err = binary.Read(f, binary.BigEndian, &size); err != nil {
		return nil, err
	}
	// A negative length can only come from a damaged file; make would panic.
	if size < 0 {
		return nil, fmt.Errorf("corrupt record at offset %d: negative length %d", r.pos, size)
	}

	b = make([]byte, size)
	if _, err = io.ReadFull(f, b); err != nil {
		return nil, err
	}
	r.pos += int64(size) + 4
	return b, nil
}
// metadata is the queue state that is persisted as JSON next to the data
// file so the offsets survive a restart.
type metadata struct {
	// WPos is the byte offset where the next record is written; RPos is the
	// byte offset of the next record to read.
	WPos, RPos int64
}
// getMetadata loads the saved offsets for the queue called name from the
// file "<name>.meta" inside dir.
//
// A missing file means nothing has been saved yet, so zero offsets are
// returned. Every other failure (permission problems, I/O errors, malformed
// JSON) is reported to the caller rather than being mistaken for an empty
// queue, which would reset both offsets to 0.
func getMetadata(dir string, name string) (*metadata, error) {
	filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name))

	// Open directly instead of Stat-then-open: one syscall, and no window in
	// which the file can appear or vanish between the two.
	f, err := os.Open(filename)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return &metadata{}, nil
		}
		return nil, err
	}
	defer f.Close()

	var meta metadata
	if err := json.NewDecoder(f).Decode(&meta); err != nil {
		return nil, fmt.Errorf("decoding %s: %w", filename, err)
	}
	return &meta, nil
}
// saveMetadata writes meta as JSON to the file "<name>.meta" inside dir,
// creating the file if needed and replacing any previous contents.
//
// The returned error covers opening, encoding and closing the file; a failed
// Close is reported because it can mean the data never reached the disk.
func saveMetadata(dir string, name string, meta *metadata) (err error) {
	filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name))
	f, err := os.OpenFile(filename, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the first error; only surface Close's when nothing else failed.
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()
	return json.NewEncoder(f).Encode(meta)
}
// FIFOQueue is a first-in, first-out queue whose messages and offsets are
// kept in files on disk.
type FIFOQueue struct {
	// dir is the directory holding the queue's files; name is the base name
	// those files share.
	dir, name string

	// writer and reader carry the current write and read offsets into the
	// data file.
	writer *writer
	reader *reader

	// lock is held for the duration of Pop and Push.
	lock *sync.Mutex
}
// New returns the queue called name that lives in dir.
//
// The directory and an empty data file ("<name>.dat") are created when they
// do not exist yet; an existing data file is never truncated. The read and
// write offsets are restored from the queue's saved metadata, or start at 0
// when none has been saved.
func New(dir string, name string) (*FIFOQueue, error) {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, err
	}

	filename := filepath.Join(dir, fmt.Sprintf("%s.dat", name))
	// O_EXCL makes "create only if missing" a single atomic step. A separate
	// Stat followed by os.Create could truncate a file that another process
	// created in between, or one whose Stat failed for an unrelated reason.
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
	switch {
	case err == nil:
		if err := f.Close(); err != nil {
			return nil, err
		}
	case errors.Is(err, os.ErrExist):
		// The data file is already there; keep its contents.
	default:
		return nil, err
	}

	meta, err := getMetadata(dir, name)
	if err != nil {
		return nil, err
	}
	return &FIFOQueue{
		dir:    dir,
		name:   name,
		writer: &writer{filename: filename, pos: meta.WPos},
		reader: &reader{filename: filename, pos: meta.RPos},
		lock:   &sync.Mutex{},
	}, nil
}
// ErrNotFoundMessage is returned by Pop when the read offset equals the
// write offset, i.e. every message in the queue has already been read.
var ErrNotFoundMessage = errors.New("not found message")
// Pop reads the next message from the queue without blocking.
//
// It returns ErrNotFoundMessage when the read offset has caught up with the
// write offset. Any other error comes from reading the data file or from
// saving the updated offsets; in both cases the read offset is left where it
// was, so the same message is returned by the next successful call.
func (q *FIFOQueue) Pop() (b []byte, err error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.writer.pos == q.reader.pos {
		return nil, ErrNotFoundMessage
	}

	prev := q.reader.pos
	b, err = q.reader.read()
	if err != nil {
		return nil, err
	}
	if err = saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos}); err != nil {
		// The caller gets an error instead of the message, so undo the
		// advance; otherwise the message would be silently dropped.
		q.reader.pos = prev
		return nil, err
	}
	return b, nil
}
// Push appends b to the queue as one message.
//
// An error is returned when writing the data file or saving the updated
// offsets fails. In both cases the write offset is left where it was, so a
// failed Push does not make the message visible to Pop and the caller may
// simply retry.
func (q *FIFOQueue) Push(b []byte) (err error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	prev := q.writer.pos
	if err = q.writer.write(b); err != nil {
		return err
	}
	if err = saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos}); err != nil {
		// The offsets could not be persisted; report it and undo the advance
		// rather than claiming success for a message that may not survive a
		// restart.
		q.writer.pos = prev
		return err
	}
	return nil
}
// BPop reads the next message from the queue, blocking while the queue is
// empty.
//
// It calls Pop and, for as long as Pop reports ErrNotFoundMessage, sleeps for
// three seconds and tries again. Any other error from Pop is returned
// immediately.
func (q *FIFOQueue) BPop() (b []byte, err error) {
	for {
		b, err = q.Pop()
		if err == nil {
			return b, nil
		}
		// errors.Is keeps the retry working even if the sentinel is ever
		// returned wrapped with extra context.
		if !errors.Is(err, ErrNotFoundMessage) {
			return nil, err
		}
		time.Sleep(3 * time.Second)
	}
}