diff --git a/diskqueue.go b/diskqueue.go index 26b3438..5821548 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -19,11 +19,12 @@ import ( type LogLevel int const ( - DEBUG = LogLevel(1) - INFO = LogLevel(2) - WARN = LogLevel(3) - ERROR = LogLevel(4) - FATAL = LogLevel(5) + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) + numFileMsgsBytes = 8 ) type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) @@ -58,17 +59,20 @@ type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) - readPos int64 - writePos int64 - readFileNum int64 - writeFileNum int64 - depth int64 + readPos int64 + writePos int64 + readFileNum int64 + writeFileNum int64 + readMessages int64 + writeMessages int64 + depth int64 sync.RWMutex // instantiation time metadata name string dataPath string + maxBytesDiskSize int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -101,6 +105,9 @@ type diskQueue struct { exitSyncChan chan int logf AppLogFunc + + // disk limit implementation flag + enableDiskLimitation bool } // New instantiates an instance of diskQueue, retrieving metadata @@ -108,25 +115,52 @@ type diskQueue struct { func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, + + return NewWithDiskSize(name, dataPath, + 0, 
maxBytesPerFile, + minMsgSize, maxMsgSize, + syncEvery, syncTimeout, logf) +} + +// NewWithDiskSize instantiates an instance of diskQueue with the Disk Size +// Limit feature. A maxBytesDiskSize <= 0 disables the feature, and +// maxBytesDiskSize is then stored as 0 +func NewWithDiskSize(name string, dataPath string, + maxBytesDiskSize int64, maxBytesPerFile int64, + minMsgSize int32, maxMsgSize int32, + syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { + enableDiskLimitation := true + if maxBytesDiskSize <= 0 { + maxBytesDiskSize = 0 + enableDiskLimitation = false } + d := diskQueue{ + name: name, + dataPath: dataPath, + maxBytesDiskSize: maxBytesDiskSize, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + enableDiskLimitation: enableDiskLimitation, + } + + d.start() + return &d +} +// start gets the last known state of diskQueue from metadata and starts ioLoop +func (d *diskQueue) start() { // no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { @@ -134,7 +168,6 @@ func New(name string, dataPath string, maxBytesPerFile int64, } go d.ioLoop() - return &d } // Depth returns the depth of the queue @@ -266,6 +299,8 @@ func (d *diskQueue) skipToNextRWFile() error { d.nextReadFileNum = d.writeFileNum d.nextReadPos = 0 d.depth = 0 + d.readMessages = 0 + d.writeMessages = 0 return err } @@ -301,6 +336,10 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() + if d.enableDiskLimitation { + // last 8 bytes are reserved for the number of messages in this file +
d.maxBytesPerFileRead -= numFileMsgsBytes + } } } @@ -383,6 +422,8 @@ func (d *diskQueue) writeOne(data []byte) error { return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) } + // add all data to writeBuf before writing to file + // this causes everything to be written to file or nothing d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { @@ -394,6 +435,17 @@ func (d *diskQueue) writeOne(data []byte) error { return err } + totalBytes := int64(4 + dataLen) + + // check if we reached the file size limit with this message + if d.enableDiskLimitation && d.writePos+totalBytes+numFileMsgsBytes >= d.maxBytesPerFile { + // write number of messages in binary to file + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) + if err != nil { + return err + } + } + // only write to the file once _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { @@ -402,17 +454,25 @@ func (d *diskQueue) writeOne(data []byte) error { return err } - totalBytes := int64(4 + dataLen) d.writePos += totalBytes d.depth += 1 - if d.writePos >= d.maxBytesPerFile { + fileSize := d.writePos + + if d.enableDiskLimitation { + // save space for the number of messages in this file + fileSize += numFileMsgsBytes + d.writeMessages += 1 + } + + if fileSize >= d.maxBytesPerFile { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } d.writeFileNum++ d.writePos = 0 + d.writeMessages = 0 // sync every time we start writing to a new file err = d.sync() @@ -461,15 +521,23 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - var depth int64 - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &depth, - &d.readFileNum, &d.readPos, - &d.writeFileNum, &d.writePos) + // if user is using disk size limit feature + if d.enableDiskLimitation { + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + &d.depth, + &d.readFileNum, &d.readMessages, &d.readPos, + 
&d.writeFileNum, &d.writeMessages, &d.writePos) + } else { + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &d.depth, + &d.readFileNum, &d.readPos, + &d.writeFileNum, &d.writePos) + } + if err != nil { return err } - d.depth = depth + d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos @@ -490,10 +558,18 @@ func (d *diskQueue) persistMetaData() error { return err } - _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", - d.depth, - d.readFileNum, d.readPos, - d.writeFileNum, d.writePos) + // if user is using disk size limit feature + if d.enableDiskLimitation { + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + d.depth, + d.readFileNum, d.readMessages, d.readPos, + d.writeFileNum, d.writeMessages, d.writePos) + } else { + _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", + d.depth, + d.readFileNum, d.readPos, + d.writeFileNum, d.writePos) + } if err != nil { f.Close() return err @@ -558,9 +634,12 @@ func (d *diskQueue) moveForward() { d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos d.depth -= 1 + d.readMessages += 1 // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { + d.readMessages = 0 + // sync every time we start reading from a new file d.needSync = true @@ -585,6 +664,7 @@ func (d *diskQueue) handleReadError() { } d.writeFileNum++ d.writePos = 0 + d.writeMessages = 0 } badFn := d.fileName(d.readFileNum) @@ -605,6 +685,7 @@ func (d *diskQueue) handleReadError() { d.readPos = 0 d.nextReadFileNum = d.readFileNum d.nextReadPos = 0 + d.readMessages = 0 // significant state change, schedule a sync on the next iteration d.needSync = true diff --git a/diskqueue_test.go b/diskqueue_test.go index ba5879c..b4a712a 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -250,14 +250,16 @@ func TestDiskQueueCorruption(t *testing.T) { } type md struct { - depth int64 - readFileNum int64 - writeFileNum int64 - readPos int64 - writePos int64 + depth int64 + readFileNum int64 + writeFileNum int64 + readMessages int64 + 
writeMessages int64 + readPos int64 + writePos int64 } -func readMetaDataFile(fileName string, retried int) md { +func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -265,17 +267,24 @@ func readMetaDataFile(fileName string, retried int) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried) + return readMetaDataFile(fileName, retried, enableDiskLimitation) } panic(err) } defer f.Close() var ret md - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &ret.depth, - &ret.readFileNum, &ret.readPos, - &ret.writeFileNum, &ret.writePos) + if enableDiskLimitation { + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readMessages, &ret.readPos, + &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + } else { + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readPos, + &ret.writeFileNum, &ret.writePos) + } if err != nil { panic(err) } @@ -297,7 +306,7 @@ func TestDiskQueueSyncAfterRead(t *testing.T) { dq.Put(msg) for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, false) if d.depth == 1 && d.readFileNum == 0 && d.writeFileNum == 0 && @@ -315,7 +324,7 @@ next: <-dq.ReadChan() for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, false) if d.depth == 1 && d.readFileNum == 0 && d.writeFileNum == 0 && @@ -331,6 +340,170 @@ next: done: } +func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if 
err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + dq.Put(msg) + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readPos == 0 && + d.writePos == 1004 && + d.readMessages == 0 && + d.writeMessages == 1 { + // success + goto next + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +next: + dq.Put(msg) + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readPos == 1004 && + d.writePos == 2008 && + d.readMessages == 1 && + d.writeMessages == 2 { + // success + goto completeWriteFile + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeWriteFile: + // meet the file size limit exactly (2048 bytes) when writeFileNum + // equals readFileNum + totalBytes := 2 * (msgSize + 4) + bytesRemaining := 2048 - (totalBytes + 8) + oneByteMsgSizeIncrease := 5 + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a new file is created + // test the writeFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 3 && + d.readFileNum == 0 && + d.writeFileNum == 1 && + d.readPos == 1004 && + d.writePos == 0 && + d.readMessages == 1 && + d.writeMessages == 0 { + // success + goto completeReadFile + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFile: + dq.Put(msg) + + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test 
the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 1 && + d.writeFileNum == 1 && + d.readPos == 0 && + d.writePos == 1004 && + d.readMessages == 0 && + d.writeMessages == 1 { + // success + goto completeWriteFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeWriteFileAgain: + // make writeFileNum ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + + // meet the file size limit exactly (2048 bytes) when writeFileNum + // is ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a new file is created + // test the writeFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 7 && + d.readFileNum == 1 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success + goto completeReadFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFileAgain: + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 0 && + d.readFileNum == 3 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + func TestDiskQueueTorture(t *testing.T) { var wg sync.WaitGroup