diff --git a/diskqueue.go b/diskqueue.go index c5e6e3d..2e08a0e 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -7,9 +7,11 @@ import ( "errors" "fmt" "io" + "io/ioutil" "math/rand" "os" "path" + "regexp" "sync" "time" ) @@ -19,14 +21,17 @@ import ( type LogLevel int const ( - DEBUG = LogLevel(1) - INFO = LogLevel(2) - WARN = LogLevel(3) - ERROR = LogLevel(4) - FATAL = LogLevel(5) - numFileMsgsBytes = 8 + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) + numFileMsgBytes = 8 + maxMetaDataFileSize = 56 ) +var badFileNameRegexp, fileNameRegexp *regexp.Regexp + type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) func (l LogLevel) String() string { @@ -52,6 +57,7 @@ type Interface interface { Delete() error Depth() int64 Empty() error + TotalBytesFolderSize() int64 } // diskQueue implements a filesystem backed FIFO queue @@ -59,21 +65,21 @@ type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) - readPos int64 - writePos int64 - readFileNum int64 - writeFileNum int64 - readMessages int64 - writeMessages int64 - writeBytes int64 - depth int64 + readPos int64 + writePos int64 + readFileNum int64 + writeFileNum int64 + readMessages int64 // Number of read messages. It's used to update depth. + writeMessages int64 // Number of write messages. It's used to update depth. + totalDiskSpaceUsed int64 + depth int64 sync.RWMutex // instantiation time metadata name string dataPath string - maxBytesDiskSize int64 + maxBytesDiskSpace int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -88,10 +94,6 @@ type diskQueue struct { nextReadPos int64 nextReadFileNum int64 - // keep track of the msg size we have read - // (but not yet sent over readChan) - readMsgSize int32 - readFile *os.File writeFile *os.File reader *bufio.Reader @@ -121,28 +123,27 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { - return NewWithDiskSize(name, dataPath, + return NewWithDiskSpace(name, dataPath, 0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } -// Another constructor that allows users to use Disk Size Limit feature -// If user is not using Disk Size Limit feature, maxBytesDiskSize will +// Another constructor that allows users to use Disk Space Limit feature +// If user is not using Disk Space Limit feature, maxBytesDiskSpace will // be 0 -func NewWithDiskSize(name string, dataPath string, - maxBytesDiskSize int64, maxBytesPerFile int64, +func NewWithDiskSpace(name string, dataPath string, + maxBytesDiskSpace int64, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { enableDiskLimitation := true - if maxBytesDiskSize <= 0 { - maxBytesDiskSize = 0 + if maxBytesDiskSpace <= 0 { enableDiskLimitation = false } d := diskQueue{ name: name, dataPath: dataPath, - maxBytesDiskSize: maxBytesDiskSize, + maxBytesDiskSpace: maxBytesDiskSpace, maxBytesPerFile: maxBytesPerFile, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, @@ -160,19 +161,39 @@ func NewWithDiskSize(name string, dataPath string, enableDiskLimitation: enableDiskLimitation, } - d.start() + err := d.start() + if err != nil { + return nil + } + return &d } // Get the last known state of DiskQueue from metadata and start ioLoop -func (d *diskQueue) start() { +func (d *diskQueue) start() error { + // ensure that DiskQueue has enough space to write the metadata file + if d.enableDiskLimitation && d.maxBytesDiskSpace <= maxMetaDataFileSize { + errorMsg := fmt.Sprintf( + "disk size limit too small(%d): not enough space for MetaData file size(%d)", + d.maxBytesDiskSpace, maxMetaDataFileSize) + d.logf(ERROR, "DISKQUEUE(%s) - %s", errorMsg) + return errors.New(errorMsg) + } + // no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) } + fileNameRegexp = regexp.MustCompile(`^` + d.name + `.diskqueue.\d\d\d\d\d\d.dat$`) + badFileNameRegexp = regexp.MustCompile(`^` + d.name + `.diskqueue.\d\d\d\d\d\d.dat.bad$`) + + d.updateTotalDiskSpaceUsed() + go d.ioLoop() + + return nil } // Depth returns the depth of the queue @@ -185,6 +206,20 @@ func (d *diskQueue) Depth() int64 { return depth } +// Returns to total size of the contents (files) in the directory located in the dataPath +func (d *diskQueue) TotalBytesFolderSize() int64 { + var totalFolderSizeBytes int64 + + getTotalFolderSizeBytes := func(fileInfo os.FileInfo) error { + totalFolderSizeBytes += fileInfo.Size() + return nil + } + + d.walkDiskQueueDir(getTotalFolderSizeBytes) + + return totalFolderSizeBytes +} + // ReadChan returns the receive-only []byte channel for reading data func (d *diskQueue) ReadChan() <-chan []byte { return d.readChan @@ -306,7 +341,7 @@ func (d *diskQueue) skipToNextRWFile() error { d.depth = 0 if d.enableDiskLimitation { - d.writeBytes = 0 + d.totalDiskSpaceUsed = 0 d.readMessages = 0 d.writeMessages = 0 } @@ -347,7 +382,7 @@ func (d *diskQueue) readOne() ([]byte, error) { d.maxBytesPerFileRead = stat.Size() if d.enableDiskLimitation { // last 8 bytes are reserved for the number of messages in this file - d.maxBytesPerFileRead -= numFileMsgsBytes + d.maxBytesPerFileRead -= numFileMsgBytes } } } @@ -370,8 +405,6 @@ func (d *diskQueue) readOne() ([]byte, error) { return nil, fmt.Errorf("invalid message read size (%d)", msgSize) } - d.readMsgSize = msgSize - readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { @@ -403,6 +436,203 @@ func (d *diskQueue) readOne() ([]byte, error) { return readBuf, nil } +func (d *diskQueue) removeBadFile(oldestBadFileInfo os.FileInfo) error { + var err error + badFileFilePath := path.Join(d.dataPath, oldestBadFileInfo.Name()) + + // remove file if it exists + err = os.Remove(badFileFilePath) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to remove .bad file(%s) - %s", d.name, oldestBadFileInfo.Name(), err) + d.updateTotalDiskSpaceUsed() + return err + } else { + // recaclulate total bad files disk size to get most accurate info + d.totalDiskSpaceUsed -= oldestBadFileInfo.Size() + d.logf(INFO, "DISKQUEUE(%s) removed .bad file(%s) of size(%d bytes) to free up disk space", d.name, oldestBadFileInfo.Name(), oldestBadFileInfo.Size()) + } + + return nil +} + +func (d *diskQueue) readNumOfMessages(fileName string) (int64, error) { + var err error + + if d.readFile == nil { + d.readFile, err = os.OpenFile(fileName, os.O_RDONLY, 0600) + if err != nil { + return 0, err + } + + d.reader = bufio.NewReader(d.readFile) + } + + closeReadFile := func() { + d.readFile.Close() + d.readFile = nil + } + defer closeReadFile() + + // read total messages number at the end of the file + _, err = d.readFile.Seek(-numFileMsgBytes, 2) + if err != nil { + return 0, err + } + + var totalMessages int64 + err = binary.Read(d.reader, binary.BigEndian, &totalMessages) + if err != nil { + return 0, err + } + + return totalMessages, nil +} + +func (d *diskQueue) removeReadFile() error { + if d.readFileNum == d.writeFileNum { + d.skipToNextRWFile() + return nil + } + + curFileName := d.fileName(d.readFileNum) + totalMessages, err := d.readNumOfMessages(curFileName) + if err != nil { + return err + } + + // update depth with the remaining number of messages + d.depth -= totalMessages - d.readMessages + + // we have not finished reading this file + if d.readFileNum == d.nextReadFileNum { + d.nextReadFileNum++ + d.nextReadPos = 0 + } + + d.moveToNextReadFile() + + return nil +} + +// walk through all of the files in the DiskQueue directory +func (d *diskQueue) walkDiskQueueDir(fn func(os.FileInfo) error) error { + fileInfos, err := ioutil.ReadDir(d.dataPath) + + if err != nil { + return err + } + + for _, fileInfo := range fileInfos { + // only go through files and skip directories + if !fileInfo.IsDir() { + err = fn(fileInfo) + if err != nil { + return err + } + } + } + + return nil +} + +func (d *diskQueue) getAllBadFileInfo() ([]os.FileInfo, error) { + var badFileInfos []os.FileInfo + + getAllBadFileInfo := func(fileInfo os.FileInfo) error { + // only accept "bad" files created by this DiskQueue object + if badFileNameRegexp.MatchString(fileInfo.Name()) { + badFileInfos = append(badFileInfos, fileInfo) + } + + return nil + } + + err := d.walkDiskQueueDir(getAllBadFileInfo) + + return badFileInfos, err +} + +// get the accurate total non-"bad" file size +func (d *diskQueue) updateTotalDiskSpaceUsed() { + d.totalDiskSpaceUsed = maxMetaDataFileSize + + updateTotalDiskSpaceUsed := func(fileInfo os.FileInfo) error { + // only accept files created by this DiskQueue object + if fileNameRegexp.MatchString(fileInfo.Name()) || badFileNameRegexp.MatchString(fileInfo.Name()) { + d.totalDiskSpaceUsed += fileInfo.Size() + } + + return nil + } + + err := d.walkDiskQueueDir(updateTotalDiskSpaceUsed) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to update write bytes - %s", d.name, err) + } +} + +func (d *diskQueue) freeDiskSpace(expectedBytesIncrease int64) error { + var err error + var badFileInfos []os.FileInfo + + badFileInfos, err = d.getAllBadFileInfo() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to retrieve all .bad file info - %s", d.name, err) + } + + if expectedBytesIncrease > d.maxBytesDiskSpace { + return fmt.Errorf("could not make space for expectedBytesIncrease = %d, with maxBytesDiskSpace = %d ", expectedBytesIncrease, d.maxBytesDiskSpace) + } + + // keep freeing up disk space until we have enough space to write this message + for _, badFileInfo := range badFileInfos { + if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace { + return nil + } + d.removeBadFile(badFileInfo) + } + for d.readFileNum <= d.writeFileNum { + if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace { + return nil + } + // delete the read file (make space) + readFileToDeleteNum := d.readFileNum + err = d.removeReadFile() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to remove file(%s) - %s", d.name, d.fileName(readFileToDeleteNum), err) + d.handleReadError() + return err + } else { + d.logf(INFO, "DISKQUEUE(%s) removed file(%s) to free up disk space", d.name, d.fileName(readFileToDeleteNum)) + } + d.updateTotalDiskSpaceUsed() + } + + if d.totalDiskSpaceUsed+expectedBytesIncrease > d.maxBytesDiskSpace { + return fmt.Errorf("could not make space for totalDiskSpaceUsed = %d, expectedBytesIncrease = %d, with maxBytesDiskSpace = %d ", d.totalDiskSpaceUsed, expectedBytesIncrease, d.maxBytesDiskSpace) + } + + return nil +} + +// check if there is enough available disk space to write new data to file +func (d *diskQueue) checkDiskSpace(expectedBytesIncrease int64) error { + // If the data to be written is bigger than the disk size limit, do not write + if expectedBytesIncrease > d.maxBytesDiskSpace { + errorMsg := fmt.Sprintf( + "message size(%d) surpasses disk size limit(%d)", + expectedBytesIncrease, d.maxBytesDiskSpace) + d.logf(ERROR, "DISKQUEUE(%s) - %s", d.name, errorMsg) + return errors.New(errorMsg) + } + + // check if we have enough space to write this message + if d.totalDiskSpaceUsed+expectedBytesIncrease > d.maxBytesDiskSpace { + return d.freeDiskSpace(expectedBytesIncrease) + } + return nil +} + // writeOne performs a low level filesystem write for a single []byte // while advancing write positions and rolling files, if necessary func (d *diskQueue) writeOne(data []byte) error { @@ -433,6 +663,26 @@ func (d *diskQueue) writeOne(data []byte) error { return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) } + totalBytes := int64(4 + dataLen) + reachedFileSizeLimit := false + + if d.enableDiskLimitation { + expectedBytesIncrease := totalBytes + // check if we will reach or surpass file size limit + if d.writePos+totalBytes+numFileMsgBytes >= d.maxBytesPerFile { + reachedFileSizeLimit = true + expectedBytesIncrease += numFileMsgBytes + } + + // free disk space if needed + err = d.checkDiskSpace(expectedBytesIncrease) + if err != nil { + return err + } + } else if d.writePos+totalBytes >= d.maxBytesPerFile { + reachedFileSizeLimit = true + } + // add all data to writeBuf before writing to file // this causes everything to be written to file or nothing d.writeBuf.Reset() @@ -446,10 +696,8 @@ func (d *diskQueue) writeOne(data []byte) error { return err } - totalBytes := int64(4 + dataLen) - // check if we reached the file size limit with this message - if d.enableDiskLimitation && d.writePos+totalBytes+numFileMsgsBytes >= d.maxBytesPerFile { + if d.enableDiskLimitation && reachedFileSizeLimit { // write number of messages in binary to file err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) if err != nil { @@ -468,16 +716,12 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos += totalBytes d.depth += 1 - fileSize := d.writePos - if d.enableDiskLimitation { - // save space for the number of messages in this file - fileSize += numFileMsgsBytes - d.writeBytes += totalBytes + d.totalDiskSpaceUsed += totalBytes d.writeMessages += 1 } - if fileSize >= d.maxBytesPerFile { + if reachedFileSizeLimit { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -486,8 +730,6 @@ func (d *diskQueue) writeOne(data []byte) error { d.writePos = 0 if d.enableDiskLimitation { - // add bytes for the number of messages in the file - d.writeBytes += numFileMsgsBytes d.writeMessages = 0 } @@ -517,6 +759,10 @@ func (d *diskQueue) sync() error { } } + if d.enableDiskLimitation { + d.updateTotalDiskSpaceUsed() + } + err := d.persistMetaData() if err != nil { return err @@ -538,15 +784,16 @@ func (d *diskQueue) retrieveMetaData() error { } defer f.Close() - // if user is using disk size limit feature + var depth int64 + // if user is using disk space limit feature if d.enableDiskLimitation { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", - &d.depth, + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + &depth, &d.readFileNum, &d.readMessages, &d.readPos, - &d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos) + &d.writeFileNum, &d.writeMessages, &d.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &d.depth, + &depth, &d.readFileNum, &d.readPos, &d.writeFileNum, &d.writePos) } @@ -555,6 +802,7 @@ func (d *diskQueue) retrieveMetaData() error { return err } + d.depth = depth d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos @@ -575,12 +823,12 @@ func (d *diskQueue) persistMetaData() error { return err } - // if user is using disk size limit feature + // if user is using disk space limit feature if d.enableDiskLimitation { - _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, - d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos) + d.writeFileNum, d.writeMessages, d.writePos) } else { _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", d.depth, @@ -646,18 +894,10 @@ func (d *diskQueue) checkTailCorruption(depth int64) { } } -func (d *diskQueue) moveForward() { - // add bytes for the number of messages and the size of the message - readFileSize := int64(d.readMsgSize) + d.readPos + numFileMsgsBytes + 4 - +func (d *diskQueue) moveToNextReadFile() { oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos - d.depth -= 1 - - if d.enableDiskLimitation { - d.readMessages += 1 - } // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { @@ -666,16 +906,32 @@ func (d *diskQueue) moveForward() { d.needSync = true fn := d.fileName(oldReadFileNum) + oldFileInfo, _ := os.Stat(fn) + err := os.Remove(fn) if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) + } else { + d.logf(INFO, "DISKQUEUE(%s) removed(%s) of size(%d bytes)", d.name, fn, oldFileInfo.Size()) } if d.enableDiskLimitation { d.readMessages = 0 - d.writeBytes -= readFileSize + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to update write bytes - %s", d.name, err) + } } } +} + +func (d *diskQueue) moveForward() { + d.depth -= 1 + + if d.enableDiskLimitation { + d.readMessages += 1 + } + + d.moveToNextReadFile() d.checkTailCorruption(d.depth) } @@ -691,7 +947,10 @@ func (d *diskQueue) handleReadError() { } d.writeFileNum++ d.writePos = 0 - d.writeMessages = 0 + + if d.enableDiskLimitation { + d.writeMessages = 0 + } } badFn := d.fileName(d.readFileNum) @@ -708,32 +967,18 @@ func (d *diskQueue) handleReadError() { d.name, badFn, badRenameFn) } - if d.enableDiskLimitation { - var badFileSize int64 - if d.readFileNum == d.writeFileNum { - badFileSize = d.writeBytes - } else { - var stat os.FileInfo - stat, err = os.Stat(badRenameFn) - if err == nil { - badFileSize = stat.Size() - } else { - // max file size - badFileSize = int64(d.maxMsgSize) + d.maxBytesPerFile + 4 + numFileMsgsBytes - } - } - - d.writeBytes -= badFileSize - } - d.readFileNum++ d.readPos = 0 d.nextReadFileNum = d.readFileNum d.nextReadPos = 0 - d.readMessages = 0 + if d.enableDiskLimitation { + d.readMessages = 0 + } // significant state change, schedule a sync on the next iteration d.needSync = true + + d.checkTailCorruption(d.depth) } // ioLoop provides the backend for exposing a go channel (via ReadChan()) diff --git a/diskqueue_test.go b/diskqueue_test.go index 9e98eaa..4cd5239 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -9,6 +9,7 @@ import ( "path" "path/filepath" "reflect" + "regexp" "runtime" "strconv" "sync" @@ -231,6 +232,11 @@ func TestDiskQueueCorruption(t *testing.T) { Equal(t, msg, <-dq.ReadChan()) } + badFilesCount := numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 1 { + panic("fail") + } + // corrupt the 4th (current) file dqFn = dq.(*diskQueue).fileName(3) os.Truncate(dqFn, 100) @@ -238,6 +244,10 @@ func TestDiskQueueCorruption(t *testing.T) { dq.Put(msg) // in 5th file Equal(t, msg, <-dq.ReadChan()) + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 2 { + panic("fail") + } // write a corrupt (len 0) message at the 5th (current) file dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0}) @@ -247,11 +257,14 @@ func TestDiskQueueCorruption(t *testing.T) { dq.Put(msg) Equal(t, msg, <-dq.ReadChan()) + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 3 { + panic("fail") + } } type md struct { depth int64 - writeBytes int64 readFileNum int64 writeFileNum int64 readMessages int64 @@ -275,11 +288,12 @@ func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) m defer f.Close() var ret md + if enableDiskLimitation { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, - &ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + &ret.writeFileNum, &ret.writeMessages, &ret.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &ret.depth, @@ -343,29 +357,33 @@ done: func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { l := NewTestLogger(t) - dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) + dqName := "test_disk_queue_read_with_disk_size_implementation" + strconv.Itoa(int(time.Now().Unix())) tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) if err != nil { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSize(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 msg := make([]byte, msgSize) dq.Put(msg) + if dq.Depth() != 1 { + panic("fail") + } + for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && - d.writeBytes == 1004 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readMessages == 0 && d.writeMessages == 1 && d.readPos == 0 && - d.writePos == 1004 { + d.writePos == 1004 && + dq.(*diskQueue).totalDiskSpaceUsed == 1004+maxMetaDataFileSize { // success goto next } @@ -377,16 +395,20 @@ next: dq.Put(msg) <-dq.ReadChan() + if dq.Depth() != 1 { + panic("fail") + } + for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && - d.writeBytes == 2008 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readMessages == 1 && d.writeMessages == 2 && d.readPos == 1004 && - d.writePos == 2008 { + d.writePos == 2008 && + dq.(*diskQueue).totalDiskSpaceUsed == 2008+maxMetaDataFileSize { // success goto completeWriteFile } @@ -403,18 +425,22 @@ completeWriteFile: dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) dq.Put(make([]byte, 1)) + if dq.Depth() != 3 { + panic("fail") + } + for i := 0; i < 10; i++ { // test that write position and messages reset when a new file is created // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 3 && - d.writeBytes == 2048 && d.readFileNum == 0 && d.writeFileNum == 1 && d.readMessages == 1 && d.writeMessages == 0 && d.readPos == 1004 && - d.writePos == 0 { + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 2048+maxMetaDataFileSize { // success goto completeReadFile } @@ -429,18 +455,22 @@ completeReadFile: <-dq.ReadChan() <-dq.ReadChan() + if dq.Depth() != 1 { + panic("fail") + } + for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && - d.writeBytes == 1004 && d.readFileNum == 1 && d.writeFileNum == 1 && d.readMessages == 0 && d.writeMessages == 1 && d.readPos == 0 && - d.writePos == 1004 { + d.writePos == 1004 && + dq.(*diskQueue).totalDiskSpaceUsed == 1004+maxMetaDataFileSize { // success goto completeWriteFileAgain } @@ -460,18 +490,22 @@ completeWriteFileAgain: dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) dq.Put(make([]byte, 1)) + if dq.Depth() != 7 { + panic("fail") + } + for i := 0; i < 10; i++ { // test that write position and messages reset when a file is completely read // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 7 && - d.writeBytes == 5068 && d.readFileNum == 1 && d.writeFileNum == 3 && d.readMessages == 0 && d.writeMessages == 0 && d.readPos == 0 && - d.writePos == 0 { + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 5068+maxMetaDataFileSize { // success goto completeReadFileAgain } @@ -489,18 +523,389 @@ completeReadFileAgain: <-dq.ReadChan() <-dq.ReadChan() + if dq.Depth() != 0 { + panic("fail") + } + for i := 0; i < 10; i++ { // test that read position and messages reset when a file is completely read // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 0 && - d.writeBytes == 0 && d.readFileNum == 3 && d.writeFileNum == 3 && d.readMessages == 0 && d.writeMessages == 0 && d.readPos == 0 && - d.writePos == 0 { + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == maxMetaDataFileSize { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func TestDiskSizeImplementationDiskSizeLimit(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_implementation_disk_size_limit" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + + // meet disk size limit + // write a complete file + dq.Put(msg) + dq.Put(msg) + dq.Put(msg) + + // meet the disk size limit exactly (6040 bytes) when writeFileNum + // is ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + + totalDiskBytes := int64(5*(msgSize+4) + 8) + + // save space for msg len and number of msgs in file + diskBytesRemaining := 6040 - maxMetaDataFileSize - (totalDiskBytes + 12) + dq.Put(make([]byte, diskBytesRemaining)) + + depth := dq.Depth() + if depth != 6 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 6 && + d.readFileNum == 0 && + d.writeFileNum == 2 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 6040 { + // success + goto surpassDiskSizeLimit + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +surpassDiskSizeLimit: + dq.Put(make([]byte, 1)) + + if dq.Depth() != 4 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 4 && + d.readFileNum == 1 && + d.writeFileNum == 2 && + d.readMessages == 0 && + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 5 && + dq.(*diskQueue).totalDiskSpaceUsed == 3025 { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func TestDiskSizeImplementationMsgSizeGreaterThanFileSize(t *testing.T) { + // write three files + + l := NewTestLogger(t) + dqName := "test_disk_queue_implementation_msg_size_greater_than_file_size" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 0, 1<<12, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + + // file size: 1496 + dq.Put(msg) + dq.Put(make([]byte, 480)) + + // file size: 1032 + dq.Put(msg) + dq.Put(make([]byte, 16)) + + // file size: 1512 + dq.Put(make([]byte, 1500)) + + if dq.Depth() != 5 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 5 && + d.readFileNum == 0 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 4040+maxMetaDataFileSize { + // success + goto writeLargeMsg + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +writeLargeMsg: + // Write a large message that causes the deletion of three files + dq.Put(make([]byte, 3000)) + + if dq.Depth() != 1 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 3 && + d.writeFileNum == 4 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 3012+maxMetaDataFileSize { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func createBadFile(dqName string, filePath string, fileNum int64, numBytes int) error { + fn := fmt.Sprintf(path.Join(filePath, "%s.diskqueue.%06d.dat.bad"), dqName, fileNum) + + badFile, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return err + } + + defer badFile.Close() + + _, err = badFile.Write(make([]byte, numBytes)) + + return err +} + +func numberOfBadFiles(diskQueueName string, dataPath string) int64 { + var badFilesCount int64 + + fileInfos, _ := ioutil.ReadDir(dataPath) + for _, fileInfo := range fileInfos { + regExp, _ := regexp.Compile(`^` + diskQueueName + `.diskqueue.\d\d\d\d\d\d.dat.bad$`) + if regExp.MatchString(fileInfo.Name()) { + badFilesCount++ + } + } + + return badFilesCount +} + +func TestDiskSizeImplementationWithBadFiles(t *testing.T) { + // write three files + + l := NewTestLogger(t) + dqName := "test_disk_queue_implementation_with_bad_files" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + // there should be no .bad files + var badFilesCount int64 + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + // make 2 bad files + createBadFile(dqName, tmpDir, 0, 1503) + createBadFile(dqName, tmpDir, 1, 1032) + + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 2 { + panic("fail") + } + + dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 10, 1600, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + + // file 0 size: 1497 + dq.Put(msg) + dq.Put(make([]byte, 481)) + + // no bad files should have been deleted + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 2 { + panic("fail") + } + + // file 1 size: 1032 + dq.Put(msg) + dq.Put(make([]byte, 16)) + + // one .bad file should be deleted in order to make space + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 1 { + panic("fail") + } + + // file 2 size: 1503 + dq.Put(make([]byte, 1491)) + + // check if all the .bad files were deleted + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + depth := dq.Depth() + if depth != 5 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 5 && + d.readFileNum == 0 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 4032+maxMetaDataFileSize { + // success + goto corruptFiles + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +corruptFiles: + // test removeReadFile when file is corrupted + // create bad files see if totalDiskSpaceUsed is updated properly + // check that after corrupting files, we make space appropriately + + // corrupt file 0 + dqFn := dq.(*diskQueue).fileName(0) + os.Truncate(dqFn, 1017) // 1 valid message, 1 corrupted message + + dq.Put(make([]byte, 100)) + + // check if the .bad files were deleted + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.readFileNum == 1 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 104 && + dq.(*diskQueue).totalDiskSpaceUsed == 2639+maxMetaDataFileSize { + // success + goto readCorruptedFile + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +readCorruptedFile: + // test handleReadError + + // there should be no "bad" files at this point + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + // corrupt file 2 + // have readOne turn it into a bad file and then try to make space + dqFn = dq.(*diskQueue).fileName(2) + os.Truncate(dqFn, 1020) + + // read file 1 + <-dq.ReadChan() + <-dq.ReadChan() + + // wait for DiskQueue to notice that file 2 is corrupted + time.Sleep(20 * time.Millisecond) + + // check if the file was converted into a .bad file + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 1 { + panic("fail") + } + + // go over the disk limit + dq.Put(msg) + + // write a complete file + dq.Put(msg) + dq.Put(msg) + + // check if the corrupted file was deleted to make space + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.readFileNum == 3 && + d.writeFileNum == 5 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 3132+maxMetaDataFileSize { // success goto done }