diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 13236388b..9693b6b5f 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -384,7 +384,7 @@ func (bc *Client) BackupRange( if err != nil { summary.CollectFailureUnit(key, err) } else { - summary.CollectSuccessUnit(key, elapsed) + summary.CollectSuccessUnit(key, 1, elapsed) } }() log.Info("backup started", @@ -771,8 +771,8 @@ func (bc *Client) FastChecksum() (bool, error) { totalBytes += file.TotalBytes } - summary.CollectSuccessUnit(summary.TotalKV, totalKvs) - summary.CollectSuccessUnit(summary.TotalBytes, totalBytes) + summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes) if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes { log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name)) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 914b94607..ba409ec32 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -289,9 +289,7 @@ func (rc *Client) RestoreFiles( if err == nil { log.Info("Restore Files", zap.Int("files", len(files)), zap.Duration("take", elapsed)) - summary.CollectSuccessUnit("files", elapsed) - } else { - summary.CollectFailureUnit("files", err) + summary.CollectSuccessUnit("files", len(files), elapsed) } }() @@ -320,9 +318,10 @@ func (rc *Client) RestoreFiles( } }) } - for range files { + for i := range files { err := <-errCh if err != nil { + summary.CollectFailureUnit(fmt.Sprintf("file:%d", i), err) rc.cancel() wg.Wait() log.Error( diff --git a/pkg/restore/import.go b/pkg/restore/import.go index f98e0fc13..9b96509ea 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -264,8 +264,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul zap.Error(errIngest)) return errIngest } - summary.CollectSuccessUnit(summary.TotalKV, file.TotalKvs) - summary.CollectSuccessUnit(summary.TotalBytes, file.TotalBytes) + summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes) } return nil }, newImportSSTBackoffer()) diff --git a/pkg/summary/collector.go b/pkg/summary/collector.go index 42488cb82..ee465d60b 100644 --- a/pkg/summary/collector.go +++ b/pkg/summary/collector.go @@ -27,7 +27,7 @@ const ( type LogCollector interface { SetUnit(unit string) - CollectSuccessUnit(name string, arg interface{}) + CollectSuccessUnit(name string, unitCount int, arg interface{}) CollectFailureUnit(name string, reason error) @@ -43,27 +43,29 @@ type logFunc func(msg string, fields ...zap.Field) var collector = newLogCollector(log.Info) type logCollector struct { - mu sync.Mutex - unit string - unitCount int - successCosts map[string]time.Duration - successData map[string]uint64 - failureReasons map[string]error - durations map[string]time.Duration - ints map[string]int + mu sync.Mutex + unit string + successUnitCount int + failureUnitCount int + successCosts map[string]time.Duration + successData map[string]uint64 + failureReasons map[string]error + durations map[string]time.Duration + ints map[string]int log logFunc } func newLogCollector(log logFunc) LogCollector { return &logCollector{ - unitCount: 0, - successCosts: make(map[string]time.Duration), - successData: make(map[string]uint64), - failureReasons: make(map[string]error), - durations: make(map[string]time.Duration), - ints: make(map[string]int), - log: log, + successUnitCount: 0, + failureUnitCount: 0, + successCosts: make(map[string]time.Duration), + successData: make(map[string]uint64), + failureReasons: make(map[string]error), + durations: make(map[string]time.Duration), + ints: make(map[string]int), + log: log, } } @@ -73,7 +75,7 @@ func (tc *logCollector) SetUnit(unit string) { tc.unit = unit } -func (tc *logCollector) CollectSuccessUnit(name string, arg interface{}) { +func (tc *logCollector) CollectSuccessUnit(name string, unitCount int, arg interface{}) { tc.mu.Lock() defer tc.mu.Unlock() @@ -81,7 +83,7 @@ func (tc *logCollector) CollectSuccessUnit(name string, arg interface{}) { case time.Duration: if _, ok := tc.successCosts[name]; !ok { tc.successCosts[name] = v - tc.unitCount++ + tc.successUnitCount += unitCount } else { tc.successCosts[name] += v } @@ -99,7 +101,7 @@ func (tc *logCollector) CollectFailureUnit(name string, reason error) { defer tc.mu.Unlock() if _, ok := tc.failureReasons[name]; !ok { tc.failureReasons[name] = reason - tc.unitCount++ + tc.failureUnitCount++ } } @@ -129,16 +131,10 @@ func (tc *logCollector) Summary(name string) { switch tc.unit { case BackupUnit: msg = fmt.Sprintf("total backup ranges: %d, total success: %d, total failed: %d", - tc.unitCount, len(tc.successCosts), len(tc.failureReasons)) - if len(tc.failureReasons) != 0 { - msg += ", failed ranges" - } + tc.failureUnitCount+tc.successUnitCount, tc.successUnitCount, tc.failureUnitCount) case RestoreUnit: - msg = fmt.Sprintf("total restore tables: %d, total success: %d, total failed: %d", - tc.unitCount, len(tc.successCosts), len(tc.failureReasons)) - if len(tc.failureReasons) != 0 { - msg += ", failed tables" - } + msg = fmt.Sprintf("total restore files: %d, total success: %d, total failed: %d", + tc.failureUnitCount+tc.successUnitCount, tc.successUnitCount, tc.failureUnitCount) } logFields := make([]zap.Field, 0, len(tc.durations)+len(tc.ints)) @@ -150,12 +146,10 @@ func (tc *logCollector) Summary(name string) { } if len(tc.failureReasons) != 0 { - names := make([]string, 0, len(tc.failureReasons)) - for name := range tc.failureReasons { - names = append(names, name) + for unitName, reason := range tc.failureReasons { + logFields = append(logFields, zap.String("unitName", unitName), zap.Error(reason)) } - logFields = append(logFields, zap.Strings(msg, names)) - log.Info(name+" summary", logFields...) + log.Info(name+" Failed summary : "+msg, logFields...) return } totalCost := time.Duration(0) @@ -178,7 +172,7 @@ func (tc *logCollector) Summary(name string) { msg += fmt.Sprintf(", %s: %d", name, data) } - tc.log(name+" summary: "+msg, logFields...) + tc.log(name+" Success summary: "+msg, logFields...) } // SetLogCollector allow pass LogCollector outside diff --git a/pkg/summary/summary.go b/pkg/summary/summary.go index 08a16c00a..3ffdedf8a 100644 --- a/pkg/summary/summary.go +++ b/pkg/summary/summary.go @@ -10,8 +10,8 @@ func SetUnit(unit string) { } // CollectSuccessUnit collects success time costs -func CollectSuccessUnit(name string, arg interface{}) { - collector.CollectSuccessUnit(name, arg) +func CollectSuccessUnit(name string, unitCount int, arg interface{}) { + collector.CollectSuccessUnit(name, unitCount, arg) } // CollectFailureUnit collects fail reason diff --git a/pkg/task/restore.go b/pkg/task/restore.go index c443b4c0f..4dac5f869 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -114,7 +114,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if len(files) == 0 { return errors.New("all files are filtered out from the backup archive, nothing to restore") } - summary.CollectInt("restore files", len(files)) var newTS uint64 if client.IsIncremental() {