This commit is contained in:
parent
b61c451ca7
commit
5b2dcdfce2
|
@ -1,2 +1,8 @@
|
|||
<<<<<<< HEAD
|
||||
# diskfifo
|
||||
|
||||
=======
|
||||
### 简介
|
||||
|
||||
一个简易的,数据持久化的FIFO队列
|
||||
>>>>>>> 0390b96 (init)
|
||||
|
|
|
@ -0,0 +1,192 @@
|
|||
package diskfifo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type writer struct {
|
||||
filename string
|
||||
pos int64
|
||||
}
|
||||
|
||||
func (w *writer) write(b []byte) (err error) {
|
||||
f, err := os.OpenFile(w.filename, os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
size := len(b)
|
||||
if err = binary.Write(buf, binary.BigEndian, int32(size)); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = buf.Write(b); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = f.Write(buf.Bytes()); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
w.pos = w.pos + int64(size) + 4
|
||||
return
|
||||
}
|
||||
|
||||
type reader struct {
|
||||
filename string
|
||||
pos int64
|
||||
}
|
||||
|
||||
func (r *reader) read() (b []byte, err error) {
|
||||
f, err := os.OpenFile(r.filename, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var size int32
|
||||
if err = binary.Read(f, binary.BigEndian, &size); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
b = make([]byte, size)
|
||||
if _, err = io.ReadFull(f, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.pos = r.pos + int64(size) + 4
|
||||
return
|
||||
}
|
||||
|
||||
type metadata struct {
|
||||
WPos int64
|
||||
RPos int64
|
||||
}
|
||||
|
||||
func getMetadata(dir string, name string) (*metadata, error) {
|
||||
filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name))
|
||||
if _, err := os.Stat(filename); err != nil {
|
||||
return &metadata{WPos: 0, RPos: 0}, nil
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(filename, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var meta metadata
|
||||
if err := json.NewDecoder(f).Decode(&meta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &meta, nil
|
||||
}
|
||||
|
||||
func saveMetadata(dir string, name string, meta *metadata) (err error) {
|
||||
filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name))
|
||||
f, err := os.OpenFile(filename, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return json.NewEncoder(f).Encode(&meta)
|
||||
}
|
||||
|
||||
// 先入先出队列
|
||||
type FIFOQueue struct {
|
||||
dir string
|
||||
name string
|
||||
|
||||
writer *writer
|
||||
reader *reader
|
||||
|
||||
lock *sync.Mutex
|
||||
}
|
||||
|
||||
// 新建一个FIFOQueue
|
||||
func New(dir string, name string) (*FIFOQueue, error) {
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filename := filepath.Join(dir, fmt.Sprintf("%s.dat", name))
|
||||
if _, err := os.Stat(filename); err != nil {
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
meta, err := getMetadata(dir, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &FIFOQueue{
|
||||
dir: dir,
|
||||
name: name,
|
||||
writer: &writer{filename: filename, pos: meta.WPos},
|
||||
reader: &reader{filename: filename, pos: meta.RPos},
|
||||
lock: &sync.Mutex{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// 队列里的数据已经被全部读取完
|
||||
var ErrNotFoundMessage = errors.New("not found message")
|
||||
|
||||
// 从队列读取一条数据
|
||||
func (q *FIFOQueue) Pop() (b []byte, err error) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
if q.writer.pos == q.reader.pos {
|
||||
return nil, ErrNotFoundMessage
|
||||
}
|
||||
|
||||
b, err = q.reader.read()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos})
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// 向队列里写入一条数据
|
||||
func (q *FIFOQueue) Push(b []byte) (err error) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
if err = q.writer.write(b); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos})
|
||||
return nil
|
||||
}
|
||||
|
||||
// 阻塞方式读取数据,如果队列里没有数据,则等待数据
|
||||
func (q *FIFOQueue) BPop() (b []byte, err error) {
|
||||
for {
|
||||
b, err = q.Pop()
|
||||
if err != nil {
|
||||
if err != ErrNotFoundMessage {
|
||||
return nil, err
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
continue
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue