Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (l LogLevel) String() string {
type Interface interface {
Put([]byte) error
ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel
Close() error
Delete() error
Depth() int64
Expand Down Expand Up @@ -91,6 +92,9 @@ type diskQueue struct {
// exposed via ReadChan()
readChan chan []byte

// exposed via PeekChan()
peekChan chan []byte

// internal channels
depthChan chan int64
writeChan chan []byte
Expand All @@ -115,6 +119,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
peekChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
Expand Down Expand Up @@ -152,6 +157,10 @@ func (d *diskQueue) ReadChan() <-chan []byte {
return d.readChan
}

func (d *diskQueue) PeekChan() <-chan []byte {
return d.peekChan
}

// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
d.RLock()
Expand Down Expand Up @@ -648,6 +657,7 @@ func (d *diskQueue) ioLoop() {
var err error
var count int64
var r chan []byte
var p chan []byte

syncTicker := time.NewTicker(d.syncTimeout)

Expand Down Expand Up @@ -676,13 +686,16 @@ func (d *diskQueue) ioLoop() {
}
}
r = d.readChan
p = d.peekChan
} else {
r = nil
p = nil
}

select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case p <- dataRead:
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
Expand Down
77 changes: 77 additions & 0 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,83 @@ func TestDiskQueueRoll(t *testing.T) {
}
}

func TestDiskQueuePeek(t *testing.T) {
l := NewTestLogger(t)
dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
msg := bytes.Repeat([]byte{0}, 10)
ml := int64(len(msg))
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
defer dq.Close()
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())

t.Run("roll", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(i+1), dq.Depth())
}

for i := 10; i > 0; i-- {
Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(i-1), dq.Depth())
}

Nil(t, dq.Empty())
})

t.Run("peek-read", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(i+1), dq.Depth())
}

for i := 10; i > 0; i-- {
Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(i-1), dq.Depth())
}

Nil(t, dq.Empty())
})

t.Run("read-peek", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(i+1), dq.Depth())
}

for i := 10; i > 1; i-- {
Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(i-1), dq.Depth())

Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i-1), dq.Depth())
}

Nil(t, dq.Empty())
})

}

func assertFileNotExist(t *testing.T, fn string) {
f, err := os.OpenFile(fn, os.O_RDONLY, 0600)
Equal(t, (*os.File)(nil), f)
Expand Down