diff --git a/diskqueue.go b/diskqueue.go index 29d15ac..7ab63dc 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -51,6 +51,7 @@ type Interface interface { Delete() error Depth() int64 Empty() error + Peek() ([]byte, error) } // diskQueue implements a filesystem backed FIFO queue @@ -89,6 +90,7 @@ type diskQueue struct { // exposed via ReadChan() readChan chan []byte + peekChan chan []byte // internal channels depthChan chan int64 @@ -114,6 +116,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), + peekChan: make(chan []byte), depthChan: make(chan int64), writeChan: make(chan []byte), writeResponseChan: make(chan error), @@ -151,6 +154,14 @@ func (d *diskQueue) ReadChan() <-chan []byte { return d.readChan } +func (d *diskQueue) Peek() ([]byte, error) { + ret, ok := <-d.peekChan + if !ok { + return nil, errors.New("exiting") + } + return ret, nil +} + // Put writes a []byte to the queue func (d *diskQueue) Put(data []byte) error { d.RLock() @@ -194,6 +205,7 @@ func (d *diskQueue) exit(deleted bool) error { <-d.exitSyncChan close(d.depthChan) + close(d.peekChan) if d.readFile != nil { d.readFile.Close() @@ -543,7 +555,6 @@ func (d *diskQueue) moveForward() { d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos d.depth -= 1 - // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { // sync every time we start reading from a new file @@ -637,6 +648,7 @@ func (d *diskQueue) ioLoop() { } r = d.readChan } else { + dataRead = nil r = nil } @@ -647,6 +659,7 @@ func (d *diskQueue) ioLoop() { count++ // moveForward sets needSync flag if a file is removed d.moveForward() + case d.peekChan <- dataRead: case d.depthChan <- d.depth: case <-d.emptyChan: d.emptyResponseChan <- d.deleteAllFiles() diff --git a/diskqueue_test.go b/diskqueue_test.go index a685f07..4a61380 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -91,13 +91,46 @@ func TestDiskQueue(t *testing.T) { NotNil(t, dq) Equal(t, int64(0), dq.Depth()) + msgOut, err := dq.Peek() + Nil(t, msgOut) + Nil(t, err) + msg := []byte("test") + msg1 := []byte("test1") + err = dq.Put(msg) Nil(t, err) Equal(t, int64(1), dq.Depth()) - msgOut := <-dq.ReadChan() + msgOut, err = dq.Peek() + Equal(t, msg, msgOut) + Nil(t, err) + + msgOut, err = dq.Peek() + Equal(t, msg, msgOut) + Nil(t, err) + + err = dq.Put(msg1) + Nil(t, err) + Equal(t, int64(2), dq.Depth()) + + msgOut, err = dq.Peek() Equal(t, msg, msgOut) + Nil(t, err) + + msgOut = <-dq.ReadChan() + Equal(t, msg, msgOut) + + msgOut, err = dq.Peek() + Equal(t, msg1, msgOut) + Nil(t, err) + + msgOut = <-dq.ReadChan() + Equal(t, msg1, msgOut) + + msgOut, err = dq.Peek() + Nil(t, msgOut) + Nil(t, err) } func TestDiskQueueRoll(t *testing.T) { @@ -170,6 +203,9 @@ func TestDiskQueueEmpty(t *testing.T) { numFiles := dq.(*diskQueue).writeFileNum dq.Empty() + outMsg, err := dq.Peek() + Nil(t, outMsg) + Nil(t, err) assertFileNotExist(t, dq.(*diskQueue).metaDataFileName()) for i := int64(0); i <= numFiles; i++ {