diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 44e982b21..1a6b6cc3b 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -403,6 +403,7 @@ func (bc *Client) BackupRanges( // we collect all files in a single goroutine to avoid thread safety issues. filesCh := make(chan []*kvproto.File, concurrency) allFiles := make([]*kvproto.File, 0, len(ranges)) + allFilesCollected := make(chan struct{}, 1) go func() { init := time.Now() start, cur := init, init @@ -412,6 +413,7 @@ func (bc *Client) BackupRanges( summary.CollectSuccessUnit("backup ranges", 1, cur.Sub(start)) } log.Info("Backup Ranges", zap.Duration("take", cur.Sub(init))) + allFilesCollected <- struct{}{} }() go func() { @@ -439,43 +441,17 @@ func (bc *Client) BackupRanges( t := time.NewTicker(time.Second * 5) defer t.Stop() - backupTS := req.EndVersion - // use lastBackupTS as safePoint if exists - if req.StartVersion > 0 { - backupTS = req.StartVersion - } - - log.Info("current backup safePoint job", - zap.Uint64("backupTS", backupTS)) - - finished := false - for { - err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS) - if err != nil { - log.Error("update GC safePoint with TTL failed", zap.Error(err)) - return nil, err - } - err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS) + for err := range errCh { if err != nil { - log.Error("check GC safePoint failed", zap.Error(err)) return nil, err } - if finished { - // Return error (if there is any) before finishing backup. - return allFiles, err - } - select { - case err, ok := <-errCh: - if !ok { - // Before finish backup, we have to make sure - // the backup ts does not fall behind with GC safepoint. 
- finished = true - } - if err != nil { - return nil, err - } - case <-t.C: - } + } + + select { + case <-allFilesCollected: + return allFiles, nil + case <-ctx.Done(): + return nil, errors.Trace(ctx.Err()) } } diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 83be5c9fc..ae3cb3b50 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -4,6 +4,7 @@ package backup import ( "context" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -12,7 +13,9 @@ import ( ) const ( - brServiceSafePointID = "br" + brServiceSafePointID = "br" + preUpdateServiceSafePointFactor = 3 + checkGCSafePointGapTime = 5 * time.Second // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min DefaultBRGCSafePointTTL = 5 * 60 ) @@ -52,3 +55,52 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, brServiceSafePointID, ttl, backupTS-1) return err } + +// StartServiceSafePointKeeper calls UpdateServiceSafePoint once right away and then +// periodically until ctx is done, so that the service safepoint is not lost. +func StartServiceSafePointKeeper( + ctx context.Context, + ttl int64, + pdClient pd.Client, + backupTS uint64, +) { + // ttl is expected to be non-zero, so updateGapTime stays > 0 (time.NewTicker panics otherwise). 
+ updateGapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor + update := func(ctx context.Context) { + if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { + log.Error("failed to update service safe point, backup may fail if gc triggered", + zap.Error(err), + ) + } + } + check := func(ctx context.Context) { + if err := CheckGCSafePoint(ctx, pdClient, backupTS); err != nil { + log.Panic("cannot pass gc safe point check, aborting", + zap.Error(err), + zap.Uint64("backupTS", backupTS), + ) + } + } + // Update once before the first tick: the removed BackupRanges loop updated before + // doing anything else, while updateTick first fires only after updateGapTime. + update(ctx) + updateTick := time.NewTicker(updateGapTime) + checkTick := time.NewTicker(checkGCSafePointGapTime) + go func() { + defer updateTick.Stop() + defer checkTick.Stop() + for { + select { + case <-ctx.Done(): + // ctx is already done here, so run the final check on a fresh context to make sure + // the backup ts does not fall behind with GC safepoint. + check(context.TODO()) + return + case <-updateTick.C: + update(ctx) + case <-checkTick.C: + check(ctx) + } + } + }() +} diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 6ac8c74b4..0956df988 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -136,6 +136,15 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } g.Record("BackupTS", backupTS) + safePoint := backupTS + // use lastBackupTS as safePoint if exists + if cfg.LastBackupTS > 0 { + safePoint = cfg.LastBackupTS + } + + log.Info("current backup safePoint job", + zap.Uint64("safepoint", safePoint)) + backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), safePoint) isIncrementalBackup := cfg.LastBackupTS > 0