diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index c39a8921b..b77b609f8 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -4,22 +4,44 @@ package backup import ( "context" + "fmt" "time" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" pd "github.com/pingcap/pd/v4/client" + "github.com/pingcap/pd/v4/pkg/tsoutil" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const ( - brServiceSafePointID = "br" + brServiceSafePointIDFormat = "br-%s" preUpdateServiceSafePointFactor = 3 checkGCSafePointGapTime = 5 * time.Second // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min DefaultBRGCSafePointTTL = 5 * 60 ) +// BRServiceSafePoint is metadata of service safe point from a BR 'instance'. +type BRServiceSafePoint struct { + ID string + TTL int64 + BackupTS uint64 +} + +// MarshalLogObject implements zapcore.ObjectMarshaler. +func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("ID", sp.ID) + ttlDuration := time.Duration(sp.TTL) * time.Second + encoder.AddString("TTL", ttlDuration.String()) + backupTime, _ := tsoutil.ParseTS(sp.BackupTS) + encoder.AddString("BackupTime", backupTime.String()) + encoder.AddUint64("BackupTS", sp.BackupTS) + return nil +} + // getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { @@ -30,6 +52,11 @@ func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { return safePoint, nil } +// MakeSafePointID makes a unique safe point ID, for reduce name conflict. +func MakeSafePointID() string { + return fmt.Sprintf(brServiceSafePointIDFormat, uuid.New()) +} + // CheckGCSafePoint checks whether the ts is older than GC safepoint. // Note: It ignores errors other than exceed GC safepoint. func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error { @@ -45,14 +72,19 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error return nil } -// UpdateServiceSafePoint register backupTS to PD, to lock down backupTS as safePoint with ttl seconds. -func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, backupTS uint64) error { - log.Debug("update PD safePoint limit with ttl", - zap.Uint64("safePoint", backupTS), - zap.Int64("ttl", ttl)) +// UpdateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds. +func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error { + log.Debug("update PD safePoint limit with TTL", + zap.Object("safePoint", sp)) - _, err := pdClient.UpdateServiceGCSafePoint(ctx, - brServiceSafePointID, ttl, backupTS-1) + lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, + sp.ID, sp.TTL, sp.BackupTS-1) + if lastSafePoint > sp.BackupTS-1 { + log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough", + zap.Uint64("lastSafePoint", lastSafePoint), + zap.Object("safePoint", sp), + ) + } return err } @@ -60,38 +92,36 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, // hence keeping service safepoint won't lose. func StartServiceSafePointKeeper( ctx context.Context, - ttl int64, pdClient pd.Client, - backupTS uint64, + sp BRServiceSafePoint, ) { - // It would be OK since ttl won't be zero, so gapTime should > `0. - updateGapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor + // It would be OK since TTL won't be zero, so gapTime should > `0. + updateGapTime := time.Duration(sp.TTL) * time.Second / preUpdateServiceSafePointFactor update := func(ctx context.Context) { - if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { + if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil { log.Warn("failed to update service safe point, backup may fail if gc triggered", zap.Error(err), ) } } check := func(ctx context.Context) { - if err := CheckGCSafePoint(ctx, pdClient, backupTS); err != nil { + if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil { log.Panic("cannot pass gc safe point check, aborting", zap.Error(err), - zap.Uint64("backupTS", backupTS), + zap.Object("safePoint", sp), ) } } updateTick := time.NewTicker(updateGapTime) checkTick := time.NewTicker(checkGCSafePointGapTime) + update(ctx) go func() { defer updateTick.Stop() defer checkTick.Stop() for { select { case <-ctx.Done(): - // Before finish backup, we have to make sure - // the backup ts does not fall behind with GC safepoint. - check(context.TODO()) + log.Info("service safe point keeper exited") return case <-updateTick.C: update(ctx) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 83a16f713..2f4500283 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -158,15 +158,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } g.Record("BackupTS", backupTS) - safePoint := backupTS + sp := backup.BRServiceSafePoint{ + BackupTS: backupTS, + TTL: client.GetGCTTL(), + ID: backup.MakeSafePointID(), + } // use lastBackupTS as safePoint if exists if cfg.LastBackupTS > 0 { - safePoint = cfg.LastBackupTS + sp.BackupTS = cfg.LastBackupTS } log.Info("current backup safePoint job", - zap.Uint64("safepoint", safePoint)) - backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), safePoint) + zap.Object("safePoint", sp)) + backup.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) isIncrementalBackup := cfg.LastBackupTS > 0