// Reconstructed from the whitespace-collapsed patch hunk that adds
// cache/cache.go (package cache). The code below relies on the standard
// library packages errors, fmt, os, sync and time.

// defaultLogFileMetaCacheCapacity is the entry limit used by
// NewLogFileMetaCache.
const defaultLogFileMetaCacheCapacity = 10000

// zeroTime is the zero time.Time. A cached timestamp equal to it is treated
// as "not loaded yet".
var zeroTime = time.Time{}

// InvalidLogFile indicates the log file format is invalid.
var InvalidLogFile = errors.New("invalid format of log file")

// LogFileMetaCache is a size-bounded map from a file's base name (as reported
// by os.FileInfo.Name) to its *LogFileMeta. All methods guard the map with an
// internal RWMutex.
type LogFileMetaCache struct {
	mu       sync.RWMutex
	cache    map[string]*LogFileMeta
	capacity int
}

// NewLogFileMetaCache returns an empty cache limited to
// defaultLogFileMetaCacheCapacity entries.
func NewLogFileMetaCache() *LogFileMetaCache {
	return NewLogFileMetaCacheWithCap(defaultLogFileMetaCacheCapacity)
}

// NewLogFileMetaCacheWithCap returns an empty cache limited to cap entries.
// With cap <= 0 no entry can ever be added.
func NewLogFileMetaCacheWithCap(cap int) *LogFileMetaCache {
	return &LogFileMetaCache{
		cache:    make(map[string]*LogFileMeta),
		capacity: cap,
	}
}

// GetFileMata returns the meta cached under stat.Name(), or nil when stat is
// nil or no entry exists.
func (c *LogFileMetaCache) GetFileMata(stat os.FileInfo) *LogFileMeta {
	if stat == nil {
		return nil
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cache[stat.Name()]
}

// AddFileMataToCache stores meta under stat.Name(). An existing entry is
// always replaced; a new entry is only added while the cache holds fewer than
// capacity entries (nothing is evicted). Nil arguments are ignored.
func (c *LogFileMetaCache) AddFileMataToCache(stat os.FileInfo, meta *LogFileMeta) {
	if stat == nil || meta == nil {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()

	name := stat.Name()
	// TODO: Use LRU ?
	if _, exists := c.cache[name]; exists || len(c.cache) < c.capacity {
		c.cache[name] = meta
	}
}

// Len returns the number of cached entries.
func (c *LogFileMetaCache) Len() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.cache)
}

// LogFileMeta caches the first and last log timestamps of one log file
// together with the file modification time they were read at.
type LogFileMeta struct {
	inValid bool
	mu      sync.Mutex
	// ModTime is the modification time of the file when the cached
	// timestamps were last refreshed.
	ModTime   time.Time
	startTime time.Time
	endTime   time.Time
}

// NewLogFileMeta returns a meta for the file described by info with no cached
// timestamps. info must not be nil.
func NewLogFileMeta(info os.FileInfo) *LogFileMeta {
	return &LogFileMeta{
		ModTime: info.ModTime(),
	}
}

// GetStartTime returns the cached start time if it is set and the file has
// not been modified since it was cached; otherwise it calls getStartTime and
// caches the result. It returns an error when stat is nil, when a reload is
// needed but getStartTime is nil, or when getStartTime fails. A failure that
// matches InvalidLogFile marks the meta as invalid.
func (l *LogFileMeta) GetStartTime(stat os.FileInfo, getStartTime func() (time.Time, error)) (time.Time, error) {
	return l.cachedTime(stat, "start", &l.startTime, &l.endTime, getStartTime)
}

// GetEndTime returns the cached end time if it is set and the file has not
// been modified since it was cached; otherwise it calls getEndTime and caches
// the result. Errors are reported exactly as for GetStartTime.
func (l *LogFileMeta) GetEndTime(stat os.FileInfo, getEndTime func() (time.Time, error)) (time.Time, error) {
	return l.cachedTime(stat, "end", &l.endTime, &l.startTime, getEndTime)
}

