From 5a4a043febdf34e3b58a47e60f78d33096dbe6b4 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Mon, 24 May 2021 19:38:37 +0000 Subject: [PATCH 01/28] Keep track of the number of messages in the writeFile and the number of messages we read in the readFile. Also, reserve the last 4 bytes at the end of the file for the number of messages in that file. --- diskqueue.go | 61 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 26b3438..6bfb88f 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -19,11 +19,12 @@ import ( type LogLevel int const ( - DEBUG = LogLevel(1) - INFO = LogLevel(2) - WARN = LogLevel(3) - ERROR = LogLevel(4) - FATAL = LogLevel(5) + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) + metaDataFormat = "%d\n%d,%d,%d\n%d,%d,%d\n" ) type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) @@ -58,11 +59,13 @@ type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) - readPos int64 - writePos int64 - readFileNum int64 - writeFileNum int64 - depth int64 + readPos int64 + writePos int64 + readFileNum int64 + writeFileNum int64 + readMessages int64 + writeMessages int64 + depth int64 sync.RWMutex @@ -300,7 +303,8 @@ func (d *diskQueue) readOne() ([]byte, error) { if d.readFileNum < d.writeFileNum { stat, err := d.readFile.Stat() if err == nil { - d.maxBytesPerFileRead = stat.Size() + // last 4 bytes are reserved for the number of messages in this file + d.maxBytesPerFileRead = stat.Size() - 4 } } @@ -394,6 +398,17 @@ func (d *diskQueue) writeOne(data []byte) error { return err } + totalBytes := int64(4 + dataLen) + + // check if we reached the file size limit with this message + if d.writePos+totalBytes >= d.maxBytesPerFile { + // write number of messages in binary to file + err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) + if err != nil { + return err + } + } + // only write to the file once _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { @@ -402,10 +417,11 @@ func (d *diskQueue) writeOne(data []byte) error { return err } - totalBytes := int64(4 + dataLen) d.writePos += totalBytes d.depth += 1 + d.writeMessages += 1 + // save space for the number of messages in this file if d.writePos >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos @@ -413,6 +429,7 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFileNum++ d.writePos = 0 + d.writeMessages = 0 // sync every time we start writing to a new file err = d.sync() @@ -461,15 +478,14 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - var depth int64 - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &depth, - &d.readFileNum, &d.readPos, - &d.writeFileNum, &d.writePos) + _, err = fmt.Fscanf(f, metaDataFormat, + &d.depth, + &d.readFileNum, &d.readMessages, &d.readPos, + &d.writeFileNum, &d.writeMessages, &d.writePos) if err != nil { return err } - d.depth = depth + d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos @@ -490,10 +506,10 @@ func (d *diskQueue) persistMetaData() error { return err } - _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", + _, err = fmt.Fprintf(f, metaDataFormat, d.depth, - d.readFileNum, d.readPos, - d.writeFileNum, d.writePos) + d.readFileNum, d.readMessages, d.readPos, + d.writeFileNum, d.writeMessages, d.writePos) if err != nil { f.Close() return err @@ -558,9 +574,12 @@ func (d *diskQueue) moveForward() { d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos d.depth -= 1 + d.readMessages += 1 // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { + d.readMessages = 0 + // sync every time we start reading from a new file d.needSync = true From 15339ebff15b44507202d38a20ffaa7d1a3eb925 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Mon, 24 May 2021 19:41:03 +0000 Subject: [PATCH 02/28] Update tests to reflect changes made to the metadata file. Metadata file now contains readMessages and writeMessages. --- diskqueue_test.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index ba5879c..7b1068b 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -250,11 +250,13 @@ func TestDiskQueueCorruption(t *testing.T) { } type md struct { - depth int64 - readFileNum int64 - writeFileNum int64 - readPos int64 - writePos int64 + depth int64 + readFileNum int64 + writeFileNum int64 + readMessages int64 + writeMessages int64 + readPos int64 + writePos int64 } func readMetaDataFile(fileName string, retried int) md { @@ -272,10 +274,10 @@ func readMetaDataFile(fileName string, retried int) md { defer f.Close() var ret md - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, - &ret.readFileNum, &ret.readPos, - &ret.writeFileNum, &ret.writePos) + &ret.readFileNum, &ret.readMessages, &ret.readPos, + &ret.writeFileNum, &ret.writeMessages, &ret.writePos) if err != nil { panic(err) } @@ -302,7 +304,9 @@ func TestDiskQueueSyncAfterRead(t *testing.T) { d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 0 && - d.writePos == 1004 { + d.writePos == 1004 && + d.readMessages == 0 && + d.writeMessages == 1 { // success goto next } @@ -320,7 +324,9 @@ next: d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 1004 && - d.writePos == 2008 { + d.writePos == 2008 && + d.readMessages == 1 && + d.writeMessages == 2 { // success goto done } From ddadcb93d540c82469142806db98a3bbc9d4b41f Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Mon, 24 May 2021 20:21:26 +0000 Subject: [PATCH 03/28] Test thtat write messages resets when a new file is created. --- diskqueue_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/diskqueue_test.go b/diskqueue_test.go index 7b1068b..a6921af 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -328,6 +328,26 @@ next: d.readMessages == 1 && d.writeMessages == 2 { // success + goto final + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +final: + dq.Put(msg) + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a new file is created + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + if d.depth == 2 && + d.readFileNum == 0 && + d.writeFileNum == 1 && + d.readPos == 1004 && + d.writePos == 0 && + d.readMessages == 1 && + d.writeMessages == 0 { + // success goto done } time.Sleep(100 * time.Millisecond) From 609f5c9ca8103210b7a126b3bf802f2a7ba24096 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Mon, 24 May 2021 20:41:18 +0000 Subject: [PATCH 04/28] Add a test that checks if meta data file is correct after writing a complete file and completely reading a file. --- diskqueue_test.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index a6921af..e0bc80c 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -328,13 +328,13 @@ next: d.readMessages == 1 && d.writeMessages == 2 { // success - goto final + goto completeWriteFile } time.Sleep(100 * time.Millisecond) } panic("fail") -final: +completeWriteFile: dq.Put(msg) for i := 0; i < 10; i++ { @@ -348,6 +348,29 @@ final: d.readMessages == 1 && d.writeMessages == 0 { // success + goto completeReadFile + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFile: + dq.Put(msg) + + <-dq.ReadChan() + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + if d.depth == 1 && + d.readFileNum == 1 && + d.writeFileNum == 1 && + d.readPos == 0 && + d.writePos == 1004 && + d.readMessages == 0 && + d.writeMessages == 1 { + // success goto done } time.Sleep(100 * time.Millisecond) From cd45da27016d29bc68d71504f6d6421804868002 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 27 May 2021 18:27:43 +0000 Subject: [PATCH 05/28] Allocate 8 bytes instead of 4 since writeMessages is in64, and reset writeMessages and readMessages in skipToNextRWFile. --- diskqueue.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 6bfb88f..7655e85 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -262,13 +262,16 @@ func (d *diskQueue) skipToNextRWFile() error { } } + d.depth = 0 + d.nextReadPos = 0 + d.readMessages = 0 + d.readPos = 0 d.writeFileNum++ + d.writeMessages = 0 d.writePos = 0 + d.readFileNum = d.writeFileNum - d.readPos = 0 d.nextReadFileNum = d.writeFileNum - d.nextReadPos = 0 - d.depth = 0 return err } @@ -304,7 +307,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { // last 4 bytes are reserved for the number of messages in this file - d.maxBytesPerFileRead = stat.Size() - 4 + d.maxBytesPerFileRead = stat.Size() - 8 } } @@ -401,9 +404,9 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.writePos+totalBytes >= d.maxBytesPerFile { + if d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) if err != nil { return err } @@ -419,10 +422,9 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos += totalBytes d.depth += 1 - d.writeMessages += 1 // save space for the number of messages in this file - if d.writePos >= d.maxBytesPerFile { + if d.writePos+8 >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -441,6 +443,8 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } + } else { + d.writeMessages += 1 } return err From eeaa65ab293f6b53bacdb79b282c21291a3819a2 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 27 May 2021 18:29:43 +0000 Subject: [PATCH 06/28] Update comment about number of bytes reserved for writeMessages. --- diskqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diskqueue.go b/diskqueue.go index 7655e85..df26929 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -306,7 +306,7 @@ func (d *diskQueue) readOne() ([]byte, error) { if d.readFileNum < d.writeFileNum { stat, err := d.readFile.Stat() if err == nil { - // last 4 bytes are reserved for the number of messages in this file + // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead = stat.Size() - 8 } } From 2496effa8ff9e313ba0b173e402a16a9793ef69d Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 27 May 2021 20:53:55 +0000 Subject: [PATCH 07/28] Allow user to choose whether to use new feature or not. --- diskqueue.go | 82 +++++++++++++++++++++++++++++++++++------------ diskqueue_test.go | 2 +- 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index df26929..1443c3d 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -19,12 +19,11 @@ import ( type LogLevel int const ( - DEBUG = LogLevel(1) - INFO = LogLevel(2) - WARN = LogLevel(3) - ERROR = LogLevel(4) - FATAL = LogLevel(5) - metaDataFormat = "%d\n%d,%d,%d\n%d,%d,%d\n" + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) ) type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) @@ -72,6 +71,7 @@ type diskQueue struct { // instantiation time metadata name string dataPath string + maxBytesDiskSpace int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -111,9 +111,24 @@ type diskQueue struct { func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { + + return NewWithDiskSpace(name, dataPath, + 0, maxBytesPerFile, + minMsgSize, maxMsgSize, + syncEvery, syncTimeout, logf) +} + +func NewWithDiskSpace(name string, dataPath string, + maxBytesDiskSpace int64, maxBytesPerFile int64, + minMsgSize int32, maxMsgSize int32, + syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { + if maxBytesDiskSpace <= 0 { + maxBytesDiskSpace = 0 + } d := diskQueue{ name: name, dataPath: dataPath, + maxBytesDiskSpace: maxBytesDiskSpace, maxBytesPerFile: maxBytesPerFile, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, @@ -130,6 +145,11 @@ func New(name string, dataPath string, maxBytesPerFile int64, logf: logf, } + d.start() + return &d +} + +func (d *diskQueue) start() { // no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { @@ -137,7 +157,6 @@ func New(name string, dataPath string, maxBytesPerFile int64, } go d.ioLoop() - return &d } // Depth returns the depth of the queue @@ -306,8 +325,11 @@ func (d *diskQueue) readOne() ([]byte, error) { if d.readFileNum < d.writeFileNum { stat, err := d.readFile.Stat() if err == nil { - // last 8 bytes are reserved for the number of messages in this file - d.maxBytesPerFileRead = stat.Size() - 8 + d.maxBytesPerFileRead = stat.Size() + if d.maxBytesDiskSpace > 0 { + // last 8 bytes are reserved for the number of messages in this file + d.maxBytesPerFileRead -= 8 + } } } @@ -404,7 +426,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.maxBytesDiskSpace > 0 && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) if err != nil { @@ -423,8 +445,13 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos += totalBytes d.depth += 1 - // save space for the number of messages in this file - if d.writePos+8 >= d.maxBytesPerFile { + fileSize := d.writePos + + if d.maxBytesDiskSpace > 0 { + // save space for the number of messages in this file + fileSize += 8 + } + if fileSize >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -482,10 +509,18 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - _, err = fmt.Fscanf(f, metaDataFormat, - &d.depth, - &d.readFileNum, &d.readMessages, &d.readPos, - &d.writeFileNum, &d.writeMessages, &d.writePos) + if d.maxBytesDiskSpace > 0 { + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + &d.depth, + &d.readFileNum, &d.readMessages, &d.readPos, + &d.writeFileNum, &d.writeMessages, &d.writePos) + } else { + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &d.depth, + &d.readFileNum, &d.readPos, + &d.writeFileNum, &d.writePos) + } + if err != nil { return err } @@ -510,10 +545,17 @@ func (d *diskQueue) persistMetaData() error { return err } - _, err = fmt.Fprintf(f, metaDataFormat, - d.depth, - d.readFileNum, d.readMessages, d.readPos, - d.writeFileNum, d.writeMessages, d.writePos) + if d.maxBytesDiskSpace > 0 { + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + d.depth, + d.readFileNum, d.readMessages, d.readPos, + d.writeFileNum, d.writeMessages, d.writePos) + } else { + _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", + d.depth, + d.readFileNum, d.readPos, + d.writeFileNum, d.writePos) + } if err != nil { f.Close() return err diff --git a/diskqueue_test.go b/diskqueue_test.go index e0bc80c..30aa810 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -292,7 +292,7 @@ func TestDiskQueueSyncAfterRead(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := New(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msg := make([]byte, 1000) From b35c029814a2901ee24b5214940dd8106c52eb9b Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 27 May 2021 21:01:58 +0000 Subject: [PATCH 08/28] Separate testing from the original with the implementation of depth for disk space limit. --- diskqueue_test.go | 76 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 10 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 30aa810..7b75de4 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int) md { +func readMetaDataFile(fileName string, retried int, withDiskSpaceImpl bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,17 +267,24 @@ func readMetaDataFile(fileName string, retried int) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried) + return readMetaDataFile(fileName, retried, withDiskSpaceImpl) } panic(err) } defer f.Close() var ret md - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", - &ret.depth, - &ret.readFileNum, &ret.readMessages, &ret.readPos, - &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + if withDiskSpaceImpl { + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readMessages, &ret.readPos, + &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + } else { + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readPos, + &ret.writeFileNum, &ret.writePos) + } if err != nil { panic(err) } @@ -285,6 +292,55 @@ func readMetaDataFile(fileName string, retried int) md { } func TestDiskQueueSyncAfterRead(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := New(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msg := make([]byte, 1000) + dq.Put(msg) + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, false) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readPos == 0 && + d.writePos == 1004 { + // success + goto next + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +next: + dq.Put(msg) + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, false) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readPos == 1004 && + d.writePos == 2008 { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { l := NewTestLogger(t) dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) @@ -299,7 +355,7 @@ func TestDiskQueueSyncAfterRead(t *testing.T) { dq.Put(msg) for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && d.readFileNum == 0 && d.writeFileNum == 0 && @@ -319,7 +375,7 @@ next: <-dq.ReadChan() for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && d.readFileNum == 0 && d.writeFileNum == 0 && @@ -339,7 +395,7 @@ completeWriteFile: for i := 0; i < 10; i++ { // test that write position and messages reset when a new file is created - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 2 && d.readFileNum == 0 && d.writeFileNum == 1 && @@ -362,7 +418,7 @@ completeReadFile: for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && d.readFileNum == 1 && d.writeFileNum == 1 && From 5cdd0ae49492912d6245cd325ba08b57497224d2 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 27 May 2021 21:06:20 +0000 Subject: [PATCH 09/28] Revert to original code style. --- diskqueue.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 1443c3d..e69d697 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -281,16 +281,15 @@ func (d *diskQueue) skipToNextRWFile() error { } } - d.depth = 0 - d.nextReadPos = 0 - d.readMessages = 0 - d.readPos = 0 d.writeFileNum++ - d.writeMessages = 0 d.writePos = 0 - d.readFileNum = d.writeFileNum + d.readPos = 0 d.nextReadFileNum = d.writeFileNum + d.nextReadPos = 0 + d.depth = 0 + d.readMessages = 0 + d.writeMessages = 0 return err } From fcb31ccf83adde97a7dcd2b684346ac2239a32e0 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 27 May 2021 21:18:21 +0000 Subject: [PATCH 10/28] Add comment for functions. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index e69d697..1073a94 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -118,6 +118,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, syncEvery, syncTimeout, logf) } +// Another constructor that allows users to use Disk Space Limit feature func NewWithDiskSpace(name string, dataPath string, maxBytesDiskSpace int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, @@ -149,6 +150,7 @@ func NewWithDiskSpace(name string, dataPath string, return &d } +// Get the last known state of DiskQueue from metadata and start ioLoop func (d *diskQueue) start() { // no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() From c90f17fec6c9baf0312dcbfce756ac3fe51f4ae9 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 27 May 2021 21:22:34 +0000 Subject: [PATCH 11/28] Explain that maxBytesDiskSpace is 0 when user is not using disk space limit feature. --- diskqueue.go | 4 ++++ diskqueue_test.go | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 1073a94..49b4748 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -119,6 +119,8 @@ func New(name string, dataPath string, maxBytesPerFile int64, } // Another constructor that allows users to use Disk Space Limit feature +// If user is not using Disk Space Limit feature, maxBytesDiskSpace will +// be 0 func NewWithDiskSpace(name string, dataPath string, maxBytesDiskSpace int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, @@ -510,6 +512,7 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() + // if user is using disk space limit feature if d.maxBytesDiskSpace > 0 { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, @@ -546,6 +549,7 @@ func (d *diskQueue) persistMetaData() error { return err } + // if user is using disk space limit feature if d.maxBytesDiskSpace > 0 { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, diff --git a/diskqueue_test.go b/diskqueue_test.go index 7b75de4..d8befe8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, withDiskSpaceImpl bool) md { +func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, withDiskSpaceImpl bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, withDiskSpaceImpl) + return readMetaDataFile(fileName, retried, withDiskSpaceFeat) } panic(err) } defer f.Close() var ret md - if withDiskSpaceImpl { + if withDiskSpaceFeat { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, From a28e6d5b47618acb7fc184114e1128d02f847727 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 1 Jun 2021 14:51:15 +0000 Subject: [PATCH 12/28] Add explicit disk size limit feature flag. --- diskqueue.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 49b4748..7661d10 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -104,6 +104,9 @@ type diskQueue struct { exitSyncChan chan int logf AppLogFunc + + // disk limit implementation flag + diskLimitFeatIsOn bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -125,8 +128,10 @@ func NewWithDiskSpace(name string, dataPath string, maxBytesDiskSpace int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { + diskLimitFeatIsOn := true if maxBytesDiskSpace <= 0 { maxBytesDiskSpace = 0 + diskLimitFeatIsOn = false } d := diskQueue{ name: name, @@ -146,6 +151,7 @@ func NewWithDiskSpace(name string, dataPath string, syncEvery: syncEvery, syncTimeout: syncTimeout, logf: logf, + diskLimitFeatIsOn: diskLimitFeatIsOn, } d.start() @@ -329,7 +335,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() - if d.maxBytesDiskSpace > 0 { + if d.diskLimitFeatIsOn { // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead -= 8 } @@ -429,7 +435,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.maxBytesDiskSpace > 0 && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) if err != nil { @@ -450,7 +456,7 @@ func (d *diskQueue) writeOne(data []byte) error { fileSize := d.writePos - if d.maxBytesDiskSpace > 0 { + if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 } @@ -513,7 +519,7 @@ func (d *diskQueue) retrieveMetaData() error { defer f.Close() // if user is using disk space limit feature - if d.maxBytesDiskSpace > 0 { + if d.diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, @@ -550,7 +556,7 @@ func (d *diskQueue) persistMetaData() error { } // if user is using disk space limit feature - if d.maxBytesDiskSpace > 0 { + if d.diskLimitFeatIsOn { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, From 293a7749bd202b9e430f9bc8d27e184dde405666 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 1 Jun 2021 14:54:09 +0000 Subject: [PATCH 13/28] Add a new line after if block. --- diskqueue.go | 1 + 1 file changed, 1 insertion(+) diff --git a/diskqueue.go b/diskqueue.go index 7661d10..e8a5ab5 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -460,6 +460,7 @@ func (d *diskQueue) writeOne(data []byte) error { // save space for the number of messages in this file fileSize += 8 } + if fileSize >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos From 4c956aaa821531f31baf4a585012d792ed18184a Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 1 Jun 2021 20:29:48 +0000 Subject: [PATCH 14/28] Track the disk space the files tracked by DiskQueue takes up, and test that these numbers are accurate. --- diskqueue.go | 39 ++++++++++++++++++++++++++------------- diskqueue_test.go | 10 ++++++++-- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index e8a5ab5..1058c47 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -64,6 +64,7 @@ type diskQueue struct { writeFileNum int64 readMessages int64 writeMessages int64 + writeBytes int64 depth int64 sync.RWMutex @@ -107,6 +108,9 @@ type diskQueue struct { // disk limit implementation flag diskLimitFeatIsOn bool + + // the size of the + readMsgSize int32 } // New instantiates an instance of diskQueue, retrieving metadata @@ -300,6 +304,7 @@ func (d *diskQueue) skipToNextRWFile() error { d.depth = 0 d.readMessages = 0 d.writeMessages = 0 + d.writeBytes = 0 return err } @@ -308,7 +313,6 @@ func (d *diskQueue) skipToNextRWFile() error { // while advancing read positions and rolling files, if necessary func (d *diskQueue) readOne() ([]byte, error) { var err error - var msgSize int32 if d.readFile == nil { curFileName := d.fileName(d.readFileNum) @@ -345,22 +349,22 @@ func (d *diskQueue) readOne() ([]byte, error) { d.reader = bufio.NewReader(d.readFile) } - err = binary.Read(d.reader, binary.BigEndian, &msgSize) + err = binary.Read(d.reader, binary.BigEndian, &d.readMsgSize) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } - if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { + if d.readMsgSize < d.minMsgSize || d.readMsgSize > d.maxMsgSize { // this file is corrupt and we have no reasonable guarantee on // where a new message should begin d.readFile.Close() d.readFile = nil - return nil, fmt.Errorf("invalid message read size (%d)", msgSize) + return nil, fmt.Errorf("invalid message read size (%d)", d.readMsgSize) } - readBuf := make([]byte, msgSize) + readBuf := make([]byte, d.readMsgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { d.readFile.Close() @@ -368,7 +372,7 @@ func (d *diskQueue) readOne() ([]byte, error) { return nil, err } - totalBytes := int64(4 + msgSize) + totalBytes := int64(4 + d.readMsgSize) // we only advance next* because we have not yet sent this to consumers // (where readFileNum, readPos will actually be advanced) @@ -459,6 +463,8 @@ func (d *diskQueue) writeOne(data []byte) error { if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 + d.writeBytes += totalBytes + d.writeMessages += 1 } if fileSize >= d.maxBytesPerFile { @@ -468,7 +474,12 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFileNum++ d.writePos = 0 - d.writeMessages = 0 + + if d.diskLimitFeatIsOn { + // add bytes for the number of messages in the file + d.writeBytes += 8 + d.writeMessages = 0 + } // sync every time we start writing to a new file err = d.sync() @@ -480,8 +491,6 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } - } else { - d.writeMessages += 1 } return err @@ -521,10 +530,10 @@ func (d *diskQueue) retrieveMetaData() error { // if user is using disk space limit feature if d.diskLimitFeatIsOn { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, - &d.writeFileNum, &d.writeMessages, &d.writePos) + &d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &d.depth, @@ -558,10 +567,10 @@ func (d *diskQueue) persistMetaData() error { // if user is using disk space limit feature if d.diskLimitFeatIsOn { - _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, - d.writeFileNum, d.writeMessages, d.writePos) + d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos) } else { _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", d.depth, @@ -628,6 +637,8 @@ func (d *diskQueue) checkTailCorruption(depth int64) { } func (d *diskQueue) moveForward() { + // add bytes for the number of messages and the size of the message + readFileLen := int64(d.readMsgSize) + d.readPos + 12 oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos @@ -646,6 +657,8 @@ func (d *diskQueue) moveForward() { if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) } + + d.writeBytes -= readFileLen } d.checkTailCorruption(d.depth) diff --git a/diskqueue_test.go b/diskqueue_test.go index d8befe8..a976bb1 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -251,6 +251,7 @@ func TestDiskQueueCorruption(t *testing.T) { type md struct { depth int64 + writeBytes int64 readFileNum int64 writeFileNum int64 readMessages int64 @@ -275,10 +276,10 @@ func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { var ret md if withDiskSpaceFeat { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, - &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + &ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &ret.depth, @@ -357,6 +358,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 1004 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 0 && @@ -377,6 +379,7 @@ next: for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 2008 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 1004 && @@ -397,6 +400,7 @@ completeWriteFile: // test that write position and messages reset when a new file is created d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 2 && + d.writeBytes == 3020 && d.readFileNum == 0 && d.writeFileNum == 1 && d.readPos == 1004 && @@ -419,7 +423,9 @@ completeReadFile: for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + t.Logf("Write bytes: %d", d.writeBytes) if d.depth == 1 && + d.writeBytes == 1004 && d.readFileNum == 1 && d.writeFileNum == 1 && d.readPos == 0 && From 48c7a85bb1326c9b362a81545e0908af0807ff7a Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 1 Jun 2021 21:15:52 +0000 Subject: [PATCH 15/28] Update totalBytes and writePos when writing number of messages, and resolve the writeMessages off by 1 error. --- diskqueue.go | 15 ++++++--------- diskqueue_test.go | 6 +++--- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index e8a5ab5..02f6eef 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -435,12 +435,14 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.diskLimitFeatIsOn && d.writePos+totalBytes >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { return err } + + totalBytes += 8 } // only write to the file once @@ -454,14 +456,11 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos += totalBytes d.depth += 1 - fileSize := d.writePos - if d.diskLimitFeatIsOn { - // save space for the number of messages in this file - fileSize += 8 + d.writeMessages += 1 } - if fileSize >= d.maxBytesPerFile { + if d.writePos >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -480,8 +479,6 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } - } else { - d.writeMessages += 1 } return err diff --git a/diskqueue_test.go b/diskqueue_test.go index d8befe8..7ab9fe8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { +func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, withDiskSpaceFeat) + return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) } panic(err) } defer f.Close() var ret md - if withDiskSpaceFeat { + if diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, From 652b548b97ac8c5eb808638fe6bdd80898b3b2b2 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 1 Jun 2021 21:30:30 +0000 Subject: [PATCH 16/28] Remove the additional 8 bytes to totalBytes. --- diskqueue.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 02f6eef..d6e3c72 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -441,8 +441,6 @@ func (d *diskQueue) writeOne(data []byte) error { if err != nil { return err } - - totalBytes += 8 } // only write to the file once From b07cec451ff8d9d2c8f45987347ac0c3dadc76a0 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:37:09 +0000 Subject: [PATCH 17/28] Revert "Remove the additional 8 bytes to totalBytes." This reverts commit 652b548b97ac8c5eb808638fe6bdd80898b3b2b2. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index d6e3c72..02f6eef 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -441,6 +441,8 @@ func (d *diskQueue) writeOne(data []byte) error { if err != nil { return err } + + totalBytes += 8 } // only write to the file once From 5968b13ec8806baaa26b57b81dc7d7e7acd5d053 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:38:05 +0000 Subject: [PATCH 18/28] Revert "Update totalBytes and writePos when writing number of messages, and resolve the writeMessages off by 1 error." This reverts commit 48c7a85bb1326c9b362a81545e0908af0807ff7a. --- diskqueue.go | 15 +++++++++------ diskqueue_test.go | 6 +++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 02f6eef..e8a5ab5 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -435,14 +435,12 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes >= d.maxBytesPerFile { + if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) if err != nil { return err } - - totalBytes += 8 } // only write to the file once @@ -456,11 +454,14 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos += totalBytes d.depth += 1 + fileSize := d.writePos + if d.diskLimitFeatIsOn { - d.writeMessages += 1 + // save space for the number of messages in this file + fileSize += 8 } - if d.writePos >= d.maxBytesPerFile { + if fileSize >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -479,6 +480,8 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } + } else { + d.writeMessages += 1 } return err diff --git a/diskqueue_test.go b/diskqueue_test.go index 7ab9fe8..d8befe8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { +func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) + return readMetaDataFile(fileName, retried, withDiskSpaceFeat) } panic(err) } defer f.Close() var ret md - if diskLimitFeatIsOn { + if withDiskSpaceFeat { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, From 53c0e473682689f5f0298e11eda4341fa32eb4ef Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:43:03 +0000 Subject: [PATCH 19/28] Revert changes that removed the additional 8 bytes. --- diskqueue.go | 5 ++--- diskqueue_test.go | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index e8a5ab5..41ab465 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { // check if we reached the file size limit with this message if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { return err } @@ -459,6 +459,7 @@ func (d *diskQueue) writeOne(data []byte) error { if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 + d.writeMessages += 1 } if fileSize >= d.maxBytesPerFile { @@ -480,8 +481,6 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile.Close() d.writeFile = nil } - } else { - d.writeMessages += 1 } return err diff --git a/diskqueue_test.go b/diskqueue_test.go index d8befe8..7ab9fe8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { +func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, withDiskSpaceFeat) + return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) } panic(err) } defer f.Close() var ret md - if withDiskSpaceFeat { + if diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, From a3ce86a63fe0a7969fecf4abc632d71298c57b83 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 13:49:47 +0000 Subject: [PATCH 20/28] Add comment to make code more readable. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index 41ab465..3ac27ca 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -421,6 +421,8 @@ func (d *diskQueue) writeOne(data []byte) error { return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) } + // add all data to writeBuf before writing to file + // this causes everything to be written to file or nothing d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { From 3ed7338805dcc2cb4bf7da2b5aa21449f3a71d02 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 14:44:23 +0000 Subject: [PATCH 21/28] Add test that completes the write file by meeting the file size limit exactly with a message less than 8 bytes. --- diskqueue_test.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 7ab9fe8..2518cc8 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -351,7 +351,8 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() - msg := make([]byte, 1000) + msgSize := 1000 + msg := make([]byte, msgSize) dq.Put(msg) for i := 0; i < 10; i++ { @@ -391,12 +392,17 @@ next: panic("fail") completeWriteFile: - dq.Put(msg) + // meet the file size limit exactly (2048 bytes) + totalBytes := 2 * (msgSize + 4) + bytesRemaining := 2048 - (totalBytes + 8) + oneByteMsgSizeIncrease := 5 + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) for i := 0; i < 10; i++ { // test that write position and messages reset when a new file is created d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) - if d.depth == 2 && + if d.depth == 3 && d.readFileNum == 0 && d.writeFileNum == 1 && d.readPos == 1004 && @@ -413,12 +419,14 @@ completeWriteFile: completeReadFile: dq.Put(msg) + <-dq.ReadChan() <-dq.ReadChan() <-dq.ReadChan() for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + t.Log(d.depth, d.readFileNum, d.writeFileNum, d.readPos, d.writePos, d.readMessages, d.writeMessages) if d.depth == 1 && d.readFileNum == 1 && d.writeFileNum == 1 && From 30c6ef662449c70d289a402bb95cf9759c642e1e Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 15:38:54 +0000 Subject: [PATCH 22/28] Add extra testing to validate the increment/decrement of bytes in core code. --- diskqueue_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 2518cc8..b0ac900 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -392,7 +392,8 @@ next: panic("fail") completeWriteFile: - // meet the file size limit exactly (2048 bytes) + // meet the file size limit exactly (2048 bytes) when writeFileNum + // equals readFileNum totalBytes := 2 * (msgSize + 4) bytesRemaining := 2048 - (totalBytes + 8) oneByteMsgSizeIncrease := 5 @@ -401,6 +402,7 @@ completeWriteFile: for i := 0; i < 10; i++ { // test that write position and messages reset when a new file is created + // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 3 && d.readFileNum == 0 && @@ -425,8 +427,8 @@ completeReadFile: for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) - t.Log(d.depth, d.readFileNum, d.writeFileNum, d.readPos, d.writePos, d.readMessages, d.writeMessages) if d.depth == 1 && d.readFileNum == 1 && d.writeFileNum == 1 && @@ -435,6 +437,64 @@ completeReadFile: d.readMessages == 0 && d.writeMessages == 1 { // success + goto completeWriteFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeWriteFileAgain: + // make writeFileNum ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + + // meet the file size limit exactly (2048 bytes) when writeFileNum + // is ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a file is completely read + // test the writeFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 7 && + d.readFileNum == 1 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success + goto completeReadFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFileAgain: + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 0 && + d.readFileNum == 3 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success goto done } time.Sleep(100 * time.Millisecond) From e9b23ac5eab849aed8fc2972d06870fcbf243f09 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:03:10 +0000 Subject: [PATCH 23/28] Update variable names. --- diskqueue.go | 72 +++++++++++++++++++++++------------------------ diskqueue_test.go | 8 +++--- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 3ac27ca..042c276 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -71,7 +71,7 @@ type diskQueue struct { // instantiation time metadata name string dataPath string - maxBytesDiskSpace int64 + maxBytesDiskSize int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -106,7 +106,7 @@ type diskQueue struct { logf AppLogFunc // disk limit implementation flag - diskLimitFeatIsOn bool + enableDiskLimitation bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -115,43 +115,43 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - return NewWithDiskSpace(name, dataPath, + return NewWithDiskSize(name, dataPath, 0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } -// Another constructor that allows users to use Disk Space Limit feature -// If user is not using Disk Space Limit feature, maxBytesDiskSpace will +// Another constructor that allows users to use Disk Size Limit feature +// If user is not using Disk Size Limit feature, maxBytesDiskSize will // be 0 -func NewWithDiskSpace(name string, dataPath string, - maxBytesDiskSpace int64, maxBytesPerFile int64, +func NewWithDiskSize(name string, dataPath string, + maxBytesDiskSize int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - diskLimitFeatIsOn := true - if maxBytesDiskSpace <= 0 { - maxBytesDiskSpace = 0 - diskLimitFeatIsOn = false + enableDiskLimitation := true + if maxBytesDiskSize <= 0 { + maxBytesDiskSize = 0 + enableDiskLimitation = false } d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesDiskSpace: maxBytesDiskSpace, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, - diskLimitFeatIsOn: diskLimitFeatIsOn, + name: name, + dataPath: dataPath, + maxBytesDiskSize: maxBytesDiskSize, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + enableDiskLimitation: enableDiskLimitation, } d.start() @@ -335,7 +335,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead -= 8 } @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -458,7 +458,7 @@ func (d *diskQueue) writeOne(data []byte) error { fileSize := d.writePos - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // save space for the number of messages in this file fileSize += 8 d.writeMessages += 1 @@ -520,8 +520,8 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, @@ -557,8 +557,8 @@ func (d *diskQueue) persistMetaData() error { return err } - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, diff --git a/diskqueue_test.go b/diskqueue_test.go index b0ac900..b4a712a 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { +func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) + return readMetaDataFile(fileName, retried, enableDiskLimitation) } panic(err) } defer f.Close() var ret md - if diskLimitFeatIsOn { + if enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, @@ -348,7 +348,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 From 7acba5f0928bf09c13aafcbb1a046caf42055851 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:05:39 +0000 Subject: [PATCH 24/28] Revert "Update variable names." This reverts commit e9b23ac5eab849aed8fc2972d06870fcbf243f09. --- diskqueue.go | 72 +++++++++++++++++++++++------------------------ diskqueue_test.go | 8 +++--- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 042c276..3ac27ca 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -71,7 +71,7 @@ type diskQueue struct { // instantiation time metadata name string dataPath string - maxBytesDiskSize int64 + maxBytesDiskSpace int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -106,7 +106,7 @@ type diskQueue struct { logf AppLogFunc // disk limit implementation flag - enableDiskLimitation bool + diskLimitFeatIsOn bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -115,43 +115,43 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - return NewWithDiskSize(name, dataPath, + return NewWithDiskSpace(name, dataPath, 0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } -// Another constructor that allows users to use Disk Size Limit feature -// If user is not using Disk Size Limit feature, maxBytesDiskSize will +// Another constructor that allows users to use Disk Space Limit feature +// If user is not using Disk Space Limit feature, maxBytesDiskSpace will // be 0 -func NewWithDiskSize(name string, dataPath string, - maxBytesDiskSize int64, maxBytesPerFile int64, +func NewWithDiskSpace(name string, dataPath string, + maxBytesDiskSpace int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - enableDiskLimitation := true - if maxBytesDiskSize <= 0 { - maxBytesDiskSize = 0 - enableDiskLimitation = false + diskLimitFeatIsOn := true + if maxBytesDiskSpace <= 0 { + maxBytesDiskSpace = 0 + diskLimitFeatIsOn = false } d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesDiskSize: maxBytesDiskSize, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, - enableDiskLimitation: enableDiskLimitation, + name: name, + dataPath: dataPath, + maxBytesDiskSpace: maxBytesDiskSpace, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + diskLimitFeatIsOn: diskLimitFeatIsOn, } d.start() @@ -335,7 +335,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() - if d.enableDiskLimitation { + if d.diskLimitFeatIsOn { // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead -= 8 } @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -458,7 +458,7 @@ func (d *diskQueue) writeOne(data []byte) error { fileSize := d.writePos - if d.enableDiskLimitation { + if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 d.writeMessages += 1 @@ -520,8 +520,8 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - // if user is using disk size limit feature - if d.enableDiskLimitation { + // if user is using disk space limit feature + if d.diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, @@ -557,8 +557,8 @@ func (d *diskQueue) persistMetaData() error { return err } - // if user is using disk size limit feature - if d.enableDiskLimitation { + // if user is using disk space limit feature + if d.diskLimitFeatIsOn { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, diff --git a/diskqueue_test.go b/diskqueue_test.go index b4a712a..b0ac900 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { +func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) m if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, enableDiskLimitation) + return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) } panic(err) } defer f.Close() var ret md - if enableDiskLimitation { + if diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, @@ -348,7 +348,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 From 92819e147ada3e0fb63f8768c19e8c9800cf1c27 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:08:43 +0000 Subject: [PATCH 25/28] Revert "Merge branch 'TrackDiskSize' into DepthImpl" This reverts commit 76a0ddd1719105e1d13e57988557783b9d6af9bd, reversing changes made to 30c6ef662449c70d289a402bb95cf9759c642e1e. --- diskqueue.go | 36 +++++++++++------------------------- diskqueue_test.go | 11 ++--------- 2 files changed, 13 insertions(+), 34 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 3aab8e5..3ac27ca 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -64,7 +64,6 @@ type diskQueue struct { writeFileNum int64 readMessages int64 writeMessages int64 - writeBytes int64 depth int64 sync.RWMutex @@ -108,9 +107,6 @@ type diskQueue struct { // disk limit implementation flag diskLimitFeatIsOn bool - - // the size of the - readMsgSize int32 } // New instantiates an instance of diskQueue, retrieving metadata @@ -304,7 +300,6 @@ func (d *diskQueue) skipToNextRWFile() error { d.depth = 0 d.readMessages = 0 d.writeMessages = 0 - d.writeBytes = 0 return err } @@ -313,6 +308,7 @@ func (d *diskQueue) skipToNextRWFile() error { // while advancing read positions and rolling files, if necessary func (d *diskQueue) readOne() ([]byte, error) { var err error + var msgSize int32 if d.readFile == nil { curFileName := d.fileName(d.readFileNum) @@ -349,22 +345,22 @@ func (d *diskQueue) readOne() ([]byte, error) { d.reader = bufio.NewReader(d.readFile) } - err = binary.Read(d.reader, binary.BigEndian, &d.readMsgSize) + err = binary.Read(d.reader, binary.BigEndian, &msgSize) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } - if d.readMsgSize < d.minMsgSize || d.readMsgSize > d.maxMsgSize { + if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { // this file is corrupt and we have no reasonable guarantee on // where a new message should begin d.readFile.Close() d.readFile = nil - return nil, fmt.Errorf("invalid message read size (%d)", d.readMsgSize) + return nil, fmt.Errorf("invalid message read size (%d)", msgSize) } - readBuf := make([]byte, d.readMsgSize) + readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { d.readFile.Close() @@ -372,7 +368,7 @@ func (d *diskQueue) readOne() ([]byte, error) { return nil, err } - totalBytes := int64(4 + d.readMsgSize) + totalBytes := int64(4 + msgSize) // we only advance next* because we have not yet sent this to consumers // (where readFileNum, readPos will actually be advanced) @@ -465,7 +461,6 @@ func (d *diskQueue) writeOne(data []byte) error { if d.diskLimitFeatIsOn { // save space for the number of messages in this file fileSize += 8 - d.writeBytes += totalBytes d.writeMessages += 1 } @@ -476,12 +471,7 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFileNum++ d.writePos = 0 - - if d.diskLimitFeatIsOn { - // add bytes for the number of messages in the file - d.writeBytes += 8 - d.writeMessages = 0 - } + d.writeMessages = 0 // sync every time we start writing to a new file err = d.sync() @@ -532,10 +522,10 @@ func (d *diskQueue) retrieveMetaData() error { // if user is using disk space limit feature if d.diskLimitFeatIsOn { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, - &d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos) + &d.writeFileNum, &d.writeMessages, &d.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &d.depth, @@ -569,10 +559,10 @@ func (d *diskQueue) persistMetaData() error { // if user is using disk space limit feature if d.diskLimitFeatIsOn { - _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, - d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos) + d.writeFileNum, d.writeMessages, d.writePos) } else { _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", d.depth, @@ -639,8 +629,6 @@ func (d *diskQueue) checkTailCorruption(depth int64) { } func (d *diskQueue) moveForward() { - // add bytes for the number of messages and the size of the message - readFileLen := int64(d.readMsgSize) + d.readPos + 12 oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos @@ -659,8 +647,6 @@ func (d *diskQueue) moveForward() { if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) } - - d.writeBytes -= readFileLen } d.checkTailCorruption(d.depth) diff --git a/diskqueue_test.go b/diskqueue_test.go index 8d4f78f..b0ac900 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -251,7 +251,6 @@ func TestDiskQueueCorruption(t *testing.T) { type md struct { depth int64 - writeBytes int64 readFileNum int64 writeFileNum int64 readMessages int64 @@ -275,12 +274,11 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { defer f.Close() var ret md - if diskLimitFeatIsOn { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, - &ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + &ret.writeFileNum, &ret.writeMessages, &ret.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &ret.depth, @@ -360,7 +358,6 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && - d.writeBytes == 1004 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 0 && @@ -381,7 +378,6 @@ next: for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && - d.writeBytes == 2008 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readPos == 1004 && @@ -409,7 +405,6 @@ completeWriteFile: // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 3 && - d.writeBytes == 3020 && d.readFileNum == 0 && d.writeFileNum == 1 && d.readPos == 1004 && @@ -434,9 +429,7 @@ completeReadFile: // test that read position and messages reset when a file is completely read // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) - t.Logf("Write bytes: %d", d.writeBytes) if d.depth == 1 && - d.writeBytes == 1004 && d.readFileNum == 1 && d.writeFileNum == 1 && d.readPos == 0 && From 2616c7e47438cf0c881ab5eca74d4e320a384822 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 21:13:34 +0000 Subject: [PATCH 26/28] Update variable names. --- diskqueue.go | 72 +++++++++++++++++++++++------------------------ diskqueue_test.go | 8 +++--- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 3ac27ca..042c276 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -71,7 +71,7 @@ type diskQueue struct { // instantiation time metadata name string dataPath string - maxBytesDiskSpace int64 + maxBytesDiskSize int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -106,7 +106,7 @@ type diskQueue struct { logf AppLogFunc // disk limit implementation flag - diskLimitFeatIsOn bool + enableDiskLimitation bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -115,43 +115,43 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - return NewWithDiskSpace(name, dataPath, + return NewWithDiskSize(name, dataPath, 0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } -// Another constructor that allows users to use Disk Space Limit feature -// If user is not using Disk Space Limit feature, maxBytesDiskSpace will +// Another constructor that allows users to use Disk Size Limit feature +// If user is not using Disk Size Limit feature, maxBytesDiskSize will // be 0 -func NewWithDiskSpace(name string, dataPath string, - maxBytesDiskSpace int64, maxBytesPerFile int64, +func NewWithDiskSize(name string, dataPath string, + maxBytesDiskSize int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - diskLimitFeatIsOn := true - if maxBytesDiskSpace <= 0 { - maxBytesDiskSpace = 0 - diskLimitFeatIsOn = false + enableDiskLimitation := true + if maxBytesDiskSize <= 0 { + maxBytesDiskSize = 0 + enableDiskLimitation = false } d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesDiskSpace: maxBytesDiskSpace, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, - diskLimitFeatIsOn: diskLimitFeatIsOn, + name: name, + dataPath: dataPath, + maxBytesDiskSize: maxBytesDiskSize, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + enableDiskLimitation: enableDiskLimitation, } d.start() @@ -335,7 +335,7 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // last 8 bytes are reserved for the number of messages in this file d.maxBytesPerFileRead -= 8 } @@ -437,7 +437,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -458,7 +458,7 @@ func (d *diskQueue) writeOne(data []byte) error { fileSize := d.writePos - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { // save space for the number of messages in this file fileSize += 8 d.writeMessages += 1 @@ -520,8 +520,8 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, @@ -557,8 +557,8 @@ func (d *diskQueue) persistMetaData() error { return err } - // if user is using disk space limit feature - if d.diskLimitFeatIsOn { + // if user is using disk size limit feature + if d.enableDiskLimitation { _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, diff --git a/diskqueue_test.go b/diskqueue_test.go index b0ac900..b4a712a 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -259,7 +259,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { +func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -267,14 +267,14 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) + return readMetaDataFile(fileName, retried, enableDiskLimitation) } panic(err) } defer f.Close() var ret md - if diskLimitFeatIsOn { + if enableDiskLimitation { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, @@ -348,7 +348,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 From 5cd62884e8046eb8d688c323002a2a584125c2e8 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 3 Jun 2021 14:31:47 +0000 Subject: [PATCH 27/28] Replace 8 with a constant to improve readability. --- diskqueue.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 042c276..a9b1193 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -19,11 +19,12 @@ import ( type LogLevel int const ( - DEBUG = LogLevel(1) - INFO = LogLevel(2) - WARN = LogLevel(3) - ERROR = LogLevel(4) - FATAL = LogLevel(5) + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) + numFileMsgsBytes = 8 ) type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) @@ -337,7 +338,7 @@ func (d *diskQueue) readOne() ([]byte, error) { d.maxBytesPerFileRead = stat.Size() if d.enableDiskLimitation { // last 8 bytes are reserved for the number of messages in this file - d.maxBytesPerFileRead -= 8 + d.maxBytesPerFileRead -= numFileMsgsBytes } } } @@ -437,7 +438,7 @@ func (d *diskQueue) writeOne(data []byte) error { totalBytes := int64(4 + dataLen) // check if we reached the file size limit with this message - if d.enableDiskLimitation && d.writePos+totalBytes+8 >= d.maxBytesPerFile { + if d.enableDiskLimitation && d.writePos+totalBytes+numFileMsgsBytes >= d.maxBytesPerFile { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -460,7 +461,7 @@ func (d *diskQueue) writeOne(data []byte) error { if d.enableDiskLimitation { // save space for the number of messages in this file - fileSize += 8 + fileSize += numFileMsgsBytes d.writeMessages += 1 } From 24fe56aa164e43c93fed06393bec42b507812f33 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 9 Jun 2021 16:48:09 +0000 Subject: [PATCH 28/28] Reset Diskqueue data during readError. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index a9b1193..5821548 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -664,6 +664,7 @@ func (d *diskQueue) handleReadError() { } d.writeFileNum++ d.writePos = 0 + d.writeMessages = 0 } badFn := d.fileName(d.readFileNum) @@ -684,6 +685,7 @@ func (d *diskQueue) handleReadError() { d.readPos = 0 d.nextReadFileNum = d.readFileNum d.nextReadPos = 0 + d.readMessages = 0 // significant state change, schedule a sync on the next iteration d.needSync = true