diff --git a/README.md b/README.md index 697d364..2a50dbe 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,8 @@ +<<<<<<< HEAD # diskfifo +======= +### 简介 + +一个简易的,数据持久化的FIFO队列 +>>>>>>> 0390b96 (init) diff --git a/fifoqueue.go b/fifoqueue.go new file mode 100644 index 0000000..f945916 --- /dev/null +++ b/fifoqueue.go @@ -0,0 +1,192 @@ +package diskfifo + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "time" +) + +type writer struct { + filename string + pos int64 +} + +func (w *writer) write(b []byte) (err error) { + f, err := os.OpenFile(w.filename, os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + buf := &bytes.Buffer{} + size := len(b) + if err = binary.Write(buf, binary.BigEndian, int32(size)); err != nil { + return + } + if _, err = buf.Write(b); err != nil { + return err + } + if _, err = f.Write(buf.Bytes()); err != nil { + return + } + + w.pos = w.pos + int64(size) + 4 + return +} + +type reader struct { + filename string + pos int64 +} + +func (r *reader) read() (b []byte, err error) { + f, err := os.OpenFile(r.filename, os.O_RDONLY, 0644) + if err != nil { + return + } + defer f.Close() + + var size int32 + if err = binary.Read(f, binary.BigEndian, &size); err != nil { + return + } + + b = make([]byte, size) + if _, err = io.ReadFull(f, b); err != nil { + return nil, err + } + + r.pos = r.pos + int64(size) + 4 + return +} + +type metadata struct { + WPos int64 + RPos int64 +} + +func getMetadata(dir string, name string) (*metadata, error) { + filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name)) + if _, err := os.Stat(filename); err != nil { + return &metadata{WPos: 0, RPos: 0}, nil + } + + f, err := os.OpenFile(filename, os.O_RDONLY, 0644) + if err != nil { + return nil, err + } + defer f.Close() + + var meta metadata + if err := json.NewDecoder(f).Decode(&meta); err != nil { + return nil, err + } + return &meta, nil +} + +func saveMetadata(dir string, name string, meta *metadata) (err error) { + filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name)) + f, err := os.OpenFile(filename, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return + } + defer f.Close() + + return json.NewEncoder(f).Encode(&meta) +} + +// 先入先出队列 +type FIFOQueue struct { + dir string + name string + + writer *writer + reader *reader + + lock *sync.Mutex +} + +// 新建一个FIFOQueue +func New(dir string, name string) (*FIFOQueue, error) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, err + } + filename := filepath.Join(dir, fmt.Sprintf("%s.dat", name)) + if _, err := os.Stat(filename); err != nil { + f, err := os.Create(filename) + if err != nil { + return nil, err + } + f.Close() + } + + meta, err := getMetadata(dir, name) + if err != nil { + return nil, err + } + + return &FIFOQueue{ + dir: dir, + name: name, + writer: &writer{filename: filename, pos: meta.WPos}, + reader: &reader{filename: filename, pos: meta.RPos}, + lock: &sync.Mutex{}, + }, nil +} + +// 队列里的数据已经被全部读取完 +var ErrNotFoundMessage = errors.New("not found message") + +// 从队列读取一条数据 +func (q *FIFOQueue) Pop() (b []byte, err error) { + q.lock.Lock() + defer q.lock.Unlock() + + if q.writer.pos == q.reader.pos { + return nil, ErrNotFoundMessage + } + + b, err = q.reader.read() + if err != nil { + return nil, err + } + + saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos}) + + return b, nil +} + +// 向队列里写入一条数据 +func (q *FIFOQueue) Push(b []byte) (err error) { + q.lock.Lock() + defer q.lock.Unlock() + + if err = q.writer.write(b); err != nil { + return + } + + saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos}) + return nil +} + +// 阻塞方式读取数据,如果队列里没有数据,则等待数据 +func (q *FIFOQueue) BPop() (b []byte, err error) { + for { + b, err = q.Pop() + if err != nil { + if err != ErrNotFoundMessage { + return nil, err + } + time.Sleep(3 * time.Second) + continue + } + return b, nil + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ff684a0 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.lovezsh.com/go-kit/diskfifo + +go 1.21.3