diff --git a/diskqueue.go b/diskqueue.go index 1058c47..3aab8e5 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -425,6 +425,8 @@ func (d *diskQueue) writeOne(data []byte) error { return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) } + // add all data to writeBuf before writing to file + // this causes everything to be written to file or nothing d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { @@ -441,7 +443,7 @@ func (d *diskQueue) writeOne(data []byte) error { // check if we reached the file size limit with this message if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile { // write number of messages in binary to file - err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages) + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { return err } diff --git a/diskqueue_test.go b/diskqueue_test.go index a976bb1..8d4f78f 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -260,7 +260,7 @@ type md struct { writePos int64 } -func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { +func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -268,14 +268,15 @@ func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried, withDiskSpaceFeat) + return readMetaDataFile(fileName, retried, diskLimitFeatIsOn) } panic(err) } defer f.Close() var ret md - if withDiskSpaceFeat { + + if diskLimitFeatIsOn { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, @@ -352,7 +353,8 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() - msg := make([]byte, 1000) + msgSize := 1000 + msg := make([]byte, msgSize) dq.Put(msg) for i := 0; i < 10; i++ { @@ -394,12 +396,19 @@ next: panic("fail") completeWriteFile: - dq.Put(msg) + // meet the file size limit exactly (2048 bytes) when writeFileNum + // equals readFileNum + totalBytes := 2 * (msgSize + 4) + bytesRemaining := 2048 - (totalBytes + 8) + oneByteMsgSizeIncrease := 5 + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) for i := 0; i < 10; i++ { // test that write position and messages reset when a new file is created + // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) - if d.depth == 2 && + if d.depth == 3 && d.writeBytes == 3020 && d.readFileNum == 0 && d.writeFileNum == 1 && @@ -417,11 +426,13 @@ completeWriteFile: completeReadFile: dq.Put(msg) + <-dq.ReadChan() <-dq.ReadChan() <-dq.ReadChan() for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) t.Logf("Write bytes: %d", d.writeBytes) if d.depth == 1 && @@ -433,6 +444,64 @@ completeReadFile: d.readMessages == 0 && d.writeMessages == 1 { // success + goto completeWriteFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeWriteFileAgain: + // make writeFileNum ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + + // meet the file size limit exactly (2048 bytes) when writeFileNum + // is ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a file is completely read + // test the writeFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 7 && + d.readFileNum == 1 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success + goto completeReadFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFileAgain: + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 0 && + d.readFileNum == 3 && + d.writeFileNum == 3 && + d.readPos == 0 && + d.writePos == 0 && + d.readMessages == 0 && + d.writeMessages == 0 { + // success goto done } time.Sleep(100 * time.Millisecond)