From e9b23ac5eab849aed8fc2972d06870fcbf243f09 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:03:10 +0000 Subject: [PATCH 1/6] Update variable names. --- diskqueue.go | 72 +++++++++++++++++++++++------------------------ diskqueue_test.go | 8 +++--- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 3ac27ca..042c276 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -71,7 +71,7 @@ type diskQueue struct { // instantiation time metadata name string dataPath string - maxBytesDiskSpace int64 + maxBytesDiskSize int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -106,7 +106,7 @@ type diskQueue struct { logf AppLogFunc // disk limit implementation flag - diskLimitFeatIsOn bool + enableDiskLimitation bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -115,43 +115,43 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - return NewWithDiskSpace(name, dataPath, + return NewWithDiskSize(name, dataPath, 0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } -// Another constructor that allows users to use Disk Space Limit feature -// If user is not using Disk Space Limit feature, maxBytesDiskSpace will +// Another constructor that allows users to use Disk Size Limit feature +// If user is not using Disk Size Limit feature, maxBytesDiskSize will // be 0 -func NewWithDiskSpace(name string, dataPath string, - maxBytesDiskSpace int64, maxBytesPerFile int64, +func NewWithDiskSize(name string, dataPath string, + maxBytesDiskSize int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - diskLimitFeatIsOn := true - if maxBytesDiskSpace <= 0 { - maxBytesDiskSpace = 0 - diskLimitFeatIsOn = false + enableDiskLimitation := true + if maxBytesDiskSize <= 0 { + maxBytesDiskSize = 0 + enableDiskLimitation = false } d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesDiskSpace: maxBytesDiskSpace, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, - diskLimitFeatIsOn: diskLimitFeatIsOn, + name: name, + dataPath: dataPath, + maxBytesDiskSize: maxBytesDiskSize, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + enableDiskLimitation: enableDiskLimitation, } d.start() @@ -335,7 +335,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead -= 8 } @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -458,7 +458,7 @@ func (d *diskQueue) writeOne(data []byte) error { fileSize := d.writePos - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // save space for the number of messages in this file fileSize += 8 d.writeMessages += 1 @@ -520,8 +520,8 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, @@ -557,8 +557,8 @@ func (d *diskQueue) persistMetaData() error { return err } - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, diff --git a/diskqueue_test.go b/diskqueue_test.go index b0ac900..b4a712a 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { +func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) + return readMetaDataFile(fileName, retried, enableDiskLimitation) } panic(err) } defer f.Close() var ret md - if diskLimitFeatIsOn { + if enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, @@ -348,7 +348,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 From 7acba5f0928bf09c13aafcbb1a046caf42055851 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:05:39 +0000 Subject: [PATCH 2/6] Revert "Update variable names." This reverts commit e9b23ac5eab849aed8fc2972d06870fcbf243f09. --- diskqueue.go | 72 +++++++++++++++++++++++------------------------ diskqueue_test.go | 8 +++--- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 042c276..3ac27ca 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -71,7 +71,7 @@ type diskQueue struct { // instantiation time metadata name string dataPath string - maxBytesDiskSize int64 + maxBytesDiskSpace int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -106,7 +106,7 @@ type diskQueue struct { logf AppLogFunc // disk limit implementation flag - enableDiskLimitation bool + diskLimitFeatIsOn bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -115,43 +115,43 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - return NewWithDiskSize(name, dataPath, + return NewWithDiskSpace(name, dataPath, 0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } -// Another constructor that allows users to use Disk Size Limit feature -// If user is not using Disk Size Limit feature, maxBytesDiskSize will +// Another constructor that allows users to use Disk Space Limit feature +// If user is not using Disk Space Limit feature, maxBytesDiskSpace will // be 0 -func NewWithDiskSize(name string, dataPath string, - maxBytesDiskSize int64, maxBytesPerFile int64, +func NewWithDiskSpace(name string, dataPath string, + maxBytesDiskSpace int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - enableDiskLimitation := true - if maxBytesDiskSize <= 0 { - maxBytesDiskSize = 0 - enableDiskLimitation = false + diskLimitFeatIsOn := true + if maxBytesDiskSpace <= 0 { + maxBytesDiskSpace = 0 + diskLimitFeatIsOn = false } d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesDiskSize: maxBytesDiskSize, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, - enableDiskLimitation: enableDiskLimitation, + name: name, + dataPath: dataPath, + maxBytesDiskSpace: maxBytesDiskSpace, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + diskLimitFeatIsOn: diskLimitFeatIsOn, } d.start() @@ -335,7 +335,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() - if d.enableDiskLimitation { + if d.diskLimitFeatIsOn { // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead -= 8 } @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -458,7 +458,7 @@ func (d *diskQueue) writeOne(data []byte) error { fileSize := d.writePos - if d.enableDiskLimitation { + if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 d.writeMessages += 1 @@ -520,8 +520,8 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - // if user is using disk size limit feature - if d.enableDiskLimitation { + // if user is using disk space limit feature + if d.diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, @@ -557,8 +557,8 @@ func (d *diskQueue) persistMetaData() error { return err } - // if user is using disk size limit feature - if d.enableDiskLimitation { + // if user is using disk space limit feature + if d.diskLimitFeatIsOn { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, diff --git a/diskqueue_test.go b/diskqueue_test.go index b4a712a..b0ac900 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { +func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) m if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, enableDiskLimitation) + return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) } panic(err) } defer f.Close() var ret md - if enableDiskLimitation { + if diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, @@ -348,7 +348,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 From 92819e147ada3e0fb63f8768c19e8c9800cf1c27 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:08:43 +0000 Subject: [PATCH 3/6] Revert "Merge branch 'TrackDiskSize' into DepthImpl" This reverts commit 76a0ddd1719105e1d13e57988557783b9d6af9bd, reversing changes made to 30c6ef662449c70d289a402bb95cf9759c642e1e. --- diskqueue.go | 36 +++++++++++------------------------- diskqueue_test.go | 11 ++--------- 2 files changed, 13 insertions(+), 34 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 3aab8e5..3ac27ca 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -64,7 +64,6 @@ type diskQueue struct { writeFileNum int64 readMessages int64 writeMessages int64 - writeBytes int64 depth int64 sync.RWMutex @@ -108,9 +107,6 @@ type diskQueue struct { // disk limit implementation flag diskLimitFeatIsOn bool - - // the size of the - readMsgSize int32 } // New instantiates an instance of diskQueue, retrieving metadata @@ -304,7 +300,6 @@ func (d *diskQueue) skipToNextRWFile() error { d.depth = 0 d.readMessages = 0 d.writeMessages = 0 - d.writeBytes = 0 return err } @@ -313,6 +308,7 @@ func (d *diskQueue) skipToNextRWFile() error { // while advancing read positions and rolling files, if necessary func (d *diskQueue) readOne() ([]byte, error) { var err error + var msgSize int32 if d.readFile == nil { curFileName := d.fileName(d.readFileNum) @@ -349,22 +345,22 @@ func (d *diskQueue) readOne() ([]byte, error) { d.reader = bufio.NewReader(d.readFile) } - err = binary.Read(d.reader, binary.BigEndian, &d.readMsgSize) + err = binary.Read(d.reader, binary.BigEndian, &msgSize) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } - if d.readMsgSize < d.minMsgSize || d.readMsgSize > d.maxMsgSize { + if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { // this file is corrupt and we have no reasonable guarantee on // where a new message should begin d.readFile.Close() d.readFile = nil - return nil, fmt.Errorf("invalid message read size (%d)", d.readMsgSize) + return nil, fmt.Errorf("invalid message read size (%d)", msgSize) } - readBuf := make([]byte, d.readMsgSize) + readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { d.readFile.Close() @@ -372,7 +368,7 @@ func (d *diskQueue) readOne() ([]byte, error) { return nil, err } - totalBytes := int64(4 + d.readMsgSize) + totalBytes := int64(4 + msgSize) // we only advance next* because we have not yet sent this to consumers // (where readFileNum, readPos will actually be advanced) @@ -465,7 +461,6 @@ func (d *diskQueue) writeOne(data []byte) error { if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 - d.writeBytes += totalBytes d.writeMessages += 1 } @@ -476,12 +471,7 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFileNum++ d.writePos = 0 - - if d.diskLimitFeatIsOn { - // add bytes for the number of messages in the file - d.writeBytes += 8 - d.writeMessages = 0 - } + d.writeMessages = 0 // sync every time we start writing to a new file err = d.sync() @@ -532,10 +522,10 @@ func (d *diskQueue) retrieveMetaData() error { // if user is using disk space limit feature if d.diskLimitFeatIsOn { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, - &d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos) + &d.writeFileNum, &d.writeMessages, &d.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &d.depth, @@ -569,10 +559,10 @@ func (d *diskQueue) persistMetaData() error { // if user is using disk space limit feature if d.diskLimitFeatIsOn { - _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, - d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos) + d.writeFileNum, d.writeMessages, d.writePos) } else { _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", d.depth, @@ -639,8 +629,6 @@ func (d *diskQueue) checkTailCorruption(depth int64) { } func (d *diskQueue) moveForward() { - // add bytes for the number of messages and the size of the message - readFileLen := int64(d.readMsgSize) + d.readPos + 12 oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos @@ -659,8 +647,6 @@ func (d *diskQueue) moveForward() { if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) } - - d.writeBytes -= readFileLen } d.checkTailCorruption(d.depth) diff --git a/diskqueue_test.go b/diskqueue_test.go index 8d4f78f..b0ac900 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -251,7 +251,6 @@ func TestDiskQueueCorruption(t *testing.T) { type md struct { depth int64 - writeBytes int64 readFileNum int64 writeFileNum int64 readMessages int64 @@ -275,12 +274,11 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { defer f.Close() var ret md - if diskLimitFeatIsOn { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, - &ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + &ret.writeFileNum, &ret.writeMessages, &ret.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &ret.depth, @@ -360,7 +358,6 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && - d.writeBytes == 1004 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 0 && @@ -381,7 +378,6 @@ next: for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && - d.writeBytes == 2008 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 1004 && @@ -409,7 +405,6 @@ completeWriteFile: // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 3 && - d.writeBytes == 3020 && d.readFileNum == 0 && d.writeFileNum == 1 && d.readPos == 1004 && @@ -434,9 +429,7 @@ completeReadFile: // test that read position and messages reset when a file is completely read // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) - t.Logf("Write bytes: %d", d.writeBytes) if d.depth == 1 && - d.writeBytes == 1004 && d.readFileNum == 1 && d.writeFileNum == 1 && d.readPos == 0 && From 2616c7e47438cf0c881ab5eca74d4e320a384822 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:13:34 +0000 Subject: [PATCH 4/6] Update variable names. --- diskqueue.go | 72 +++++++++++++++++++++++------------------------ diskqueue_test.go | 8 +++--- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 3ac27ca..042c276 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -71,7 +71,7 @@ type diskQueue struct { // instantiation time metadata name string dataPath string - maxBytesDiskSpace int64 + maxBytesDiskSize int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -106,7 +106,7 @@ type diskQueue struct { logf AppLogFunc // disk limit implementation flag - diskLimitFeatIsOn bool + enableDiskLimitation bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -115,43 +115,43 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - return NewWithDiskSpace(name, dataPath, + return NewWithDiskSize(name, dataPath, 0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } -// Another constructor that allows users to use Disk Space Limit feature -// If user is not using Disk Space Limit feature, maxBytesDiskSpace will +// Another constructor that allows users to use Disk Size Limit feature +// If user is not using Disk Size Limit feature, maxBytesDiskSize will // be 0 -func NewWithDiskSpace(name string, dataPath string, - maxBytesDiskSpace int64, maxBytesPerFile int64, +func NewWithDiskSize(name string, dataPath string, + maxBytesDiskSize int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - diskLimitFeatIsOn := true - if maxBytesDiskSpace <= 0 { - maxBytesDiskSpace = 0 - diskLimitFeatIsOn = false + enableDiskLimitation := true + if maxBytesDiskSize <= 0 { + maxBytesDiskSize = 0 + enableDiskLimitation = false } d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesDiskSpace: maxBytesDiskSpace, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, - diskLimitFeatIsOn: diskLimitFeatIsOn, + name: name, + dataPath: dataPath, + maxBytesDiskSize: maxBytesDiskSize, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + enableDiskLimitation: enableDiskLimitation, } d.start() @@ -335,7 +335,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead -= 8 } @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -458,7 +458,7 @@ func (d *diskQueue) writeOne(data []byte) error { fileSize := d.writePos - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // save space for the number of messages in this file fileSize += 8 d.writeMessages += 1 @@ -520,8 +520,8 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, @@ -557,8 +557,8 @@ func (d *diskQueue) persistMetaData() error { return err } - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, diff --git a/diskqueue_test.go b/diskqueue_test.go index b0ac900..b4a712a 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { +func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) + return readMetaDataFile(fileName, retried, enableDiskLimitation) } panic(err) } defer f.Close() var ret md - if diskLimitFeatIsOn { + if enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, @@ -348,7 +348,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 From 5cd62884e8046eb8d688c323002a2a584125c2e8 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 3 Jun 2021 14:31:47 +0000 Subject: [PATCH 5/6] Replace 8 with a constant to improve readability. --- diskqueue.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 042c276..a9b1193 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -19,11 +19,12 @@ import ( type LogLevel int const ( - DEBUG = LogLevel(1) - INFO = LogLevel(2) - WARN = LogLevel(3) - ERROR = LogLevel(4) - FATAL = LogLevel(5) + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) + numFileMsgsBytes = 8 ) type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) @@ -337,7 +338,7 @@ func (d *diskQueue) readOne() ([]byte, error) { d.maxBytesPerFileRead = stat.Size() if d.enableDiskLimitation { // last 8 bytes are reserved for the number of messages in this file - d.maxBytesPerFileRead -= 8 + d.maxBytesPerFileRead -= numFileMsgsBytes } } } @@ -437,7 +438,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.enableDiskLimitation && d.writePos+totalBytes+numFileMsgsBytes >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -460,7 +461,7 @@ func (d *diskQueue) writeOne(data []byte) error { if d.enableDiskLimitation { // save space for the number of messages in this file - fileSize += 8 + fileSize += numFileMsgsBytes d.writeMessages += 1 } From 24fe56aa164e43c93fed06393bec42b507812f33 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 9 Jun 2021 16:48:09 +0000 Subject: [PATCH 6/6] Reset Diskqueue data during readError. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index a9b1193..5821548 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -664,6 +664,7 @@ func (d *diskQueue) handleReadError() { } d.writeFileNum++ d.writePos = 0 + d.writeMessages = 0 } badFn := d.fileName(d.readFileNum) @@ -684,6 +685,7 @@ func (d *diskQueue) handleReadError() { d.readPos = 0 d.nextReadFileNum = d.readFileNum d.nextReadPos = 0 + d.readMessages = 0 // significant state change, schedule a sync on the next iteration d.needSync = true