Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
44 changes: 10 additions & 34 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (bc *Client) BackupRanges(
// we collect all files in a single goroutine to avoid thread safety issues.
filesCh := make(chan []*kvproto.File, concurrency)
allFiles := make([]*kvproto.File, 0, len(ranges))
allFilesCollected := make(chan struct{}, 1)
go func() {
init := time.Now()
start, cur := init, init
Expand All @@ -412,6 +413,7 @@ func (bc *Client) BackupRanges(
summary.CollectSuccessUnit("backup ranges", 1, cur.Sub(start))
}
log.Info("Backup Ranges", zap.Duration("take", cur.Sub(init)))
allFilesCollected <- struct{}{}
}()

go func() {
Expand Down Expand Up @@ -439,43 +441,17 @@ func (bc *Client) BackupRanges(
t := time.NewTicker(time.Second * 5)
defer t.Stop()

backupTS := req.EndVersion
// use lastBackupTS as safePoint if exists
if req.StartVersion > 0 {
backupTS = req.StartVersion
}

log.Info("current backup safePoint job",
zap.Uint64("backupTS", backupTS))

finished := false
for {
err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS)
if err != nil {
log.Error("update GC safePoint with TTL failed", zap.Error(err))
return nil, err
}
err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS)
for err := range errCh {
if err != nil {
log.Error("check GC safePoint failed", zap.Error(err))
return nil, err
}
if finished {
// Return error (if there is any) before finishing backup.
return allFiles, err
}
select {
case err, ok := <-errCh:
if !ok {
// Before finish backup, we have to make sure
// the backup ts does not fall behind with GC safepoint.
finished = true
}
if err != nil {
return nil, err
}
case <-t.C:
}
}

select {
case <-allFilesCollected:
return allFiles, nil
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
}
}

Expand Down
51 changes: 50 additions & 1 deletion pkg/backup/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package backup

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -12,7 +13,9 @@ import (
)

const (
brServiceSafePointID = "br"
brServiceSafePointID = "br"
preUpdateServiceSafePointFactor = 3
checkGCSafePointGapTime = 5 * time.Second
// DefaultBRGCSafePointTTL means PD keeps the safePoint limit for at least 5 minutes.
DefaultBRGCSafePointTTL = 5 * 60
)
Expand Down Expand Up @@ -52,3 +55,49 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64,
brServiceSafePointID, ttl, backupTS-1)
return err
}

// StartServiceSafePointKeeper registers the BR service safe point once and
// then keeps refreshing it from a background goroutine until ctx is cancelled,
// so that the service safe point is not lost while the backup is running.
//
// The goroutine calls UpdateServiceSafePoint every
// ttl/preUpdateServiceSafePointFactor (ttl is interpreted as seconds) and
// calls CheckGCSafePoint every checkGCSafePointGapTime. A failed update is
// only logged; a failed check is reported through log.Panic. When ctx is
// cancelled, one last check is run before the goroutine exits.
//
// A non-positive ttl is replaced by DefaultBRGCSafePointTTL, because a
// non-positive ticker interval would make time.NewTicker panic.
func StartServiceSafePointKeeper(
	ctx context.Context,
	ttl int64,
	pdClient pd.Client,
	backupTS uint64,
) {
	if ttl <= 0 {
		log.Warn("invalid service safe point TTL, falling back to default",
			zap.Int64("ttl", ttl),
			zap.Int64("default", DefaultBRGCSafePointTTL),
		)
		ttl = DefaultBRGCSafePointTTL
	}
	// ttl is positive here, so the update interval is strictly positive.
	updateGapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor
	update := func(ctx context.Context) {
		if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil {
			log.Error("failed to update service safe point, backup may fail if gc triggered",
				zap.Error(err),
			)
		}
	}
	check := func(ctx context.Context) {
		if err := CheckGCSafePoint(ctx, pdClient, backupTS); err != nil {
			log.Panic("cannot pass gc safe point check, aborting",
				zap.Error(err),
				zap.Uint64("backupTS", backupTS),
			)
		}
	}

	// Update the service safe point right away to cover the gap between
	// starting the keeper and the first tick of updateTick; otherwise nothing
	// is registered for the first updateGapTime.
	update(ctx)

	updateTick := time.NewTicker(updateGapTime)
	checkTick := time.NewTicker(checkGCSafePointGapTime)
	go func() {
		defer updateTick.Stop()
		defer checkTick.Stop()
		for {
			select {
			case <-ctx.Done():
				// Before finishing the backup, make sure the backup ts has
				// not fallen behind the GC safe point. ctx is already
				// cancelled at this point, so a fresh context is used.
				check(context.TODO())
				return
			case <-updateTick.C:
				update(ctx)
			case <-checkTick.C:
				check(ctx)
			}
		}
	}()
}
9 changes: 9 additions & 0 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return err
}
g.Record("BackupTS", backupTS)
safePoint := backupTS
// use lastBackupTS as safePoint if exists
if cfg.LastBackupTS > 0 {
safePoint = cfg.LastBackupTS
}

log.Info("current backup safePoint job",
zap.Uint64("safepoint", safePoint))
backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), safePoint)

isIncrementalBackup := cfg.LastBackupTS > 0

Expand Down