// cachedTime implements GetStartTime and GetEndTime. cached points at the
// timestamp being requested, sibling at the other one, and what ("start" or
// "end") is only used in the error message.
func (l *LogFileMeta) cachedTime(stat os.FileInfo, what string, cached, sibling *time.Time, load func() (time.Time, error)) (time.Time, error) {
	if stat == nil {
		return zeroTime, fmt.Errorf("file stat can't be nil")
	}
	l.mu.Lock()
	defer l.mu.Unlock()

	modTime := stat.ModTime()
	unmodified := l.ModTime.Equal(modTime)
	if !cached.IsZero() && unmodified {
		return *cached, nil
	}
	if load == nil {
		return *cached, fmt.Errorf("can't get file '%v' %s time", stat.Name(), what)
	}
	t, err := load()
	if err != nil {
		if errors.Is(err, InvalidLogFile) {
			l.inValid = true
		}
		return t, err
	}
	if !unmodified {
		// ModTime is shared by both timestamps. Moving it forward would make
		// the sibling look fresh although it was read from the older file
		// content, so drop it and force a reload on its next request.
		*sibling = zeroTime
	}
	l.inValid = false
	l.ModTime = modTime
	*cached = t
	return t, nil
}

// IsInValid returns true if the file is invalid.
func (l *LogFileMeta) IsInValid() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.inValid
}

