Skip to content
4 changes: 3 additions & 1 deletion diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ func (d *diskQueue) writeOne(data []byte) error {
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
}

// add all data to writeBuf before writing to file
// this causes everything to be written to file or nothing
d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
Expand All @@ -441,7 +443,7 @@ func (d *diskQueue) writeOne(data []byte) error {
// check if we reached the file size limit with this message
if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile {
// write number of messages in binary to file
err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages)
err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1)
if err != nil {
return err
}
Expand Down
81 changes: 75 additions & 6 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,22 +260,23 @@ type md struct {
writePos int64
}

func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md {
func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md {
f, err := os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
// provide a simple retry that results in up to
// another 500ms for the file to be written.
if retried < 9 {
retried++
time.Sleep(50 * time.Millisecond)
return readMetaDataFile(fileName, retried, withDiskSpaceFeat)
return readMetaDataFile(fileName, retried, diskLimitFeatIsOn)
}
panic(err)
}
defer f.Close()

var ret md
if withDiskSpaceFeat {

if diskLimitFeatIsOn {
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
&ret.depth,
&ret.readFileNum, &ret.readMessages, &ret.readPos,
Expand Down Expand Up @@ -352,7 +353,8 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) {
dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
defer dq.Close()

msg := make([]byte, 1000)
msgSize := 1000
msg := make([]byte, msgSize)
dq.Put(msg)

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -394,12 +396,19 @@ next:
panic("fail")

completeWriteFile:
dq.Put(msg)
// meet the file size limit exactly (2048 bytes) when writeFileNum
// equals readFileNum
totalBytes := 2 * (msgSize + 4)
bytesRemaining := 2048 - (totalBytes + 8)
oneByteMsgSizeIncrease := 5
dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease))
dq.Put(make([]byte, 1))

for i := 0; i < 10; i++ {
// test that write position and messages reset when a new file is created
// test the writeFileNum correctly increments
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
if d.depth == 2 &&
if d.depth == 3 &&
d.writeBytes == 3020 &&
d.readFileNum == 0 &&
d.writeFileNum == 1 &&
Expand All @@ -417,11 +426,13 @@ completeWriteFile:
completeReadFile:
dq.Put(msg)

<-dq.ReadChan()
<-dq.ReadChan()
<-dq.ReadChan()

for i := 0; i < 10; i++ {
// test that read position and messages reset when a file is completely read
// test the readFileNum correctly increments
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
t.Logf("Write bytes: %d", d.writeBytes)
if d.depth == 1 &&
Expand All @@ -433,6 +444,64 @@ completeReadFile:
d.readMessages == 0 &&
d.writeMessages == 1 {
// success
goto completeWriteFileAgain
}
time.Sleep(100 * time.Millisecond)
}
panic("fail")

completeWriteFileAgain:
// make writeFileNum ahead of readFileNum
dq.Put(msg)
dq.Put(msg)

// meet the file size limit exactly (2048 bytes) when writeFileNum
// is ahead of readFileNum
dq.Put(msg)
dq.Put(msg)
dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease))
dq.Put(make([]byte, 1))

for i := 0; i < 10; i++ {
// test that write position and messages reset when a file is completely read
// test the writeFileNum correctly increments
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
if d.depth == 7 &&
d.readFileNum == 1 &&
d.writeFileNum == 3 &&
d.readPos == 0 &&
d.writePos == 0 &&
d.readMessages == 0 &&
d.writeMessages == 0 {
// success
goto completeReadFileAgain
}
time.Sleep(100 * time.Millisecond)
}
panic("fail")

completeReadFileAgain:
<-dq.ReadChan()
<-dq.ReadChan()
<-dq.ReadChan()

<-dq.ReadChan()
<-dq.ReadChan()
<-dq.ReadChan()
<-dq.ReadChan()

for i := 0; i < 10; i++ {
// test that read position and messages reset when a file is completely read
// test the readFileNum correctly increments
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
if d.depth == 0 &&
d.readFileNum == 3 &&
d.writeFileNum == 3 &&
d.readPos == 0 &&
d.writePos == 0 &&
d.readMessages == 0 &&
d.writeMessages == 0 {
// success
goto done
}
time.Sleep(100 * time.Millisecond)
Expand Down