package diskfifo import ( "bytes" "encoding/binary" "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "sync" "time" ) type writer struct { filename string pos int64 } func (w *writer) write(b []byte) (err error) { f, err := os.OpenFile(w.filename, os.O_WRONLY, 0644) if err != nil { return err } defer f.Close() buf := &bytes.Buffer{} size := len(b) if err = binary.Write(buf, binary.BigEndian, int32(size)); err != nil { return } if _, err = buf.Write(b); err != nil { return err } if _, err = f.Write(buf.Bytes()); err != nil { return } w.pos = w.pos + int64(size) + 4 return } type reader struct { filename string pos int64 } func (r *reader) read() (b []byte, err error) { f, err := os.OpenFile(r.filename, os.O_RDONLY, 0644) if err != nil { return } defer f.Close() var size int32 if err = binary.Read(f, binary.BigEndian, &size); err != nil { return } b = make([]byte, size) if _, err = io.ReadFull(f, b); err != nil { return nil, err } r.pos = r.pos + int64(size) + 4 return } type metadata struct { WPos int64 RPos int64 } func getMetadata(dir string, name string) (*metadata, error) { filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name)) if _, err := os.Stat(filename); err != nil { return &metadata{WPos: 0, RPos: 0}, nil } f, err := os.OpenFile(filename, os.O_RDONLY, 0644) if err != nil { return nil, err } defer f.Close() var meta metadata if err := json.NewDecoder(f).Decode(&meta); err != nil { return nil, err } return &meta, nil } func saveMetadata(dir string, name string, meta *metadata) (err error) { filename := filepath.Join(dir, fmt.Sprintf("%s.meta", name)) f, err := os.OpenFile(filename, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return } defer f.Close() return json.NewEncoder(f).Encode(&meta) } // 先入先出队列 type FIFOQueue struct { dir string name string writer *writer reader *reader lock *sync.Mutex } // 新建一个FIFOQueue func New(dir string, name string) (*FIFOQueue, error) { if err := os.MkdirAll(dir, 0755); err != nil { return nil, err } filename := filepath.Join(dir, fmt.Sprintf("%s.dat", name)) if _, err := os.Stat(filename); err != nil { f, err := os.Create(filename) if err != nil { return nil, err } f.Close() } meta, err := getMetadata(dir, name) if err != nil { return nil, err } return &FIFOQueue{ dir: dir, name: name, writer: &writer{filename: filename, pos: meta.WPos}, reader: &reader{filename: filename, pos: meta.RPos}, lock: &sync.Mutex{}, }, nil } // 队列里的数据已经被全部读取完 var ErrNotFoundMessage = errors.New("not found message") // 从队列读取一条数据 func (q *FIFOQueue) Pop() (b []byte, err error) { q.lock.Lock() defer q.lock.Unlock() if q.writer.pos == q.reader.pos { return nil, ErrNotFoundMessage } b, err = q.reader.read() if err != nil { return nil, err } saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos}) return b, nil } // 向队列里写入一条数据 func (q *FIFOQueue) Push(b []byte) (err error) { q.lock.Lock() defer q.lock.Unlock() if err = q.writer.write(b); err != nil { return } saveMetadata(q.dir, q.name, &metadata{WPos: q.writer.pos, RPos: q.reader.pos}) return nil } // 阻塞方式读取数据,如果队列里没有数据,则等待数据 func (q *FIFOQueue) BPop() (b []byte, err error) { for { b, err = q.Pop() if err != nil { if err != ErrNotFoundMessage { return nil, err } time.Sleep(3 * time.Second) continue } return b, nil } }