// CheckLogTimeValid returns true if t != zeroTime.
+func (l *LogFileMeta) CheckLogTimeValid(t time.Time) bool { + return !t.Equal(zeroTime) +} + +// CheckFileNotModified returns true if the file hasn't been modified. +func (l *LogFileMeta) CheckFileNotModified(info os.FileInfo) bool { + return l.ModTime.Equal(info.ModTime()) +} diff --git a/cache/cache_test.go b/cache/cache_test.go new file mode 100644 index 0000000..26dd47c --- /dev/null +++ b/cache/cache_test.go @@ -0,0 +1,277 @@ +package cache_test + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "testing" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/sysutil/cache" +) + +var _ = SerialSuites(&testCacheSuite{}) + +func TestT(t *testing.T) { + TestingT(t) +} + +type testCacheSuite struct { + tmpDir string +} + +func (s *testCacheSuite) SetUpSuite(c *C) { + tmpDir, err := ioutil.TempDir("", "cache") + c.Assert(err, IsNil) + s.tmpDir = tmpDir +} + +func (s *testCacheSuite) TearDownSuite(c *C) { + c.Assert(os.RemoveAll(s.tmpDir), IsNil, Commentf("remote tmpDir %v failed", s.tmpDir)) +} + +func (s *testCacheSuite) prepareFile(c *C, fileName string) (*os.File, os.FileInfo) { + filePath := path.Join(s.tmpDir, fileName) + file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + c.Assert(err, IsNil) + stat, err := file.Stat() + c.Assert(err, IsNil) + return file, stat +} + +func (s *testCacheSuite) writeAndReopen(c *C, file *os.File, data string) (*os.File, os.FileInfo) { + // mock delay. 
+ time.Sleep(time.Millisecond) + _, err := file.WriteString(data) + c.Assert(err, IsNil) + stat, err := file.Stat() + c.Assert(err, IsNil) + name := stat.Name() + err = file.Close() + c.Assert(err, IsNil) + return s.prepareFile(c, name) +} + +func (s *testCacheSuite) TestLogFileMetaGetStartTime(c *C) { + fileName := "tidb.log" + file, stat := s.prepareFile(c, fileName) + defer file.Close() + m := cache.NewLogFileMeta(stat) + c.Assert(m.ModTime, Equals, stat.ModTime()) + + // Test GetStartTime meet error + _, err := m.GetStartTime(stat, nil) + c.Assert(err.Error(), Equals, "can't get file 'tidb.log' start time") + + _, err = m.GetStartTime(stat, func() (time.Time, error) { + return time.Now(), fmt.Errorf("get start time meet error") + }) + c.Assert(err.Error(), Equals, "get start time meet error") + + // Test GetStartTime + start := time.Now() + fileStart, err := m.GetStartTime(stat, func() (time.Time, error) { + return start, nil + }) + c.Assert(err, IsNil) + c.Assert(fileStart.Equal(start), IsTrue) + + // Test GetStartTime from cache + fileStart, err = m.GetStartTime(stat, func() (time.Time, error) { + return time.Now(), fmt.Errorf("should get from cache") + }) + c.Assert(err, IsNil) + c.Assert(fileStart.Equal(start), IsTrue) + + // Test GetStartTime from cache + fileStart, err = m.GetStartTime(stat, nil) + c.Assert(err, IsNil) + c.Assert(fileStart.Equal(start), IsTrue) + // Test GetStartTime with nil stat + _, err = m.GetStartTime(nil, nil) + c.Assert(err.Error(), Equals, "file stat can't be nil") + + // Test file has been modified. 
+ file, stat = s.writeAndReopen(c, file, "[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] [\"Welcome to TiDB.\"]") + + // Test GetStartTime meet invalid error + _, err = m.GetStartTime(stat, func() (time.Time, error) { + return time.Now(), cache.InvalidLogFile + }) + c.Assert(err, Equals, cache.InvalidLogFile) + c.Assert(m.IsInValid(), IsTrue) + + // Test GetStartTime meet error + _, err = m.GetStartTime(stat, func() (time.Time, error) { + return time.Now(), fmt.Errorf("get start time meet error") + }) + c.Assert(err.Error(), Equals, "get start time meet error") + c.Assert(m.IsInValid(), IsTrue) + + newStartTime := time.Now() + fileStart, err = m.GetStartTime(stat, func() (time.Time, error) { + return newStartTime, nil + }) + c.Assert(err, IsNil) + c.Assert(fileStart.Equal(newStartTime), IsTrue) + c.Assert(m.IsInValid(), IsFalse) + + // Test GetStartTime from cache after file changed + fileStart, err = m.GetStartTime(stat, func() (time.Time, error) { + return time.Now(), fmt.Errorf("should get from cache") + }) + c.Assert(err, IsNil) + c.Assert(fileStart.Equal(newStartTime), IsTrue) +} + +func (s *testCacheSuite) TestLogFileMetaGetEndTime(c *C) { + fileName := "tidb.log" + file, stat := s.prepareFile(c, fileName) + defer file.Close() + m := cache.NewLogFileMeta(stat) + c.Assert(m.ModTime, Equals, stat.ModTime()) + + // Test GetEndTime meet error + _, err := m.GetEndTime(stat, nil) + c.Assert(err.Error(), Equals, "can't get file 'tidb.log' end time") + + _, err = m.GetEndTime(stat, func() (time.Time, error) { + return time.Now(), fmt.Errorf("get end time meet error") + }) + c.Assert(err.Error(), Equals, "get end time meet error") + + // Test GetEndTime + end := time.Now() + fileEnd, err := m.GetEndTime(stat, func() (time.Time, error) { + return end, nil + }) + c.Assert(err, IsNil) + c.Assert(fileEnd.Equal(end), IsTrue) + c.Assert(m.IsInValid(), IsFalse) + + // Test GetEndTime from cache + fileEnd, err = m.GetEndTime(stat, func() (time.Time, error) { + return 
time.Now(), fmt.Errorf("should get from cache") + }) + c.Assert(err, IsNil) + c.Assert(fileEnd.Equal(end), IsTrue) + + // Test GetEndTime from cache + fileEnd, err = m.GetEndTime(stat, nil) + c.Assert(err, IsNil) + c.Assert(fileEnd.Equal(end), IsTrue) + // Test GetEndTime with nil stat + _, err = m.GetEndTime(nil, nil) + c.Assert(err.Error(), Equals, "file stat can't be nil") + + // Test file has been modified. + file, stat = s.writeAndReopen(c, file, "[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] [\"Welcome to TiDB.\"]") + + // Test GetEndTime meet invalid error + _, err = m.GetEndTime(stat, func() (time.Time, error) { + return time.Now(), cache.InvalidLogFile + }) + c.Assert(err, Equals, cache.InvalidLogFile) + c.Assert(m.IsInValid(), IsTrue) + + // Test GetEndTime meet error + _, err = m.GetEndTime(stat, func() (time.Time, error) { + return time.Now(), fmt.Errorf("get end time meet error") + }) + c.Assert(err.Error(), Equals, "get end time meet error") + + // Test GetEndTime success + newEndTime := time.Now() + fileEnd, err = m.GetEndTime(stat, func() (time.Time, error) { + return newEndTime, nil + }) + c.Assert(err, IsNil) + c.Assert(fileEnd.Equal(newEndTime), IsTrue) + c.Assert(m.IsInValid(), IsFalse) + + // Test GetEndTime from cache after file changed + fileEnd, err = m.GetEndTime(stat, func() (time.Time, error) { + return time.Now(), fmt.Errorf("should get from cache") + }) + c.Assert(err, IsNil) + c.Assert(fileEnd.Equal(newEndTime), IsTrue) + c.Assert(m.IsInValid(), IsFalse) +} + +func (s *testCacheSuite) TestLogFileMetaCache(c *C) { + ca := cache.NewLogFileMetaCache() + ca.AddFileMataToCache(nil, nil) + c.Assert(ca.Len(), Equals, 0) + fileName := "tidb.log" + file, stat := s.prepareFile(c, fileName) + defer file.Close() + + m := cache.NewLogFileMeta(stat) + ca.AddFileMataToCache(stat, m) + c.Assert(ca.Len(), Equals, 1) + m = ca.GetFileMata(stat) + c.Assert(m, NotNil) + c.Assert(m.IsInValid(), IsFalse) + c.Assert(m.CheckFileNotModified(stat), 
IsTrue) + + m2 := cache.NewLogFileMeta(stat) + ca.AddFileMataToCache(stat, m2) + c.Assert(ca.Len(), Equals, 1) + m = ca.GetFileMata(stat) + c.Assert(m, NotNil) + c.Assert(m.IsInValid(), IsFalse) + c.Assert(m.CheckFileNotModified(stat), IsTrue) + + ca.AddFileMataToCache(nil, m) + c.Assert(ca.Len(), Equals, 1) + ca.AddFileMataToCache(stat, nil) + c.Assert(ca.Len(), Equals, 1) + + m = ca.GetFileMata(stat) + c.Assert(m, NotNil) + c.Assert(m.IsInValid(), IsFalse) + c.Assert(m.CheckFileNotModified(stat), IsTrue) + + m = ca.GetFileMata(nil) + c.Assert(m, IsNil) +} + +func (s *testCacheSuite) TestLogFileMetaCacheWithCap(c *C) { + ca := cache.NewLogFileMetaCacheWithCap(1) + ca.AddFileMataToCache(nil, nil) + c.Assert(ca.Len(), Equals, 0) + fileName := "tidb.log" + file, stat := s.prepareFile(c, fileName) + defer file.Close() + fileName2 := "tidb2.log" + file2, stat2 := s.prepareFile(c, fileName2) + defer file2.Close() + + m := cache.NewLogFileMeta(stat) + ca.AddFileMataToCache(stat, m) + c.Assert(ca.Len(), Equals, 1) + m = ca.GetFileMata(stat) + c.Assert(m, NotNil) + c.Assert(m.IsInValid(), IsFalse) + c.Assert(m.CheckFileNotModified(stat), IsTrue) + + m2 := cache.NewLogFileMeta(stat2) + ca.AddFileMataToCache(stat2, m2) + c.Assert(ca.Len(), Equals, 1) + m = ca.GetFileMata(stat2) + c.Assert(m, IsNil) + + m = ca.GetFileMata(stat) + c.Assert(m, NotNil) + c.Assert(m.IsInValid(), IsFalse) + c.Assert(m.CheckFileNotModified(stat), IsTrue) + + fmt.Printf("old mod time: \n%v\n%v -%v--\n", stat.ModTime(), m.ModTime, stat.Size()) + file, stat = s.writeAndReopen(c, file, "[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] [\"Welcome to TiDB.\"]") + fmt.Printf("new mod time: \n%v\n%v -%v--\n", stat.ModTime(), m.ModTime, stat.Size()) + m = ca.GetFileMata(stat) + c.Assert(m, NotNil) + c.Assert(m.CheckFileNotModified(stat), IsFalse) +} diff --git a/search_log.go b/search_log.go index 15f2b16..32117d0 100644 --- a/search_log.go +++ b/search_log.go @@ -28,6 +28,7 @@ import ( "time" pb 
"github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/sysutil/cache" ) type logFile struct { @@ -43,10 +44,14 @@ func (l *logFile) EndTime() int64 { return l.end } -func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime int64) ([]logFile, error) { +func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime int64, ca *cache.LogFileMetaCache) ([]logFile, error) { if logFilePath == "" { return nil, errors.New("empty log file location configuration") } + if ca == nil { + // For test. + ca = cache.NewLogFileMetaCache() + } var logFiles []logFile var skipFiles []*os.File @@ -78,15 +83,41 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in if err != nil { return nil } - reader := bufio.NewReader(file) - firstItem, err := readFirstValidLog(ctx, reader, 10) + stat, err := file.Stat() + if err != nil { + return nil + } + meta := ca.GetFileMata(stat) + if meta == nil { + meta = cache.NewLogFileMeta(stat) + defer ca.AddFileMataToCache(stat, meta) + } else { + if meta.CheckFileNotModified(stat) && meta.IsInValid() { + return nil + } + } + + firstTime, err := meta.GetStartTime(stat, func() (time.Time, error) { + reader := bufio.NewReader(file) + firstItem, err := readFirstValidLog(ctx, reader, 10) + if err != nil { + return time.Time{}, err + } + return convertToGoTime(firstItem.Time), nil + }) if err != nil { skipFiles = append(skipFiles, file) return nil } - lastItem, err := readLastValidLog(ctx, file, 10) + lastTime, err := meta.GetEndTime(stat, func() (time.Time, error) { + lastItem, err := readLastValidLog(ctx, file, 10) + if err != nil { + return time.Time{}, err + } + return convertToGoTime(lastItem.Time), nil + }) if err != nil { skipFiles = append(skipFiles, file) return nil @@ -98,13 +129,15 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in return nil } - if beginTime > lastItem.Time || endTime < firstItem.Time { + fileStartTime := 
convertToLogTime(firstTime) + fileEndTime := convertToLogTime(lastTime) + if beginTime > fileEndTime || endTime < fileStartTime { skipFiles = append(skipFiles, file) } else { logFiles = append(logFiles, logFile{ file: file, - begin: firstItem.Time, - end: lastItem.Time, + begin: fileStartTime, + end: fileEndTime, }) } return nil @@ -143,6 +176,9 @@ func readFirstValidLog(ctx context.Context, reader *bufio.Reader, tryLines int64 for { line, err := readLine(reader) if err != nil { + if err == io.EOF { + return nil, cache.InvalidLogFile + } return nil, err } item, err := parseLogItem(line) @@ -157,7 +193,7 @@ func readFirstValidLog(ctx context.Context, reader *bufio.Reader, tryLines int64 return nil, ctx.Err() } } - return nil, errors.New("not a valid log file") + return nil, cache.InvalidLogFile } func readLastValidLog(ctx context.Context, file *os.File, tryLines int) (*pb.LogMessage, error) { @@ -185,7 +221,7 @@ func readLastValidLog(ctx context.Context, file *os.File, tryLines int) (*pb.Log break } } - return nil, errors.New("not a valid log file") + return nil, cache.InvalidLogFile } // Read a line from a reader. @@ -327,7 +363,15 @@ func parseTimeStamp(s string) (int64, error) { if err != nil { return 0, err } - return t.UnixNano() / int64(time.Millisecond), nil + return convertToLogTime(t), nil +} + +func convertToLogTime(t time.Time) int64 { + return t.UnixNano() / int64(time.Millisecond) +} + +func convertToGoTime(ms int64) time.Time { + return time.Unix(0, ms*int64(time.Millisecond)) } // logIterator implements Iterator and IteratorWithPeek interface. diff --git a/search_log_test.go b/search_log_test.go index 237fd33..b5ffdd0 100644 --- a/search_log_test.go +++ b/search_log_test.go @@ -28,6 +28,7 @@ import ( . 
"github.com/pingcap/check" pb "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/sysutil" + "github.com/pingcap/sysutil/cache" "google.golang.org/grpc" ) @@ -187,12 +188,13 @@ func (s *searchLogSuite) TestResoveFiles(c *C) { }, } + ca := cache.NewLogFileMetaCache() for i, cas := range cases { beginTime, err := sysutil.ParseTimeStamp(cas.search.start) c.Assert(err, IsNil) endTime, err := sysutil.ParseTimeStamp(cas.search.end) c.Assert(err, IsNil) - logFiles, err := sysutil.ResolveFiles(context.Background(), filepath.Join(s.tmpDir, "tidb.log"), beginTime, endTime) + logFiles, err := sysutil.ResolveFiles(context.Background(), filepath.Join(s.tmpDir, "tidb.log"), beginTime, endTime, ca) c.Assert(err, IsNil) c.Assert(len(logFiles), Equals, len(cas.expect), Commentf("search range (index: %d): %+v", i, cas.search)) @@ -205,6 +207,7 @@ func (s *searchLogSuite) TestResoveFiles(c *C) { c.Assert(endTime, Equals, logFiles[j].EndTime(), Commentf("case index: %d, expect index: %v", i, j)) } } + c.Assert(ca.Len(), Equals, 5) } func (s *searchLogSuite) TestLogIterator(c *C) { @@ -643,6 +646,32 @@ func (s *searchLogSuite) BenchmarkReadLastLinesOfHugeLine(c *C) { } } +func (s *searchLogSuite) BenchmarkResolveFiles(c *C) { + for i := 0; i < 1000; i++ { + s.writeTmpFile(c, fmt.Sprintf("tidb-%v.log", i), []string{ + `20/08/26 06:19:13.011 -04:00 [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:19:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:19:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:19:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:19:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:19:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }) + } + + ca := cache.NewLogFileMetaCache() + beginTime, err := sysutil.ParseTimeStamp("2019/08/26 06:19:13.011 -04:00") + c.Assert(err, IsNil) + endTime, err := 
sysutil.ParseTimeStamp("2019/08/26 06:22:17.011 -04:00") + c.Assert(err, IsNil) + path := filepath.Join(s.tmpDir, "tidb.log") + c.ResetTimer() + for i := 0; i < c.N; i++ { + logFiles, err := sysutil.ResolveFiles(context.Background(), path, beginTime, endTime, ca) + c.Assert(err, IsNil) + c.Assert(len(logFiles), Equals, 1000) + } +} + // run benchmark by `go test -check.b` // result: // searchLogSuite.BenchmarkReadLastLines 1000000 2008 ns/op diff --git a/service.go b/service.go index d9cda8b..8df5df4 100644 --- a/service.go +++ b/service.go @@ -22,15 +22,18 @@ import ( "sort" pb "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/sysutil/cache" ) type DiagnosticsServer struct { - logFile string + logFile string + logFileMetaCache *cache.LogFileMetaCache } func NewDiagnosticsServer(logFile string) *DiagnosticsServer { return &DiagnosticsServer{ - logFile: logFile, + logFile: logFile, + logFileMetaCache: cache.NewLogFileMetaCache(), } } @@ -43,7 +46,7 @@ func (d *DiagnosticsServer) SearchLog(req *pb.SearchLogRequest, stream pb.Diagno } ctx := stream.Context() - logFiles, err := resolveFiles(ctx, d.logFile, beginTime, endTime) + logFiles, err := resolveFiles(ctx, d.logFile, beginTime, endTime, d.logFileMetaCache) if err != nil { return err } diff --git a/service_test.go b/service_test.go index 924148c..1a1ff78 100644 --- a/service_test.go +++ b/service_test.go @@ -40,7 +40,8 @@ func TestT(t *testing.T) { func (s *serviceSuite) SetUpSuite(c *C) { server := grpc.NewServer() - pb.RegisterDiagnosticsServer(server, &sysutil.DiagnosticsServer{}) + diagnosticsServer := sysutil.NewDiagnosticsServer("") + pb.RegisterDiagnosticsServer(server, diagnosticsServer) // Find a available port listener, err := net.Listen("tcp", ":0")