From 5a4b477778f1e07e02a376fa653fd805246f41ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Tue, 21 Jul 2020 10:43:24 +0800 Subject: [PATCH 1/2] Remove scheduler (#426) * *: move remove schedulers logic out from restore. * task: add flag of remove scheduler * tests: add tests of remove scheduler * task: move remove-scheduler flag to backup config * backup: return staged undo function. Signed-off-by: Hillium * conn, backup: run undo anyway. Signed-off-by: Hillium * backup: hide remove schedulers flag Signed-off-by: Hillium Co-authored-by: 3pointer --- pkg/conn/scheduler_utils.go | 168 ++++++++++++++++++++++++++++++++++++ pkg/task/backup.go | 41 ++++++--- pkg/task/backup_raw.go | 32 +++++-- pkg/task/restore.go | 162 +++------------------------------- pkg/task/restore_raw.go | 4 +- pkg/utils/pd.go | 8 ++ tests/br_other/run.sh | 14 ++- 7 files changed, 257 insertions(+), 172 deletions(-) create mode 100644 pkg/conn/scheduler_utils.go diff --git a/pkg/conn/scheduler_utils.go b/pkg/conn/scheduler_utils.go new file mode 100644 index 000000000..56b9e12b5 --- /dev/null +++ b/pkg/conn/scheduler_utils.go @@ -0,0 +1,168 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package conn + +import ( + "context" + "math" + + "github.com/pingcap/errors" + + "github.com/pingcap/br/pkg/utils" +) + +// clusterConfig represents a set of scheduler whose config have been modified +// along with their original config. 
+type clusterConfig struct { + // Schedulers removed from PD that must be re-added on undo + scheduler []string + // Original schedule configuration + scheduleCfg map[string]interface{} +} + +var ( + schedulers = map[string]struct{}{ + "balance-leader-scheduler": {}, + "balance-hot-region-scheduler": {}, + "balance-region-scheduler": {}, + + "shuffle-leader-scheduler": {}, + "shuffle-region-scheduler": {}, + "shuffle-hot-region-scheduler": {}, + } + pdRegionMergeCfg = []string{ + "max-merge-region-keys", + "max-merge-region-size", + } + pdScheduleLimitCfg = []string{ + "leader-schedule-limit", + "region-schedule-limit", + "max-snapshot-count", + } +) + +func addPDLeaderScheduler(ctx context.Context, mgr *Mgr, removedSchedulers []string) error { + for _, scheduler := range removedSchedulers { + err := mgr.AddScheduler(ctx, scheduler) + if err != nil { + return err + } + } + return nil +} + +func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) error { + if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil { + return errors.Annotate(err, "fail to add PD schedulers") + } + mergeCfg := make(map[string]interface{}) + for _, cfgKey := range pdRegionMergeCfg { + value := clusterCfg.scheduleCfg[cfgKey] + if value == nil { + // Ignore non-existent config. 
+ continue + } + scheduleLimitCfg[cfgKey] = value + } + if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { + return errors.Annotate(err, "fail to update PD schedule config") + } + return nil +} + +func (mgr *Mgr) makeUndoFunctionByConfig(config clusterConfig) utils.UndoFunc { + restore := func(ctx context.Context) error { + return restoreSchedulers(ctx, mgr, config) + } + return restore +} + +// RemoveSchedulers removes the schedulers that may slow down BR speed. +func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) { + undo = utils.Nop + + // Remove default PD scheduler that may affect restore process. + existSchedulers, err := mgr.ListSchedulers(ctx) + if err != nil { + return + } + needRemoveSchedulers := make([]string, 0, len(existSchedulers)) + for _, s := range existSchedulers { + if _, ok := schedulers[s]; ok { + needRemoveSchedulers = append(needRemoveSchedulers, s) + } + } + scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers) + if err != nil { + return + } + + undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler}) + + stores, err := mgr.GetPDClient().GetAllStores(ctx) + if err != nil { + return + } + scheduleCfg, err := mgr.GetPDScheduleConfig(ctx) + if err != nil { + return + } + + undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg}) + + disableMergeCfg := make(map[string]interface{}) + for _, cfgKey := range pdRegionMergeCfg { + value := scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. + continue + } + // Disable region merge by setting config to 0. + disableMergeCfg[cfgKey] = 0 + } + err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg) + if err != nil { + return + } + + scheduleLimitCfg := make(map[string]interface{}) + for _, cfgKey := range pdScheduleLimitCfg { + value := scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. 
+ continue + } + + // Speed update PD scheduler by enlarging scheduling limits. + // Multiply limits by store count but no more than 40. + // Larger limit may make cluster unstable. + limit := int(value.(float64)) + scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) + } + return undo, mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) +} + +func removePDLeaderScheduler(ctx context.Context, mgr *Mgr, existSchedulers []string) ([]string, error) { + removedSchedulers := make([]string, 0, len(existSchedulers)) + for _, scheduler := range existSchedulers { + err := mgr.RemoveScheduler(ctx, scheduler) + if err != nil { + return nil, err + } + removedSchedulers = append(removedSchedulers, scheduler) + } + return removedSchedulers, nil +} diff --git a/pkg/task/backup.go b/pkg/task/backup.go index b79f33d35..83a16f713 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -28,10 +28,11 @@ import ( ) const ( - flagBackupTimeago = "timeago" - flagBackupTS = "backupts" - flagLastBackupTS = "lastbackupts" - flagCompressionType = "compression" + flagBackupTimeago = "timeago" + flagBackupTS = "backupts" + flagLastBackupTS = "lastbackupts" + flagCompressionType = "compression" + flagRemoveSchedulers = "remove-schedulers" flagGCTTL = "gcttl" @@ -43,11 +44,12 @@ const ( type BackupConfig struct { Config - TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` - BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` - LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` - GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` - CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` + BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` + LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` + GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + RemoveSchedulers bool 
`json:"remove-schedulers" toml:"remove-schedulers"` } // DefineBackupFlags defines common flags for the backup command. @@ -64,6 +66,11 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint") flags.String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") + + flags.Bool(flagRemoveSchedulers, false, + "disable the balance, shuffle and region-merge schedulers in PD to speed up backup") + // This flag can impact the online cluster, so hide it in case of abuse. + _ = flags.MarkHidden(flagRemoveSchedulers) } // ParseFromFlags parses the backup-related flags from the flag set. @@ -113,7 +120,8 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { if cfg.Config.Concurrency > maxBackupConcurrency { cfg.Config.Concurrency = maxBackupConcurrency } - return nil + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) + return err } // RunBackup starts a backup task inside the current goroutine. 
@@ -162,6 +170,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig isIncrementalBackup := cfg.LastBackupTS > 0 + if cfg.RemoveSchedulers { + log.Debug("removing some PD schedulers") + restore, e := mgr.RemoveSchedulers(ctx) + defer func() { + if restoreE := restore(ctx); restoreE != nil { + log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) + } + }() + if e != nil { + return e // propagate the RemoveSchedulers failure itself, not the unrelated outer err + } + } + req := kvproto.BackupRequest{ StartVersion: cfg.LastBackupTS, EndVersion: backupTS, diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 4d237848b..833477f90 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -8,8 +8,10 @@ import ( "github.com/pingcap/errors" kvproto "github.com/pingcap/kvproto/pkg/backup" + "github.com/pingcap/log" "github.com/spf13/cobra" "github.com/spf13/pflag" + "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/conn" @@ -31,10 +33,11 @@ const ( type RawKvConfig struct { Config - StartKey []byte `json:"start-key" toml:"start-key"` - EndKey []byte `json:"end-key" toml:"end-key"` - CF string `json:"cf" toml:"cf"` - CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + StartKey []byte `json:"start-key" toml:"start-key"` + EndKey []byte `json:"end-key" toml:"end-key"` + CF string `json:"cf" toml:"cf"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` } // DefineRawBackupFlags defines common flags for the backup command. 
@@ -45,6 +48,10 @@ func DefineRawBackupFlags(command *cobra.Command) { command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive") command.Flags().String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") + command.Flags().Bool(flagRemoveSchedulers, false, + "disable the balance, shuffle and region-merge schedulers in PD to speed up backup") + // This flag can impact the online cluster, so hide it in case of abuse. + _ = command.Flags().MarkHidden(flagRemoveSchedulers) } // ParseFromFlags parses the raw kv backup&restore common flags from the flag set. @@ -73,7 +80,6 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { if bytes.Compare(cfg.StartKey, cfg.EndKey) >= 0 { return errors.New("endKey must be greater than startKey") } - cfg.CF, err = flags.GetString(flagTiKVColumnFamily) if err != nil { return err @@ -100,6 +106,10 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error { return errors.Trace(err) } cfg.CompressionType = compressionType + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) + if err != nil { + return errors.Trace(err) + } return nil } @@ -129,6 +139,18 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey} + if cfg.RemoveSchedulers { + restore, e := mgr.RemoveSchedulers(ctx) + defer func() { + if restoreE := restore(ctx); restoreE != nil { + log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) + } + }() + if e != nil { + return e // propagate the RemoveSchedulers failure itself, not the unrelated outer err + } + } + // The number of regions need to backup approximateRegions, err := mgr.GetRegionCount(ctx, backupRange.StartKey, backupRange.EndKey) if err != nil { diff --git a/pkg/task/restore.go b/pkg/task/restore.go index f53765c0f..3221d5fbd 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -4,7 +4,6 @@ package 
task import ( "context" - "math" "time" "github.com/pingcap/errors" @@ -33,27 +32,6 @@ const ( defaultDDLConcurrency = 16 ) -var ( - schedulers = map[string]struct{}{ - "balance-leader-scheduler": {}, - "balance-hot-region-scheduler": {}, - "balance-region-scheduler": {}, - - "shuffle-leader-scheduler": {}, - "shuffle-region-scheduler": {}, - "shuffle-hot-region-scheduler": {}, - } - pdRegionMergeCfg = []string{ - "max-merge-region-keys", - "max-merge-region-size", - } - pdScheduleLimitCfg = []string{ - "leader-schedule-limit", - "region-schedule-limit", - "max-snapshot-count", - } -) - // RestoreConfig is the configuration specific for restore tasks. type RestoreConfig struct { Config @@ -220,13 +198,13 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf summary.CollectInt("restore ranges", rangeSize) log.Info("range and file prepared", zap.Int("file count", len(files)), zap.Int("range count", rangeSize)) - clusterCfg, err := restorePreWork(ctx, client, mgr) + restoreSchedulers, err := restorePreWork(ctx, client, mgr) if err != nil { return err } // Always run the post-work even on error, so we don't stuck in the import // mode or emptied schedulers - defer restorePostWork(ctx, client, mgr, clusterCfg) + defer restorePostWork(ctx, client, restoreSchedulers) // Do not reset timestamp if we are doing incremental restore, because // we are not allowed to decrease timestamp. @@ -339,153 +317,33 @@ func filterRestoreFiles( return } -type clusterConfig struct { - // Enable PD schedulers before restore - scheduler []string - // Original scheudle configuration - scheduleCfg map[string]interface{} -} - // restorePreWork executes some prepare work before restore. -func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (clusterConfig, error) { +// TODO make this function returns a restore post work. 
+func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (utils.UndoFunc, error) { if client.IsOnline() { - return clusterConfig{}, nil + return utils.Nop, nil } // Switch TiKV cluster to import mode (adjust rocksdb configuration). client.SwitchToImportMode(ctx) - // Remove default PD scheduler that may affect restore process. - existSchedulers, err := mgr.ListSchedulers(ctx) - if err != nil { - return clusterConfig{}, nil - } - needRemoveSchedulers := make([]string, 0, len(existSchedulers)) - for _, s := range existSchedulers { - if _, ok := schedulers[s]; ok { - needRemoveSchedulers = append(needRemoveSchedulers, s) - } - } - scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers) - if err != nil { - return clusterConfig{}, nil - } - - stores, err := mgr.GetPDClient().GetAllStores(ctx) - if err != nil { - return clusterConfig{}, err - } - - scheduleCfg, err := mgr.GetPDScheduleConfig(ctx) - if err != nil { - return clusterConfig{}, err - } - - disableMergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { - value := scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - // Disable region merge by setting config to 0. - disableMergeCfg[cfgKey] = 0 - } - err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg) - if err != nil { - return clusterConfig{}, err - } - - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - - // Speed update PD scheduler by enlarging scheduling limits. - // Multiply limits by store count but no more than 40. - // Larger limit may make cluster unstable. 
- limit := int(value.(float64)) - scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) - } - err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) - if err != nil { - return clusterConfig{}, err - } - - cluster := clusterConfig{ - scheduler: scheduler, - scheduleCfg: scheduleCfg, - } - return cluster, nil -} - -func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) { - removedSchedulers := make([]string, 0, len(existSchedulers)) - for _, scheduler := range existSchedulers { - err := mgr.RemoveScheduler(ctx, scheduler) - if err != nil { - return nil, err - } - removedSchedulers = append(removedSchedulers, scheduler) - } - return removedSchedulers, nil + return mgr.RemoveSchedulers(ctx) } // restorePostWork executes some post work after restore. // TODO: aggregate all lifetime manage methods into batcher's context manager field. func restorePostWork( - ctx context.Context, client *restore.Client, mgr *conn.Mgr, clusterCfg clusterConfig, + ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc, ) { if client.IsOnline() { return } if err := client.SwitchToNormalMode(ctx); err != nil { - log.Warn("fail to switch to normal mode") - } - if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil { - log.Warn("fail to add PD schedulers") - } - mergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { - value := clusterCfg.scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - mergeCfg[cfgKey] = value - } - if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil { - log.Warn("fail to update PD region merge config") + log.Warn("fail to switch to normal mode", zap.Error(err)) } - - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := clusterCfg.scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. 
- continue - } - scheduleLimitCfg[cfgKey] = value + if err := restoreSchedulers(ctx); err != nil { + log.Warn("failed to restore PD schedulers", zap.Error(err)) } - if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { - log.Warn("fail to update PD schedule config") - } - if err := client.ResetRestoreLabels(ctx); err != nil { - log.Warn("reset store labels failed", zap.Error(err)) - } -} - -func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error { - for _, scheduler := range removedSchedulers { - err := mgr.AddScheduler(ctx, scheduler) - if err != nil { - return err - } - } - return nil } // RunRestoreTiflashReplica restores the replica of tiflash saved in the last restore. diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index b806ca620..4bd867086 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -111,11 +111,11 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR return errors.Trace(err) } - removedSchedulers, err := restorePreWork(ctx, client, mgr) + restoreSchedulers, err := restorePreWork(ctx, client, mgr) if err != nil { return errors.Trace(err) } - defer restorePostWork(ctx, client, mgr, removedSchedulers) + defer restorePostWork(ctx, client, restoreSchedulers) err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh) if err != nil { diff --git a/pkg/utils/pd.go b/pkg/utils/pd.go index 662996bdc..1aadbdbcc 100644 --- a/pkg/utils/pd.go +++ b/pkg/utils/pd.go @@ -4,6 +4,7 @@ package utils import ( "bytes" + "context" "crypto/tls" "encoding/hex" "encoding/json" @@ -18,6 +19,13 @@ import ( "github.com/pingcap/tidb/tablecodec" ) +// UndoFunc is a 'undo' operation of some undoable command. +// (e.g. RemoveSchedulers). +type UndoFunc func(context.Context) error + +// Nop is the 'zero value' of undo func. 
+var Nop UndoFunc = func(context.Context) error { return nil } + const ( resetTSURL = "/pd/api/v1/admin/reset-ts" placementRuleURL = "/pd/api/v1/config/rules" diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index 9640a1600..7d968610e 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -58,10 +58,11 @@ fi echo "backup start to test lock file" PPROF_PORT=6080 GO_FAILPOINTS="github.com/pingcap/br/pkg/utils/determined-pprof-port=return($PPROF_PORT)" \ -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null & +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null & # record last backup pid _pid=$! + # give the former backup some time to write down lock file (and initialize signal listener). sleep 1 pkill -10 -P $_pid @@ -72,6 +73,8 @@ sleep 1 curl "http://localhost:$PPROF_PORT/debug/pprof/trace?seconds=1" 2>&1 > /dev/null echo "pprof started..." +curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true' + backup_fail=0 echo "another backup start expect to fail due to last backup add a lockfile" run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1 @@ -83,13 +86,18 @@ fi if ps -p $_pid > /dev/null then echo "$_pid is running" - # kill last backup progress - kill -9 $_pid + # kill last backup progress (Don't send SIGKILL, or we might stuck PD in no scheduler state.) + kill $_pid else echo "TEST: [$TEST_NAME] test backup lock file failed! the last backup finished" exit 1 fi +# make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it. +# give enough time to BR so it can gracefully stop. +sleep 10 +! 
curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true' + run_sql "DROP DATABASE $DB;" # Test version From b6dea31fc9469a1eabed680a6a3396aff4bedba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Wed, 22 Jul 2020 15:27:04 +0800 Subject: [PATCH 2/2] fix some design of safe point keeper (#416) * backup: enhance safe point * backup: exit safe point keeper when ctx done * backup: return after ctx is done... :| * backup: don't use global variable to name service safe point * backup: log safe point as an object Signed-off-by: Hillium * backup: make out hound happy, woof! Signed-off-by: Hillium * *: run goimport Signed-off-by: Hillium * backup: fix a exception during merging Signed-off-by: Hillium * backup: remove blank lines of import Co-authored-by: 3pointer --- pkg/backup/safe_point.go | 66 +++++++++++++++++++++++++++++----------- pkg/task/backup.go | 12 +++++--- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index c39a8921b..b77b609f8 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -4,22 +4,44 @@ package backup import ( "context" + "fmt" "time" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" pd "github.com/pingcap/pd/v4/client" + "github.com/pingcap/pd/v4/pkg/tsoutil" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const ( - brServiceSafePointID = "br" + brServiceSafePointIDFormat = "br-%s" preUpdateServiceSafePointFactor = 3 checkGCSafePointGapTime = 5 * time.Second // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min DefaultBRGCSafePointTTL = 5 * 60 ) +// BRServiceSafePoint is metadata of service safe point from a BR 'instance'. +type BRServiceSafePoint struct { + ID string + TTL int64 + BackupTS uint64 +} + +// MarshalLogObject implements zapcore.ObjectMarshaler. 
+func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("ID", sp.ID) + ttlDuration := time.Duration(sp.TTL) * time.Second + encoder.AddString("TTL", ttlDuration.String()) + backupTime, _ := tsoutil.ParseTS(sp.BackupTS) + encoder.AddString("BackupTime", backupTime.String()) + encoder.AddUint64("BackupTS", sp.BackupTS) + return nil +} + // getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { @@ -30,6 +52,11 @@ func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { return safePoint, nil } +// MakeSafePointID makes a unique safe point ID, for reduce name conflict. +func MakeSafePointID() string { + return fmt.Sprintf(brServiceSafePointIDFormat, uuid.New()) +} + // CheckGCSafePoint checks whether the ts is older than GC safepoint. // Note: It ignores errors other than exceed GC safepoint. func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error { @@ -45,14 +72,19 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error return nil } -// UpdateServiceSafePoint register backupTS to PD, to lock down backupTS as safePoint with ttl seconds. -func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, backupTS uint64) error { - log.Debug("update PD safePoint limit with ttl", - zap.Uint64("safePoint", backupTS), - zap.Int64("ttl", ttl)) +// UpdateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds. 
+func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error { + log.Debug("update PD safePoint limit with TTL", + zap.Object("safePoint", sp)) - _, err := pdClient.UpdateServiceGCSafePoint(ctx, - brServiceSafePointID, ttl, backupTS-1) + lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, + sp.ID, sp.TTL, sp.BackupTS-1) + if lastSafePoint > sp.BackupTS-1 { + log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough", + zap.Uint64("lastSafePoint", lastSafePoint), + zap.Object("safePoint", sp), + ) + } return err } @@ -60,38 +92,36 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, // hence keeping service safepoint won't lose. func StartServiceSafePointKeeper( ctx context.Context, - ttl int64, pdClient pd.Client, - backupTS uint64, + sp BRServiceSafePoint, ) { - // It would be OK since ttl won't be zero, so gapTime should > `0. - updateGapTime := time.Duration(ttl) * time.Second / preUpdateServiceSafePointFactor + // It would be OK since TTL won't be zero, so gapTime should be > 0. 
+ updateGapTime := time.Duration(sp.TTL) * time.Second / preUpdateServiceSafePointFactor update := func(ctx context.Context) { - if err := UpdateServiceSafePoint(ctx, pdClient, ttl, backupTS); err != nil { + if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil { log.Warn("failed to update service safe point, backup may fail if gc triggered", zap.Error(err), ) } } check := func(ctx context.Context) { - if err := CheckGCSafePoint(ctx, pdClient, backupTS); err != nil { + if err := CheckGCSafePoint(ctx, pdClient, sp.BackupTS); err != nil { log.Panic("cannot pass gc safe point check, aborting", zap.Error(err), - zap.Uint64("backupTS", backupTS), + zap.Object("safePoint", sp), ) } } updateTick := time.NewTicker(updateGapTime) checkTick := time.NewTicker(checkGCSafePointGapTime) + update(ctx) go func() { defer updateTick.Stop() defer checkTick.Stop() for { select { case <-ctx.Done(): - // Before finish backup, we have to make sure - // the backup ts does not fall behind with GC safepoint. - check(context.TODO()) + log.Info("service safe point keeper exited") return case <-updateTick.C: update(ctx) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 83a16f713..2f4500283 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -158,15 +158,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } g.Record("BackupTS", backupTS) - safePoint := backupTS + sp := backup.BRServiceSafePoint{ + BackupTS: backupTS, + TTL: client.GetGCTTL(), + ID: backup.MakeSafePointID(), + } // use lastBackupTS as safePoint if exists if cfg.LastBackupTS > 0 { - safePoint = cfg.LastBackupTS + sp.BackupTS = cfg.LastBackupTS } log.Info("current backup safePoint job", - zap.Uint64("safepoint", safePoint)) - backup.StartServiceSafePointKeeper(ctx, client.GetGCTTL(), mgr.GetPDClient(), safePoint) + zap.Object("safePoint", sp)) + backup.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) isIncrementalBackup := cfg.LastBackupTS > 0