Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 59 additions & 68 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
type LogLevel int

const (
DEBUG = LogLevel(1)
INFO = LogLevel(2)
WARN = LogLevel(3)
ERROR = LogLevel(4)
FATAL = LogLevel(5)
DEBUG = LogLevel(1)
INFO = LogLevel(2)
WARN = LogLevel(3)
ERROR = LogLevel(4)
FATAL = LogLevel(5)
numFileMsgsBytes = 8
)

type AppLogFunc func(lvl LogLevel, f string, args ...interface{})
Expand Down Expand Up @@ -64,15 +65,14 @@ type diskQueue struct {
writeFileNum int64
readMessages int64
writeMessages int64
writeBytes int64
depth int64

sync.RWMutex

// instantiation time metadata
name string
dataPath string
maxBytesDiskSpace int64
maxBytesDiskSize int64
maxBytesPerFile int64 // cannot change once created
maxBytesPerFileRead int64
minMsgSize int32
Expand Down Expand Up @@ -111,7 +111,7 @@ type diskQueue struct {
logf AppLogFunc

// disk limit implementation flag
diskLimitFeatIsOn bool
enableDiskLimitation bool
}

// New instantiates an instance of diskQueue, retrieving metadata
Expand All @@ -120,43 +120,43 @@ func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {

return NewWithDiskSpace(name, dataPath,
return NewWithDiskSize(name, dataPath,
0, maxBytesPerFile,
minMsgSize, maxMsgSize,
syncEvery, syncTimeout, logf)
}

// Another constructor that allows users to use Disk Space Limit feature
// If user is not using Disk Space Limit feature, maxBytesDiskSpace will
// Another constructor that allows users to use Disk Size Limit feature
// If user is not using Disk Size Limit feature, maxBytesDiskSize will
// be 0
func NewWithDiskSpace(name string, dataPath string,
maxBytesDiskSpace int64, maxBytesPerFile int64,
func NewWithDiskSize(name string, dataPath string,
maxBytesDiskSize int64, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
diskLimitFeatIsOn := true
if maxBytesDiskSpace <= 0 {
maxBytesDiskSpace = 0
diskLimitFeatIsOn = false
enableDiskLimitation := true
if maxBytesDiskSize <= 0 {
maxBytesDiskSize = 0
enableDiskLimitation = false
}
d := diskQueue{
name: name,
dataPath: dataPath,
maxBytesDiskSpace: maxBytesDiskSpace,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
emptyChan: make(chan int),
emptyResponseChan: make(chan error),
exitChan: make(chan int),
exitSyncChan: make(chan int),
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,
diskLimitFeatIsOn: diskLimitFeatIsOn,
name: name,
dataPath: dataPath,
maxBytesDiskSize: maxBytesDiskSize,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
emptyChan: make(chan int),
emptyResponseChan: make(chan error),
exitChan: make(chan int),
exitSyncChan: make(chan int),
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,
enableDiskLimitation: enableDiskLimitation,
}

d.start()
Expand Down Expand Up @@ -304,7 +304,7 @@ func (d *diskQueue) skipToNextRWFile() error {
d.nextReadPos = 0
d.depth = 0

if d.diskLimitFeatIsOn {
if d.enableDiskLimitation {
d.writeBytes = 0
d.readMessages = 0
d.writeMessages = 0
Expand All @@ -317,6 +317,7 @@ func (d *diskQueue) skipToNextRWFile() error {
// while advancing read positions and rolling files, if necessary
func (d *diskQueue) readOne() ([]byte, error) {
var err error
var msgSize int32

if d.readFile == nil {
curFileName := d.fileName(d.readFileNum)
Expand All @@ -343,40 +344,40 @@ func (d *diskQueue) readOne() ([]byte, error) {
stat, err := d.readFile.Stat()
if err == nil {
d.maxBytesPerFileRead = stat.Size()
if d.diskLimitFeatIsOn {
if d.enableDiskLimitation {
// last 8 bytes are reserved for the number of messages in this file
d.maxBytesPerFileRead -= 8
d.maxBytesPerFileRead -= numFileMsgsBytes
}
}
}

d.reader = bufio.NewReader(d.readFile)
}

err = binary.Read(d.reader, binary.BigEndian, &d.readMsgSize)
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}

if d.readMsgSize < d.minMsgSize || d.readMsgSize > d.maxMsgSize {
if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
// this file is corrupt and we have no reasonable guarantee on
// where a new message should begin
d.readFile.Close()
d.readFile = nil
return nil, fmt.Errorf("invalid message read size (%d)", d.readMsgSize)
return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
}

readBuf := make([]byte, d.readMsgSize)
readBuf := make([]byte, msgSize)
_, err = io.ReadFull(d.reader, readBuf)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}

totalBytes := int64(4 + d.readMsgSize)
totalBytes := int64(4 + msgSize)

// we only advance next* because we have not yet sent this to consumers
// (where readFileNum, readPos will actually be advanced)
Expand Down Expand Up @@ -445,7 +446,7 @@ func (d *diskQueue) writeOne(data []byte) error {
totalBytes := int64(4 + dataLen)

// check if we reached the file size limit with this message
if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile {
if d.enableDiskLimitation && d.writePos+totalBytes+numFileMsgsBytes >= d.maxBytesPerFile {
// write number of messages in binary to file
err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1)
if err != nil {
Expand All @@ -466,10 +467,9 @@ func (d *diskQueue) writeOne(data []byte) error {

fileSize := d.writePos

if d.diskLimitFeatIsOn {
if d.enableDiskLimitation {
// save space for the number of messages in this file
fileSize += 8
d.writeBytes += totalBytes
fileSize += numFileMsgsBytes
d.writeMessages += 1
}

Expand All @@ -480,12 +480,7 @@ func (d *diskQueue) writeOne(data []byte) error {

d.writeFileNum++
d.writePos = 0

if d.diskLimitFeatIsOn {
// add bytes for the number of messages in the file
d.writeBytes += 8
d.writeMessages = 0
}
d.writeMessages = 0

// sync every time we start writing to a new file
err = d.sync()
Expand Down Expand Up @@ -534,12 +529,12 @@ func (d *diskQueue) retrieveMetaData() error {
}
defer f.Close()

// if user is using disk space limit feature
if d.diskLimitFeatIsOn {
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
// if user is using disk size limit feature
if d.enableDiskLimitation {
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
&d.depth,
&d.readFileNum, &d.readMessages, &d.readPos,
&d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos)
&d.writeFileNum, &d.writeMessages, &d.writePos)
} else {
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&d.depth,
Expand Down Expand Up @@ -571,12 +566,12 @@ func (d *diskQueue) persistMetaData() error {
return err
}

// if user is using disk space limit feature
if d.diskLimitFeatIsOn {
_, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
// if user is using disk size limit feature
if d.enableDiskLimitation {
_, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
d.depth,
d.readFileNum, d.readMessages, d.readPos,
d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos)
d.writeFileNum, d.writeMessages, d.writePos)
} else {
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
d.depth,
Expand Down Expand Up @@ -643,8 +638,6 @@ func (d *diskQueue) checkTailCorruption(depth int64) {
}

func (d *diskQueue) moveForward() {
// add bytes for the number of messages and the size of the message
readFileLen := int64(d.readMsgSize) + d.readPos + 12
oldReadFileNum := d.readFileNum
d.readFileNum = d.nextReadFileNum
d.readPos = d.nextReadPos
Expand All @@ -666,7 +659,7 @@ func (d *diskQueue) moveForward() {
d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err)
}

if d.diskLimitFeatIsOn {
if d.enableDiskLimitation {
d.readMessages = 0
d.writeBytes -= readFileLen
}
Expand All @@ -687,7 +680,7 @@ func (d *diskQueue) handleReadError() {
d.writeFileNum++
d.writePos = 0

if d.diskLimitFeatIsOn {
if d.enableDiskLimitation {
d.writeMessages = 0
d.writeBytes = 0
}
Expand All @@ -711,9 +704,7 @@ func (d *diskQueue) handleReadError() {
d.readPos = 0
d.nextReadFileNum = d.readFileNum
d.nextReadPos = 0
if d.diskLimitFeatIsOn {
d.readMessages = 0
}
d.readMessages = 0

// significant state change, schedule a sync on the next iteration
d.needSync = true
Expand Down
18 changes: 6 additions & 12 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ func TestDiskQueueCorruption(t *testing.T) {

type md struct {
depth int64
writeBytes int64
readFileNum int64
writeFileNum int64
readMessages int64
Expand All @@ -260,27 +259,26 @@ type md struct {
writePos int64
}

func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md {
func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md {
f, err := os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
// provide a simple retry that results in up to
// another 500ms for the file to be written.
if retried < 9 {
retried++
time.Sleep(50 * time.Millisecond)
return readMetaDataFile(fileName, retried, diskLimitFeatIsOn)
return readMetaDataFile(fileName, retried, enableDiskLimitation)
}
panic(err)
}
defer f.Close()

var ret md

if diskLimitFeatIsOn {
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
if enableDiskLimitation {
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
&ret.depth,
&ret.readFileNum, &ret.readMessages, &ret.readPos,
&ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos)
&ret.writeFileNum, &ret.writeMessages, &ret.writePos)
} else {
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&ret.depth,
Expand Down Expand Up @@ -350,7 +348,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
defer dq.Close()

msgSize := 1000
Expand All @@ -360,7 +358,6 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) {
for i := 0; i < 10; i++ {
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
if d.depth == 1 &&
d.writeBytes == 1004 &&
d.readFileNum == 0 &&
d.writeFileNum == 0 &&
d.readMessages == 0 &&
Expand All @@ -381,7 +378,6 @@ next:
for i := 0; i < 10; i++ {
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
if d.depth == 1 &&
d.writeBytes == 2008 &&
d.readFileNum == 0 &&
d.writeFileNum == 0 &&
d.readMessages == 1 &&
Expand Down Expand Up @@ -434,9 +430,7 @@ completeReadFile:
// test that read position and messages reset when a file is completely read
// test the readFileNum correctly increments
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
t.Logf("Write bytes: %d", d.writeBytes)
if d.depth == 1 &&
d.writeBytes == 1004 &&
d.readFileNum == 1 &&
d.writeFileNum == 1 &&
d.readMessages == 0 &&
Expand Down