From 48c7a85bb1326c9b362a81545e0908af0807ff7a Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 1 Jun 2021 21:15:52 +0000 Subject: [PATCH 1/8] Update totalBytes and writePos when writing number of messages, and resolve the writeMessages off by 1 error. --- diskqueue.go | 15 ++++++--------- diskqueue_test.go | 6 +++--- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index e8a5ab5..02f6eef 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -435,12 +435,14 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.diskLimitFeatIsOn && d.writePos+totalBytes >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { return err } + + totalBytes += 8 } // only write to the file once @@ -454,14 +456,11 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos += totalBytes d.depth += 1 - fileSize := d.writePos - if d.diskLimitFeatIsOn { - // save space for the number of messages in this file - fileSize += 8 + d.writeMessages += 1 } - if fileSize >= d.maxBytesPerFile { + if d.writePos >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -480,8 +479,6 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } - } else { - d.writeMessages += 1 } return err diff --git a/diskqueue_test.go b/diskqueue_test.go index d8befe8..7ab9fe8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { +func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, withDiskSpaceFeat) + return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) } panic(err) } defer f.Close() var ret md - if withDiskSpaceFeat { + if diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, From 652b548b97ac8c5eb808638fe6bdd80898b3b2b2 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 1 Jun 2021 21:30:30 +0000 Subject: [PATCH 2/8] Remove the additional 8 bytes to totalBytes. --- diskqueue.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 02f6eef..d6e3c72 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -441,8 +441,6 @@ func (d *diskQueue) writeOne(data []byte) error { if err != nil { return err } - - totalBytes += 8 } // only write to the file once From b07cec451ff8d9d2c8f45987347ac0c3dadc76a0 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:37:09 +0000 Subject: [PATCH 3/8] Revert "Remove the additional 8 bytes to totalBytes." This reverts commit 652b548b97ac8c5eb808638fe6bdd80898b3b2b2. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index d6e3c72..02f6eef 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -441,6 +441,8 @@ func (d *diskQueue) writeOne(data []byte) error { if err != nil { return err } + + totalBytes += 8 } // only write to the file once From 5968b13ec8806baaa26b57b81dc7d7e7acd5d053 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:38:05 +0000 Subject: [PATCH 4/8] Revert "Update totalBytes and writePos when writing number of messages, and resolve the writeMessages off by 1 error." This reverts commit 48c7a85bb1326c9b362a81545e0908af0807ff7a. --- diskqueue.go | 15 +++++++++------ diskqueue_test.go | 6 +++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 02f6eef..e8a5ab5 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -435,14 +435,12 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes >= d.maxBytesPerFile { + if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) if err != nil { return err } - - totalBytes += 8 } // only write to the file once @@ -456,11 +454,14 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos += totalBytes d.depth += 1 + fileSize := d.writePos + if d.diskLimitFeatIsOn { - d.writeMessages += 1 + // save space for the number of messages in this file + fileSize += 8 } - if d.writePos >= d.maxBytesPerFile { + if fileSize >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -479,6 +480,8 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } + } else { + d.writeMessages += 1 } return err diff --git a/diskqueue_test.go b/diskqueue_test.go index 7ab9fe8..d8befe8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { +func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) + return readMetaDataFile(fileName, retried, withDiskSpaceFeat) } panic(err) } defer f.Close() var ret md - if diskLimitFeatIsOn { + if withDiskSpaceFeat { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, From 53c0e473682689f5f0298e11eda4341fa32eb4ef Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:43:03 +0000 Subject: [PATCH 5/8] Revert changes that removed the additional 8 bytes. --- diskqueue.go | 5 ++--- diskqueue_test.go | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index e8a5ab5..41ab465 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { // check if we reached the file size limit with this message if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { return err } @@ -459,6 +459,7 @@ func (d *diskQueue) writeOne(data []byte) error { if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 + d.writeMessages += 1 } if fileSize >= d.maxBytesPerFile { @@ -480,8 +481,6 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } - } else { - d.writeMessages += 1 } return err diff --git a/diskqueue_test.go b/diskqueue_test.go index d8befe8..7ab9fe8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { +func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, withDiskSpaceFeat) + return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) } panic(err) } defer f.Close() var ret md - if withDiskSpaceFeat { + if diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, From a3ce86a63fe0a7969fecf4abc632d71298c57b83 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:49:47 +0000 Subject: [PATCH 6/8] Add comment to make code more readable. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index 41ab465..3ac27ca 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -421,6 +421,8 @@ func (d *diskQueue) writeOne(data []byte) error { return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) } + // add all data to writeBuf before writing to file + // this causes everything to be written to file or nothing d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { From 3ed7338805dcc2cb4bf7da2b5aa21449f3a71d02 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 14:44:23 +0000 Subject: [PATCH 7/8] Add test that completes the write file by meeting the file size limit exactly with a message less than 8 bytes. --- diskqueue_test.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 7ab9fe8..2518cc8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -351,7 +351,8 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() - msg := make([]byte, 1000) + msgSize := 1000 + msg := make([]byte, msgSize) dq.Put(msg) for i := 0; i < 10; i++ { @@ -391,12 +392,17 @@ next: panic("fail") completeWriteFile: - dq.Put(msg) + // meet the file size limit exactly (2048 bytes) + totalBytes := 2 * (msgSize + 4) + bytesRemaining := 2048 - (totalBytes + 8) + oneByteMsgSizeIncrease := 5 + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) for i := 0; i < 10; i++ { // test that write position and messages reset when a new file is created d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) - if d.depth == 2 && + if d.depth == 3 && d.readFileNum == 0 && d.writeFileNum == 1 && d.readPos == 1004 && @@ -413,12 +419,14 @@ completeWriteFile: completeReadFile: dq.Put(msg) + <-dq.ReadChan() <-dq.ReadChan() <-dq.ReadChan() for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + t.Log(d.depth, d.readFileNum, d.writeFileNum, d.readPos, d.writePos, d.readMessages, d.writeMessages) if d.depth == 1 && d.readFileNum == 1 && d.writeFileNum == 1 && From 30c6ef662449c70d289a402bb95cf9759c642e1e Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 15:38:54 +0000 Subject: [PATCH 8/8] Add extra testing to validate the increment/decrement of bytes in core code. --- diskqueue_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 2518cc8..b0ac900 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -392,7 +392,8 @@ next: panic("fail") completeWriteFile: - // meet the file size limit exactly (2048 bytes) + // meet the file size limit exactly (2048 bytes) when writeFileNum + // equals readFileNum totalBytes := 2 * (msgSize + 4) bytesRemaining := 2048 - (totalBytes + 8) oneByteMsgSizeIncrease := 5 @@ -401,6 +402,7 @@ completeWriteFile: for i := 0; i < 10; i++ { // test that write position and messages reset when a new file is created + // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 3 && d.readFileNum == 0 && @@ -425,8 +427,8 @@ completeReadFile: for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) - t.Log(d.depth, d.readFileNum, d.writeFileNum, d.readPos, d.writePos, d.readMessages, d.writeMessages) if d.depth == 1 && d.readFileNum == 1 && d.writeFileNum == 1 && @@ -435,6 +437,64 @@ completeReadFile: d.readMessages == 0 && d.writeMessages == 1 { // success + goto completeWriteFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeWriteFileAgain: + // make writeFileNum ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + + // meet the file size limit exactly (2048 bytes) when writeFileNum + // is ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a file is completely read + // test the writeFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 7 && + d.readFileNum == 1 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success + goto completeReadFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFileAgain: + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 0 && + d.readFileNum == 3 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success goto done } time.Sleep(100 * time.Millisecond)