Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Interface interface {
Delete() error
Depth() int64
Empty() error
Peek() ([]byte, error)
}

// diskQueue implements a filesystem backed FIFO queue
Expand Down Expand Up @@ -89,6 +90,7 @@ type diskQueue struct {

// exposed via ReadChan()
readChan chan []byte
peekChan chan []byte

// internal channels
depthChan chan int64
Expand All @@ -114,6 +116,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
peekChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
Expand Down Expand Up @@ -151,6 +154,14 @@ func (d *diskQueue) ReadChan() <-chan []byte {
return d.readChan
}

func (d *diskQueue) Peek() ([]byte, error) {
ret, ok := <-d.peekChan
if !ok {
return nil, errors.New("exiting")
}
return ret, nil
}

// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
d.RLock()
Expand Down Expand Up @@ -194,6 +205,7 @@ func (d *diskQueue) exit(deleted bool) error {
<-d.exitSyncChan

close(d.depthChan)
close(d.peekChan)

if d.readFile != nil {
d.readFile.Close()
Expand Down Expand Up @@ -543,7 +555,6 @@ func (d *diskQueue) moveForward() {
d.readFileNum = d.nextReadFileNum
d.readPos = d.nextReadPos
d.depth -= 1

// see if we need to clean up the old file
if oldReadFileNum != d.nextReadFileNum {
// sync every time we start reading from a new file
Expand Down Expand Up @@ -637,6 +648,7 @@ func (d *diskQueue) ioLoop() {
}
r = d.readChan
} else {
dataRead = nil
r = nil
}

Expand All @@ -647,6 +659,7 @@ func (d *diskQueue) ioLoop() {
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
case d.peekChan <- dataRead:
case d.depthChan <- d.depth:
case <-d.emptyChan:
d.emptyResponseChan <- d.deleteAllFiles()
Expand Down
38 changes: 37 additions & 1 deletion diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,46 @@ func TestDiskQueue(t *testing.T) {
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())

msgOut, err := dq.Peek()
Nil(t, msgOut)
Nil(t, err)

msg := []byte("test")
msg1 := []byte("test1")

err = dq.Put(msg)
Nil(t, err)
Equal(t, int64(1), dq.Depth())

msgOut := <-dq.ReadChan()
msgOut, err = dq.Peek()
Equal(t, msg, msgOut)
Nil(t, err)

msgOut, err = dq.Peek()
Equal(t, msg, msgOut)
Nil(t, err)

err = dq.Put(msg1)
Nil(t, err)
Equal(t, int64(2), dq.Depth())

msgOut, err = dq.Peek()
Equal(t, msg, msgOut)
Nil(t, err)

msgOut = <-dq.ReadChan()
Equal(t, msg, msgOut)

msgOut, err = dq.Peek()
Equal(t, msg1, msgOut)
Nil(t, err)

msgOut = <-dq.ReadChan()
Equal(t, msg1, msgOut)

msgOut, err = dq.Peek()
Nil(t, msgOut)
Nil(t, err)
}

func TestDiskQueueRoll(t *testing.T) {
Expand Down Expand Up @@ -170,6 +203,9 @@ func TestDiskQueueEmpty(t *testing.T) {

numFiles := dq.(*diskQueue).writeFileNum
dq.Empty()
outMsg, err := dq.Peek()
Nil(t, outMsg)
Nil(t, err)

assertFileNotExist(t, dq.(*diskQueue).metaDataFileName())
for i := int64(0); i <= numFiles; i++ {
Expand Down