From fdd2309e9802785b867d49ad0792d662831d2281 Mon Sep 17 00:00:00 2001 From: xiezhenye Date: Wed, 13 Feb 2019 16:28:21 +0800 Subject: [PATCH 1/2] add peek --- diskqueue.go | 11 ++++++++++- diskqueue_test.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index fb273a9..8875d81 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -52,6 +52,7 @@ type Interface interface { Delete() error Depth() int64 Empty() error + Peek() []byte } // diskQueue implements a filesystem backed FIFO queue @@ -90,6 +91,7 @@ type diskQueue struct { // exposed via ReadChan() readChan chan []byte + peekChan chan []byte // internal channels writeChan chan []byte @@ -114,6 +116,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), + peekChan: make(chan []byte), writeChan: make(chan []byte), writeResponseChan: make(chan error), emptyChan: make(chan int), @@ -145,6 +148,10 @@ func (d *diskQueue) ReadChan() chan []byte { return d.readChan } +func (d *diskQueue) Peek() []byte { + return <-d.peekChan +} + // Put writes a []byte to the queue func (d *diskQueue) Put(data []byte) error { d.RLock() @@ -535,7 +542,6 @@ func (d *diskQueue) moveForward() { d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos depth := atomic.AddInt64(&d.depth, -1) - // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { // sync every time we start reading from a new file @@ -629,6 +635,7 @@ func (d *diskQueue) ioLoop() { } r = d.readChan } else { + dataRead = nil r = nil } @@ -639,6 +646,8 @@ func (d *diskQueue) ioLoop() { count++ // moveForward sets needSync flag if a file is removed d.moveForward() + case d.peekChan <- dataRead: + // peek case <-d.emptyChan: d.emptyResponseChan <- d.deleteAllFiles() count = 0 diff --git a/diskqueue_test.go b/diskqueue_test.go index 2e8e283..2975301 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -91,13 +91,39 @@ func TestDiskQueue(t *testing.T) { NotNil(t, dq) Equal(t, int64(0), dq.Depth()) + msgOut := dq.Peek() + Nil(t, msgOut) + msg := []byte("test") + msg1 := []byte("test1") + err = dq.Put(msg) Nil(t, err) Equal(t, int64(1), dq.Depth()) - msgOut := <-dq.ReadChan() + msgOut = dq.Peek() + Equal(t, msg, msgOut) + msgOut = dq.Peek() + Equal(t, msg, msgOut) + + err = dq.Put(msg1) + Nil(t, err) + Equal(t, int64(2), dq.Depth()) + + msgOut = dq.Peek() + Equal(t, msg, msgOut) + + msgOut = <-dq.ReadChan() Equal(t, msg, msgOut) + + msgOut = dq.Peek() + Equal(t, msg1, msgOut) + + msgOut = <-dq.ReadChan() + Equal(t, msg1, msgOut) + + msgOut = dq.Peek() + Nil(t, msgOut) } func TestDiskQueueRoll(t *testing.T) { @@ -165,6 +191,8 @@ func TestDiskQueueEmpty(t *testing.T) { numFiles := dq.(*diskQueue).writeFileNum dq.Empty() + outMsg := dq.Peek() + Nil(t, outMsg) assertFileNotExist(t, dq.(*diskQueue).metaDataFileName()) for i := int64(0); i <= numFiles; i++ { From f4a0924f28a8e54c34812b4dcdb6580abdfa514c Mon Sep 17 00:00:00 2001 From: Xie Zhenye Date: Wed, 3 Jun 2020 17:23:04 +0800 Subject: [PATCH 2/2] fix tests --- diskqueue_test.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 84a21ab..4a61380 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -91,8 +91,9 @@ func TestDiskQueue(t *testing.T) { NotNil(t, dq) Equal(t, int64(0), dq.Depth()) - msgOut := dq.Peek() + msgOut, err := dq.Peek() Nil(t, msgOut) + Nil(t, err) msg := []byte("test") msg1 := []byte("test1") @@ -101,29 +102,35 @@ func TestDiskQueue(t *testing.T) { Nil(t, err) Equal(t, int64(1), dq.Depth()) - msgOut = dq.Peek() + msgOut, err = dq.Peek() Equal(t, msg, msgOut) - msgOut = dq.Peek() + Nil(t, err) + + msgOut, err = dq.Peek() Equal(t, msg, msgOut) + Nil(t, err) err = dq.Put(msg1) Nil(t, err) Equal(t, int64(2), dq.Depth()) - msgOut = dq.Peek() + msgOut, err = dq.Peek() Equal(t, msg, msgOut) + Nil(t, err) msgOut = <-dq.ReadChan() Equal(t, msg, msgOut) - msgOut = dq.Peek() + msgOut, err = dq.Peek() Equal(t, msg1, msgOut) + Nil(t, err) msgOut = <-dq.ReadChan() Equal(t, msg1, msgOut) - msgOut = dq.Peek() + msgOut, err = dq.Peek() Nil(t, msgOut) + Nil(t, err) } func TestDiskQueueRoll(t *testing.T) { @@ -196,8 +203,9 @@ func TestDiskQueueEmpty(t *testing.T) { numFiles := dq.(*diskQueue).writeFileNum dq.Empty() - outMsg := dq.Peek() + outMsg, err := dq.Peek() Nil(t, outMsg) + Nil(t, err) assertFileNotExist(t, dq.(*diskQueue).metaDataFileName()) for i := int64(0); i <= numFiles; i++ {