From ada41fbcaba20f88c438474d1cc835d5e31756cb Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 13 Jul 2020 19:20:17 +0800 Subject: [PATCH 1/9] backup: enhance safe point --- pkg/backup/safe_point.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index ae3cb3b50..72ed5f2bc 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -4,8 +4,10 @@ package backup import ( "context" + "fmt" "time" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" pd "github.com/pingcap/pd/v4/client" @@ -13,13 +15,16 @@ import ( ) const ( - brServiceSafePointID = "br" + brServiceSafePointIDFormat = "br-%s" preUpdateServiceSafePointFactor = 3 checkGCSafePointGapTime = 5 * time.Second // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min DefaultBRGCSafePointTTL = 5 * 60 ) +// for each BR, use different safe point ID so they won't conflict. +var brServiceSafePointID = fmt.Sprintf(brServiceSafePointIDFormat, uuid.New()) + // getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { @@ -51,8 +56,13 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, zap.Uint64("safePoint", backupTS), zap.Int64("ttl", ttl)) - _, err := pdClient.UpdateServiceGCSafePoint(ctx, + lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, brServiceSafePointID, ttl, backupTS-1) + if lastSafePoint > backupTS-1 { + log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough", + zap.Uint64("lastSafePoint", lastSafePoint), + ) + } return err } @@ -83,16 +93,12 @@ func StartServiceSafePointKeeper( } updateTick := time.NewTicker(updateGapTime) checkTick := time.NewTicker(checkGCSafePointGapTime) + update(ctx) go func() { defer updateTick.Stop() defer checkTick.Stop() for { select { - case <-ctx.Done(): - // Before finish backup, we have to make sure - // the backup ts does not fall behind with GC safepoint. - check(context.TODO()) - return case <-updateTick.C: update(ctx) case <-checkTick.C: From 80bab2b19baa69b1877a6bc6650c4ee447ca310f Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 14 Jul 2020 09:59:44 +0800 Subject: [PATCH 2/9] backup: exit safe point keeper when ctx done --- pkg/backup/safe_point.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 72ed5f2bc..42898c083 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -99,6 +99,8 @@ func StartServiceSafePointKeeper( defer checkTick.Stop() for { select { + case <-ctx.Done(): + log.Info("service safe point keeper exited") case <-updateTick.C: update(ctx) case <-checkTick.C: From 5547941ce908ee5a2b26797b4fd6e746dc7a3865 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 14 Jul 2020 10:04:14 +0800 Subject: [PATCH 3/9] backup: return after ctx is done... :| --- pkg/backup/safe_point.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 42898c083..2bf1dc653 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -101,6 +101,7 @@ func StartServiceSafePointKeeper( select { case <-ctx.Done(): log.Info("service safe point keeper exited") + return case <-updateTick.C: update(ctx) case <-checkTick.C: From 681552b883473e309912b650bd15aa6c3513e1a9 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 14 Jul 2020 18:32:25 +0800 Subject: [PATCH 4/9] backup: don't use global variable to name service safe point --- pkg/backup/safe_point.go | 41 ++++++++++++++++++++++++---------------- pkg/task/backup.go | 12 ++++++++---- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 2bf1dc653..c43e49f61 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + "github.com/pingcap/errors" "github.com/pingcap/log" pd "github.com/pingcap/pd/v4/client" @@ -22,8 +23,12 @@ const ( DefaultBRGCSafePointTTL = 5 * 60 ) -// for each BR, use different safe point ID so they won't conflict. -var brServiceSafePointID = fmt.Sprintf(brServiceSafePointIDFormat, uuid.New()) +// BRServiceSafePoint is metadata of service safe point from a BR 'instance'. +type BRServiceSafePoint struct { + ID string + TTL int64 + BackupTS uint64 +} // getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. @@ -35,6 +40,11 @@ func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { return safePoint, nil } +// MakeSafePointID makes a unique safe point ID, for reduce name conflict. +func MakeSafePointID() string { + return fmt.Sprintf(brServiceSafePointIDFormat, uuid.New()) +} + // CheckGCSafePoint checks whether the ts is older than GC safepoint. // Note: It ignores errors other than exceed GC safepoint. func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error { @@ -50,15 +60,15 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error return nil } -// UpdateServiceSafePoint register backupTS to PD, to lock down backupTS as safePoint with ttl seconds. -func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, backupTS uint64) error { - log.Debug("update PD safePoint limit with ttl", - zap.Uint64("safePoint", backupTS), - zap.Int64("ttl", ttl)) +// UpdateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds. +func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error { + log.Debug("update PD safePoint limit with TTL", + zap.Uint64("safePoint", sp.BackupTS), + zap.Int64("TTL", sp.TTL)) lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, - brServiceSafePointID, ttl, backupTS-1) - if lastSafePoint > backupTS-1 { + sp.ID, sp.TTL, sp.BackupTS-1) + if lastSafePoint > sp.BackupTS-1 { log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough", zap.Uint64("lastSafePoint", lastSafePoint), ) @@ -70,24 +80,23 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, // hence keeping service safepoint won't lose. func StartServiceSafePointKeeper( ctx context.Context, - ttl int64, pdClient pd.Client, - backupTS uint64, + sp BRServiceSafePoint, ) { - // It would be OK since ttl won't be zero, so gapTime should > `0. - updateGapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor + // It would be OK since TTL won't be zero, so gapTime should > `0. + updateGapTime := time.Duration(sp.TTL) * time.Second / preUpdateServiceSafePointFactor update := func(ctx context.Context) { - if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { + if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil { log.Error("failed to update service safe point, backup may fail if gc triggered", zap.Error(err), ) } } check := func(ctx context.Context) { - if err := CheckGCSafePoint(ctx, pdClient, backupTS); err != nil { + if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil { log.Panic("cannot pass gc safe point check, aborting", zap.Error(err), - zap.Uint64("backupTS", backupTS), + zap.Uint64("backupTS", sp.BackupTS), ) } } diff --git a/pkg/task/backup.go b/pkg/task/backup.go index b79f33d35..7cf82b2e0 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -150,15 +150,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } g.Record("BackupTS", backupTS) - safePoint := backupTS + sp := backup.BRServiceSafePoint{ + BackupTS: backupTS, + TTL: client.GetGCTTL(), + ID: backup.MakeSafePointID(), + } // use lastBackupTS as safePoint if exists if cfg.LastBackupTS > 0 { - safePoint = cfg.LastBackupTS + sp.BackupTS = cfg.LastBackupTS } log.Info("current backup safePoint job", - zap.Uint64("safepoint", safePoint)) - backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), safePoint) + zap.Uint64("safepoint", sp.BackupTS)) + backup.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) isIncrementalBackup := cfg.LastBackupTS > 0 From 9a1ccb8cc46fecebfb4dc6c525d27af4b2e85290 Mon Sep 17 00:00:00 2001 From: yujuncen Date: Thu, 16 Jul 2020 11:05:31 +0800 Subject: [PATCH 5/9] backup: log safe point as an object Signed-off-by: Hillium --- pkg/backup/safe_point.go | 18 +++++++++++++++--- pkg/task/backup.go | 2 +- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index c43e49f61..719b70474 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -5,6 +5,8 @@ package backup import ( "context" "fmt" + "github.com/pingcap/pd/v4/pkg/tsoutil" + "go.uber.org/zap/zapcore" "time" "github.com/google/uuid" @@ -30,6 +32,16 @@ type BRServiceSafePoint struct { BackupTS uint64 } +func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("ID", sp.ID) + ttlDuration := time.Duration(sp.TTL) * time.Second + encoder.AddString("TTL", ttlDuration.String()) + backupTime, _ := tsoutil.ParseTS(sp.BackupTS) + encoder.AddString("BackupTime", backupTime.String()) + encoder.AddUint64("BackupTS", sp.BackupTS) + return nil +} + // getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { @@ -63,14 +75,14 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error // UpdateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds. func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error { log.Debug("update PD safePoint limit with TTL", - zap.Uint64("safePoint", sp.BackupTS), - zap.Int64("TTL", sp.TTL)) + zap.Object("safePoint", sp)) lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, sp.ID, sp.TTL, sp.BackupTS-1) if lastSafePoint > sp.BackupTS-1 { log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough", zap.Uint64("lastSafePoint", lastSafePoint), + zap.Object("safePoint", sp), ) } return err @@ -96,7 +108,7 @@ func StartServiceSafePointKeeper( if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil { log.Panic("cannot pass gc safe point check, aborting", zap.Error(err), - zap.Uint64("backupTS", sp.BackupTS), + zap.Object("safePoint", sp), ) } } diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 7cf82b2e0..91dc86afe 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -161,7 +161,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } log.Info("current backup safePoint job", - zap.Uint64("safepoint", sp.BackupTS)) + zap.Object("safePoint", sp)) backup.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) isIncrementalBackup := cfg.LastBackupTS > 0 From 76e447f9614cf98cba32afddf0bf800152977906 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 16 Jul 2020 11:08:33 +0800 Subject: [PATCH 6/9] backup: make out hound happy, woof! Signed-off-by: Hillium --- pkg/backup/safe_point.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 719b70474..de51c365b 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -32,6 +32,7 @@ type BRServiceSafePoint struct { BackupTS uint64 } +// MarshalLogObject implements zapcore.ObjectMarshaler. func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("ID", sp.ID) ttlDuration := time.Duration(sp.TTL) * time.Second From 790118565bf026f87e048dc2da8abbd2a9bd1615 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 16 Jul 2020 11:15:32 +0800 Subject: [PATCH 7/9] *: run goimport Signed-off-by: Hillium --- pkg/backup/safe_point.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index de51c365b..992318f71 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -5,9 +5,10 @@ package backup import ( "context" "fmt" + "time" + "github.com/pingcap/pd/v4/pkg/tsoutil" "go.uber.org/zap/zapcore" - "time" "github.com/google/uuid" From 5dbac091654f323ac7bdd1f3a641c2bd1b1be015 Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 20 Jul 2020 10:40:27 +0800 Subject: [PATCH 8/9] backup: fix a exception during merging Signed-off-by: Hillium --- pkg/backup/safe_point.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index 9d886c122..ac6d2b509 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -100,7 +100,7 @@ func StartServiceSafePointKeeper( // It would be OK since TTL won't be zero, so gapTime should > `0. updateGapTime := time.Duration(sp.TTL) * time.Second / preUpdateServiceSafePointFactor update := func(ctx context.Context) { - if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { + if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil { log.Warn("failed to update service safe point, backup may fail if gc triggered", zap.Error(err), ) From 8b3055a49500be657262944f6afadfbf24deeb88 Mon Sep 17 00:00:00 2001 From: Hillium Date: Wed, 22 Jul 2020 15:07:50 +0800 Subject: [PATCH 9/9] backup: remove blank lines of import --- pkg/backup/safe_point.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index ac6d2b509..b77b609f8 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -7,15 +7,13 @@ import ( "fmt" "time" - "github.com/pingcap/pd/v4/pkg/tsoutil" - "go.uber.org/zap/zapcore" - "github.com/google/uuid" - "github.com/pingcap/errors" "github.com/pingcap/log" pd "github.com/pingcap/pd/v4/client" + "github.com/pingcap/pd/v4/pkg/tsoutil" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const (