Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 48 additions & 18 deletions pkg/backup/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,44 @@ package backup

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/pkg/tsoutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
brServiceSafePointID = "br"
brServiceSafePointIDFormat = "br-%s"
preUpdateServiceSafePointFactor = 3
checkGCSafePointGapTime = 5 * time.Second
// DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min
DefaultBRGCSafePointTTL = 5 * 60
)

// BRServiceSafePoint is metadata of service safe point from a BR 'instance'.
type BRServiceSafePoint struct {
ID string
TTL int64
BackupTS uint64
}

// MarshalLogObject implements zapcore.ObjectMarshaler.
func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("ID", sp.ID)
ttlDuration := time.Duration(sp.TTL) * time.Second
encoder.AddString("TTL", ttlDuration.String())
backupTime, _ := tsoutil.ParseTS(sp.BackupTS)
encoder.AddString("BackupTime", backupTime.String())
encoder.AddUint64("BackupTS", sp.BackupTS)
return nil
}

// getGCSafePoint returns the current gc safe point.
// TODO: Some cluster may not enable distributed GC.
func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) {
Expand All @@ -30,6 +52,11 @@ func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) {
return safePoint, nil
}

// MakeSafePointID makes a unique safe point ID, for reduce name conflict.
func MakeSafePointID() string {
return fmt.Sprintf(brServiceSafePointIDFormat, uuid.New())
}

// CheckGCSafePoint checks whether the ts is older than GC safepoint.
// Note: It ignores errors other than exceed GC safepoint.
func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error {
Expand All @@ -45,53 +72,56 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error
return nil
}

// UpdateServiceSafePoint register backupTS to PD, to lock down backupTS as safePoint with ttl seconds.
func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, backupTS uint64) error {
log.Debug("update PD safePoint limit with ttl",
zap.Uint64("safePoint", backupTS),
zap.Int64("ttl", ttl))
// UpdateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds.
func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error {
log.Debug("update PD safePoint limit with TTL",
zap.Object("safePoint", sp))

_, err := pdClient.UpdateServiceGCSafePoint(ctx,
brServiceSafePointID, ttl, backupTS-1)
lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx,
sp.ID, sp.TTL, sp.BackupTS-1)
if lastSafePoint > sp.BackupTS-1 {
log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough",
zap.Uint64("lastSafePoint", lastSafePoint),
Comment thread
YuJuncen marked this conversation as resolved.
zap.Object("safePoint", sp),
)
}
return err
}

// StartServiceSafePointKeeper will run UpdateServiceSafePoint periodicity
// hence keeping service safepoint won't lose.
func StartServiceSafePointKeeper(
ctx context.Context,
ttl int64,
pdClient pd.Client,
backupTS uint64,
sp BRServiceSafePoint,
) {
// It would be OK since ttl won't be zero, so gapTime should > `0.
updateGapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor
// It would be OK since TTL won't be zero, so gapTime should > `0.
updateGapTime := time.Duration(sp.TTL) * time.Second / preUpdateServiceSafePointFactor
update := func(ctx context.Context) {
if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil {
if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil {
log.Warn("failed to update service safe point, backup may fail if gc triggered",
zap.Error(err),
)
}
}
check := func(ctx context.Context) {
if err := CheckGCSafePoint(ctx, pdClient, backupTS); err != nil {
if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil {
log.Panic("cannot pass gc safe point check, aborting",
zap.Error(err),
zap.Uint64("backupTS", backupTS),
zap.Object("safePoint", sp),
)
}
}
updateTick := time.NewTicker(updateGapTime)
checkTick := time.NewTicker(checkGCSafePointGapTime)
update(ctx)
go func() {
defer updateTick.Stop()
defer checkTick.Stop()
for {
select {
case <-ctx.Done():
// Before finish backup, we have to make sure
// the backup ts does not fall behind with GC safepoint.
check(context.TODO())
log.Info("service safe point keeper exited")
return
case <-updateTick.C:
Comment thread
YuJuncen marked this conversation as resolved.
update(ctx)
Expand Down
12 changes: 8 additions & 4 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return err
}
g.Record("BackupTS", backupTS)
safePoint := backupTS
sp := backup.BRServiceSafePoint{
BackupTS: backupTS,
TTL: client.GetGCTTL(),
ID: backup.MakeSafePointID(),
}
// use lastBackupTS as safePoint if exists
if cfg.LastBackupTS > 0 {
safePoint = cfg.LastBackupTS
sp.BackupTS = cfg.LastBackupTS
}

log.Info("current backup safePoint job",
zap.Uint64("safepoint", safePoint))
backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), safePoint)
zap.Object("safePoint", sp))
backup.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp)

isIncrementalBackup := cfg.LastBackupTS > 0

Expand Down