From d6d402365f9867af1af78d35d05cb5cd6626d000 Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 13 Jul 2020 16:52:42 +0800 Subject: [PATCH 1/7] *: move remove schedulers logic out from restore. --- pkg/conn/scheduler_utils.go | 167 ++++++++++++++++++++++++++++++++++++ pkg/task/restore.go | 136 +++-------------------------- pkg/task/restore_raw.go | 4 +- pkg/utils/pd.go | 8 ++ 4 files changed, 188 insertions(+), 127 deletions(-) create mode 100644 pkg/conn/scheduler_utils.go diff --git a/pkg/conn/scheduler_utils.go b/pkg/conn/scheduler_utils.go new file mode 100644 index 000000000..64b11c330 --- /dev/null +++ b/pkg/conn/scheduler_utils.go @@ -0,0 +1,167 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package conn + +import ( + "context" + "math" + + "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/errors" +) + +// clusterConfig represents a set of scheduler whose config have been modified +// along with their original config. +type clusterConfig struct { + // Enable PD schedulers before restore + scheduler []string + // Original scheudle configuration + scheduleCfg map[string]interface{} +} + +var ( + schedulers = map[string]struct{}{ + "balance-leader-scheduler": {}, + "balance-hot-region-scheduler": {}, + "balance-region-scheduler": {}, + + "shuffle-leader-scheduler": {}, + "shuffle-region-scheduler": {}, + "shuffle-hot-region-scheduler": {}, + } + pdRegionMergeCfg = []string{ + "max-merge-region-keys", + "max-merge-region-size", + } + pdScheduleLimitCfg = []string{ + "leader-schedule-limit", + "region-schedule-limit", + "max-snapshot-count", + } +) + +func addPDLeaderScheduler(ctx context.Context, mgr *Mgr, removedSchedulers []string) error { + for _, scheduler := range removedSchedulers { + err := mgr.AddScheduler(ctx, scheduler) + if err != nil { + return err + } + } + return nil +} + +func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) error { + if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil { + return errors.Annotate(err, "fail to add PD schedulers") + } + mergeCfg := make(map[string]interface{}) + for _, cfgKey := range pdRegionMergeCfg { + value := clusterCfg.scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. + continue + } + mergeCfg[cfgKey] = value + } + if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil { + return errors.Annotate(err, "fail to update PD merge config") + } + + scheduleLimitCfg := make(map[string]interface{}) + for _, cfgKey := range pdScheduleLimitCfg { + value := clusterCfg.scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. + continue + } + scheduleLimitCfg[cfgKey] = value + } + if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { + return errors.Annotate(err, "fail to update PD schedule config") + } + return nil +} + +// RemoveSchedulers removes the schedulers that may slow down BR speed. +func RemoveSchedulers(ctx context.Context, mgr *Mgr) (utils.UndoFunc, error) { + // Remove default PD scheduler that may affect restore process. + existSchedulers, err := mgr.ListSchedulers(ctx) + if err != nil { + return utils.Nop, nil + } + needRemoveSchedulers := make([]string, 0, len(existSchedulers)) + for _, s := range existSchedulers { + if _, ok := schedulers[s]; ok { + needRemoveSchedulers = append(needRemoveSchedulers, s) + } + } + scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers) + if err != nil { + return utils.Nop, nil + } + + stores, err := mgr.GetPDClient().GetAllStores(ctx) + if err != nil { + return utils.Nop, err + } + + scheduleCfg, err := mgr.GetPDScheduleConfig(ctx) + if err != nil { + return utils.Nop, err + } + + disableMergeCfg := make(map[string]interface{}) + for _, cfgKey := range pdRegionMergeCfg { + value := scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. + continue + } + // Disable region merge by setting config to 0. + disableMergeCfg[cfgKey] = 0 + } + err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg) + if err != nil { + return utils.Nop, err + } + + scheduleLimitCfg := make(map[string]interface{}) + for _, cfgKey := range pdScheduleLimitCfg { + value := scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. + continue + } + + // Speed update PD scheduler by enlarging scheduling limits. + // Multiply limits by store count but no more than 40. + // Larger limit may make cluster unstable. + limit := int(value.(float64)) + scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) + } + err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) + if err != nil { + return utils.Nop, err + } + + cluster := clusterConfig{ + scheduler: scheduler, + scheduleCfg: scheduleCfg, + } + restore := func(ctx context.Context) error { + return restoreSchedulers(ctx, mgr, cluster) + } + return restore, nil +} + +func removePDLeaderScheduler(ctx context.Context, mgr *Mgr, existSchedulers []string) ([]string, error) { + removedSchedulers := make([]string, 0, len(existSchedulers)) + for _, scheduler := range existSchedulers { + err := mgr.RemoveScheduler(ctx, scheduler) + if err != nil { + return nil, err + } + removedSchedulers = append(removedSchedulers, scheduler) + } + return removedSchedulers, nil +} diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 22a65dd7c..72244370b 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -4,7 +4,6 @@ package task import ( "context" - "math" "time" "github.com/pingcap/errors" @@ -219,13 +218,13 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf summary.CollectInt("restore ranges", rangeSize) log.Info("range and file prepared", zap.Int("file count", len(files)), zap.Int("range count", rangeSize)) - clusterCfg, err := restorePreWork(ctx, client, mgr) + restoreSchedulers, err := restorePreWork(ctx, client, mgr) if err != nil { return err } // Always run the post-work even on error, so we don't stuck in the import // mode or emptied schedulers - defer restorePostWork(ctx, client, mgr, clusterCfg) + defer restorePostWork(ctx, client, mgr, restoreSchedulers) // Do not reset timestamp if we are doing incremental restore, because // we are not allowed to decrease timestamp. @@ -346,147 +345,34 @@ type clusterConfig struct { } // restorePreWork executes some prepare work before restore. -func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (clusterConfig, error) { +// TODO make this function returns a restore post work. +func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (utils.UndoFunc, error) { if client.IsOnline() { - return clusterConfig{}, nil + return utils.Nop, nil } // Switch TiKV cluster to import mode (adjust rocksdb configuration). if err := client.SwitchToImportMode(ctx); err != nil { - return clusterConfig{}, nil + return utils.Nop, nil } - // Remove default PD scheduler that may affect restore process. - existSchedulers, err := mgr.ListSchedulers(ctx) - if err != nil { - return clusterConfig{}, nil - } - needRemoveSchedulers := make([]string, 0, len(existSchedulers)) - for _, s := range existSchedulers { - if _, ok := schedulers[s]; ok { - needRemoveSchedulers = append(needRemoveSchedulers, s) - } - } - scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers) - if err != nil { - return clusterConfig{}, nil - } - - stores, err := mgr.GetPDClient().GetAllStores(ctx) - if err != nil { - return clusterConfig{}, err - } - - scheduleCfg, err := mgr.GetPDScheduleConfig(ctx) - if err != nil { - return clusterConfig{}, err - } - - disableMergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { - value := scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - // Disable region merge by setting config to 0. - disableMergeCfg[cfgKey] = 0 - } - err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg) - if err != nil { - return clusterConfig{}, err - } - - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - - // Speed update PD scheduler by enlarging scheduling limits. - // Multiply limits by store count but no more than 40. - // Larger limit may make cluster unstable. - limit := int(value.(float64)) - scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) - } - err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) - if err != nil { - return clusterConfig{}, err - } - - cluster := clusterConfig{ - scheduler: scheduler, - scheduleCfg: scheduleCfg, - } - return cluster, nil -} - -func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) { - removedSchedulers := make([]string, 0, len(existSchedulers)) - for _, scheduler := range existSchedulers { - err := mgr.RemoveScheduler(ctx, scheduler) - if err != nil { - return nil, err - } - removedSchedulers = append(removedSchedulers, scheduler) - } - return removedSchedulers, nil + return conn.RemoveSchedulers(ctx, mgr) } // restorePostWork executes some post work after restore. // TODO: aggregate all lifetime manage methods into batcher's context manager field. func restorePostWork( - ctx context.Context, client *restore.Client, mgr *conn.Mgr, clusterCfg clusterConfig, + ctx context.Context, client *restore.Client, mgr *conn.Mgr, restoreSchedulers utils.UndoFunc, ) { if client.IsOnline() { return } if err := client.SwitchToNormalMode(ctx); err != nil { - log.Warn("fail to switch to normal mode") - } - if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil { - log.Warn("fail to add PD schedulers") - } - mergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { - value := clusterCfg.scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - mergeCfg[cfgKey] = value - } - if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil { - log.Warn("fail to update PD region merge config") + log.Warn("fail to switch to normal mode", zap.Error(err)) } - - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := clusterCfg.scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - scheduleLimitCfg[cfgKey] = value - } - if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { - log.Warn("fail to update PD schedule config") + if err := restoreSchedulers(ctx); err != nil { + log.Warn("failed to restore PD schedulers", zap.Error(err)) } - if err := client.ResetRestoreLabels(ctx); err != nil { - log.Warn("reset store labels failed", zap.Error(err)) - } -} - -func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error { - for _, scheduler := range removedSchedulers { - err := mgr.AddScheduler(ctx, scheduler) - if err != nil { - return err - } - } - return nil } // RunRestoreTiflashReplica restores the replica of tiflash saved in the last restore. diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 6f269f8be..91665c1d6 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -110,11 +110,11 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR return errors.Trace(err) } - removedSchedulers, err := restorePreWork(ctx, client, mgr) + restoreSchedulers, err := restorePreWork(ctx, client, mgr) if err != nil { return errors.Trace(err) } - defer restorePostWork(ctx, client, mgr, removedSchedulers) + defer restorePostWork(ctx, client, mgr, restoreSchedulers) err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh) if err != nil { diff --git a/pkg/utils/pd.go b/pkg/utils/pd.go index 662996bdc..d836a30e5 100644 --- a/pkg/utils/pd.go +++ b/pkg/utils/pd.go @@ -4,6 +4,7 @@ package utils import ( "bytes" + "context" "crypto/tls" "encoding/hex" "encoding/json" @@ -18,6 +19,13 @@ import ( "github.com/pingcap/tidb/tablecodec" ) +// UndoFunc is a 'undo' operation of some undoable command. +// (e.g. RemoveSchedulers) +type UndoFunc func(context.Context) error + +// Nop is the 'zero value' of undo func. +var Nop UndoFunc = func(context.Context) error { return nil } + const ( resetTSURL = "/pd/api/v1/admin/reset-ts" placementRuleURL = "/pd/api/v1/config/rules" From d9218cf673a635c682d7f6d0f5435b25debfaf83 Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 13 Jul 2020 19:00:35 +0800 Subject: [PATCH 2/7] task: add flag of remove scheduler --- pkg/conn/scheduler_utils.go | 4 +++- pkg/task/backup.go | 38 +++++++++++++++++++++++++++---------- pkg/task/backup_raw.go | 28 ++++++++++++++++++++++----- pkg/task/restore.go | 32 ++----------------------------- pkg/task/restore_raw.go | 2 +- pkg/utils/pd.go | 2 +- 6 files changed, 58 insertions(+), 48 deletions(-) diff --git a/pkg/conn/scheduler_utils.go b/pkg/conn/scheduler_utils.go index 64b11c330..72e7bfbd3 100644 --- a/pkg/conn/scheduler_utils.go +++ b/pkg/conn/scheduler_utils.go @@ -6,8 +6,9 @@ import ( "context" "math" - "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" + + "github.com/pingcap/br/pkg/utils" ) // clusterConfig represents a set of scheduler whose config have been modified @@ -83,6 +84,7 @@ func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) } // RemoveSchedulers removes the schedulers that may slow down BR speed. +// TODO make each step returns a function that can restore schedulers it has removed. func RemoveSchedulers(ctx context.Context, mgr *Mgr) (utils.UndoFunc, error) { // Remove default PD scheduler that may affect restore process. existSchedulers, err := mgr.ListSchedulers(ctx) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index b79f33d35..c9e4f19e4 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -28,10 +28,11 @@ import ( ) const ( - flagBackupTimeago = "timeago" - flagBackupTS = "backupts" - flagLastBackupTS = "lastbackupts" - flagCompressionType = "compression" + flagBackupTimeago = "timeago" + flagBackupTS = "backupts" + flagLastBackupTS = "lastbackupts" + flagCompressionType = "compression" + flagRemoveSchedulers = "remove-schedulers" flagGCTTL = "gcttl" @@ -43,11 +44,12 @@ const ( type BackupConfig struct { Config - TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` - BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` - LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` - GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` - CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` + BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` + LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` + GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` } // DefineBackupFlags defines common flags for the backup command. @@ -64,6 +66,9 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint") flags.String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") + + flags.Bool(flagRemoveSchedulers, false, + "remove some of PD schedulers to speed up backup, but will make influence to cluster") } // ParseFromFlags parses the backup-related flags from the flag set. @@ -113,7 +118,8 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { if cfg.Config.Concurrency > maxBackupConcurrency { cfg.Config.Concurrency = maxBackupConcurrency } - return nil + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) + return err } // RunBackup starts a backup task inside the current goroutine. @@ -162,6 +168,18 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig isIncrementalBackup := cfg.LastBackupTS > 0 + if cfg.RemoveSchedulers { + restore, e := conn.RemoveSchedulers(ctx, mgr) + if e != nil { + return err + } + defer func() { + if restoreE := restore(ctx); restoreE != nil { + log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) + } + }() + } + req := kvproto.BackupRequest{ StartVersion: cfg.LastBackupTS, EndVersion: backupTS, diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 4d237848b..fb35072a4 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -8,8 +8,10 @@ import ( "github.com/pingcap/errors" kvproto "github.com/pingcap/kvproto/pkg/backup" + "github.com/pingcap/log" "github.com/spf13/cobra" "github.com/spf13/pflag" + "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/conn" @@ -31,10 +33,11 @@ const ( type RawKvConfig struct { Config - StartKey []byte `json:"start-key" toml:"start-key"` - EndKey []byte `json:"end-key" toml:"end-key"` - CF string `json:"cf" toml:"cf"` - CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + StartKey []byte `json:"start-key" toml:"start-key"` + EndKey []byte `json:"end-key" toml:"end-key"` + CF string `json:"cf" toml:"cf"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` } // DefineRawBackupFlags defines common flags for the backup command. @@ -45,6 +48,8 @@ func DefineRawBackupFlags(command *cobra.Command) { command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive") command.Flags().String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") + command.Flags().Bool(flagRemoveSchedulers, false, + "remove some of PD schedulers to speed up backup, but will make influence to cluster") } // ParseFromFlags parses the raw kv backup&restore common flags from the flag set. @@ -81,7 +86,8 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) } - return nil + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) + return err } // ParseBackupConfigFromFlags parses the backup-related flags from the flag set. @@ -129,6 +135,18 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey} + if cfg.RemoveSchedulers { + restore, e := conn.RemoveSchedulers(ctx, mgr) + if e != nil { + return err + } + defer func() { + if restoreE := restore(ctx); restoreE != nil { + log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) + } + }() + } + // The number of regions need to backup approximateRegions, err := mgr.GetRegionCount(ctx, backupRange.StartKey, backupRange.EndKey) if err != nil { diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 72244370b..8cf4c4430 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -32,27 +32,6 @@ const ( defaultDDLConcurrency = 16 ) -var ( - schedulers = map[string]struct{}{ - "balance-leader-scheduler": {}, - "balance-hot-region-scheduler": {}, - "balance-region-scheduler": {}, - - "shuffle-leader-scheduler": {}, - "shuffle-region-scheduler": {}, - "shuffle-hot-region-scheduler": {}, - } - pdRegionMergeCfg = []string{ - "max-merge-region-keys", - "max-merge-region-size", - } - pdScheduleLimitCfg = []string{ - "leader-schedule-limit", - "region-schedule-limit", - "max-snapshot-count", - } -) - // RestoreConfig is the configuration specific for restore tasks. type RestoreConfig struct { Config @@ -224,7 +203,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // Always run the post-work even on error, so we don't stuck in the import // mode or emptied schedulers - defer restorePostWork(ctx, client, mgr, restoreSchedulers) + defer restorePostWork(ctx, client, restoreSchedulers) // Do not reset timestamp if we are doing incremental restore, because // we are not allowed to decrease timestamp. @@ -337,13 +316,6 @@ func filterRestoreFiles( return } -type clusterConfig struct { - // Enable PD schedulers before restore - scheduler []string - // Original scheudle configuration - scheduleCfg map[string]interface{} -} - // restorePreWork executes some prepare work before restore. // TODO make this function returns a restore post work. func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (utils.UndoFunc, error) { @@ -362,7 +334,7 @@ func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) // restorePostWork executes some post work after restore. // TODO: aggregate all lifetime manage methods into batcher's context manager field. func restorePostWork( - ctx context.Context, client *restore.Client, mgr *conn.Mgr, restoreSchedulers utils.UndoFunc, + ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc, ) { if client.IsOnline() { return diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 91665c1d6..4e92ea2ad 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -114,7 +114,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR if err != nil { return errors.Trace(err) } - defer restorePostWork(ctx, client, mgr, restoreSchedulers) + defer restorePostWork(ctx, client, restoreSchedulers) err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh) if err != nil { diff --git a/pkg/utils/pd.go b/pkg/utils/pd.go index d836a30e5..1aadbdbcc 100644 --- a/pkg/utils/pd.go +++ b/pkg/utils/pd.go @@ -20,7 +20,7 @@ import ( ) // UndoFunc is a 'undo' operation of some undoable command. -// (e.g. RemoveSchedulers) +// (e.g. RemoveSchedulers). type UndoFunc func(context.Context) error // Nop is the 'zero value' of undo func. From d6b37a0f806d11388e54fa034a22b0892609c813 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 16 Jul 2020 13:48:04 +0800 Subject: [PATCH 3/7] tests: add tests of remove scheduler --- pkg/conn/scheduler_utils.go | 2 +- pkg/task/backup.go | 3 ++- pkg/task/backup_raw.go | 2 +- pkg/task/restore.go | 2 +- tests/br_other/run.sh | 16 ++++++++++++---- 5 files changed, 17 insertions(+), 8 deletions(-) diff --git a/pkg/conn/scheduler_utils.go b/pkg/conn/scheduler_utils.go index 72e7bfbd3..b8b01c49d 100644 --- a/pkg/conn/scheduler_utils.go +++ b/pkg/conn/scheduler_utils.go @@ -85,7 +85,7 @@ func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) // RemoveSchedulers removes the schedulers that may slow down BR speed. // TODO make each step returns a function that can restore schedulers it has removed. -func RemoveSchedulers(ctx context.Context, mgr *Mgr) (utils.UndoFunc, error) { +func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (utils.UndoFunc, error) { // Remove default PD scheduler that may affect restore process. existSchedulers, err := mgr.ListSchedulers(ctx) if err != nil { diff --git a/pkg/task/backup.go b/pkg/task/backup.go index c9e4f19e4..f7baa28fb 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -169,7 +169,8 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig isIncrementalBackup := cfg.LastBackupTS > 0 if cfg.RemoveSchedulers { - restore, e := conn.RemoveSchedulers(ctx, mgr) + log.Debug("removing some PD schedulers") + restore, e := mgr.RemoveSchedulers(ctx) if e != nil { return err } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index fb35072a4..7a085a203 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -136,7 +136,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey} if cfg.RemoveSchedulers { - restore, e := conn.RemoveSchedulers(ctx, mgr) + restore, e := mgr.RemoveSchedulers(ctx) if e != nil { return err } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 8cf4c4430..142c85240 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -328,7 +328,7 @@ func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) return utils.Nop, nil } - return conn.RemoveSchedulers(ctx, mgr) + return mgr.RemoveSchedulers(ctx) } // restorePostWork executes some post work after restore. diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index 9640a1600..1aa36e42e 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -58,10 +58,11 @@ fi echo "backup start to test lock file" PPROF_PORT=6080 GO_FAILPOINTS="github.com/pingcap/br/pkg/utils/determined-pprof-port=return($PPROF_PORT)" \ -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null & +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null & # record last backup pid _pid=$! + # give the former backup some time to write down lock file (and initialize signal listener). sleep 1 pkill -10 -P $_pid @@ -72,9 +73,11 @@ sleep 1 curl "http://localhost:$PPROF_PORT/debug/pprof/trace?seconds=1" 2>&1 > /dev/null echo "pprof started..." +curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true' + backup_fail=0 echo "another backup start expect to fail due to last backup add a lockfile" -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1 +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1 if [ "$backup_fail" -ne "1" ];then echo "TEST: [$TEST_NAME] test backup lock file failed!" exit 1 @@ -83,13 +86,18 @@ fi if ps -p $_pid > /dev/null then echo "$_pid is running" - # kill last backup progress - kill -9 $_pid + # kill last backup progress (Don't send SIGKILL, or we might stuck PD in no scheduler state.) + kill $_pid else echo "TEST: [$TEST_NAME] test backup lock file failed! the last backup finished" exit 1 fi +# make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it. +# give enough time to BR so it can gracefully stop. +sleep 10 +! curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true' + run_sql "DROP DATABASE $DB;" # Test version From 00f4aa10fb92356c2f7eaab89ccb53083b2f7051 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 16 Jul 2020 14:37:00 +0800 Subject: [PATCH 4/7] task: move remove-scheduler flag to backup config --- pkg/task/backup_raw.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 7a085a203..60ced0eb0 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -78,7 +78,6 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { if bytes.Compare(cfg.StartKey, cfg.EndKey) >= 0 { return errors.New("endKey must be greater than startKey") } - cfg.CF, err = flags.GetString(flagTiKVColumnFamily) if err != nil { return err @@ -86,8 +85,7 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) } - cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) - return err + return nil } // ParseBackupConfigFromFlags parses the backup-related flags from the flag set. @@ -106,6 +104,10 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error { return errors.Trace(err) } cfg.CompressionType = compressionType + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) + if err != nil { + return errors.Trace(err) + } return nil } From 306566b3b16bfd89bd48b15dce2429ac5bb4e1ac Mon Sep 17 00:00:00 2001 From: yujuncen Date: Fri, 17 Jul 2020 10:34:29 +0800 Subject: [PATCH 5/7] backup: return staged undo function. Signed-off-by: Hillium --- pkg/conn/scheduler_utils.go | 38 ++++++++++++++++++++----------------- pkg/task/backup.go | 2 +- pkg/task/backup_raw.go | 2 +- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/pkg/conn/scheduler_utils.go b/pkg/conn/scheduler_utils.go index b8b01c49d..31aeaf2eb 100644 --- a/pkg/conn/scheduler_utils.go +++ b/pkg/conn/scheduler_utils.go @@ -83,13 +83,21 @@ func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) return nil } +func (mgr *Mgr) makeUndoFunctionByConfig(config clusterConfig) utils.UndoFunc { + restore := func(ctx context.Context) error { + return restoreSchedulers(ctx, mgr, config) + } + return restore +} + // RemoveSchedulers removes the schedulers that may slow down BR speed. -// TODO make each step returns a function that can restore schedulers it has removed. -func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (utils.UndoFunc, error) { +func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) { + undo = utils.Nop + // Remove default PD scheduler that may affect restore process. existSchedulers, err := mgr.ListSchedulers(ctx) if err != nil { - return utils.Nop, nil + return } needRemoveSchedulers := make([]string, 0, len(existSchedulers)) for _, s := range existSchedulers { @@ -99,19 +107,22 @@ func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (utils.UndoFunc, error) { } scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers) if err != nil { - return utils.Nop, nil + return } + undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler}) + stores, err := mgr.GetPDClient().GetAllStores(ctx) if err != nil { - return utils.Nop, err + return } - scheduleCfg, err := mgr.GetPDScheduleConfig(ctx) if err != nil { - return utils.Nop, err + return } + undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg}) + disableMergeCfg := make(map[string]interface{}) for _, cfgKey := range pdRegionMergeCfg { value := scheduleCfg[cfgKey] @@ -124,7 +135,7 @@ func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (utils.UndoFunc, error) { } err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg) if err != nil { - return utils.Nop, err + return } scheduleLimitCfg := make(map[string]interface{}) @@ -143,17 +154,10 @@ func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (utils.UndoFunc, error) { } err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) if err != nil { - return utils.Nop, err + return } - cluster := clusterConfig{ - scheduler: scheduler, - scheduleCfg: scheduleCfg, - } - restore := func(ctx context.Context) error { - return restoreSchedulers(ctx, mgr, cluster) - } - return restore, nil + return } func removePDLeaderScheduler(ctx context.Context, mgr *Mgr, existSchedulers []string) ([]string, error) { diff --git a/pkg/task/backup.go b/pkg/task/backup.go index f7baa28fb..af47b70a7 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -68,7 +68,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) { "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") flags.Bool(flagRemoveSchedulers, false, - "remove some of PD schedulers to speed up backup, but will make influence to cluster") + "disable the balance, shuffle and region-merge schedulers in PD to speed up backup") } // ParseFromFlags parses the backup-related flags from the flag set. diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 60ced0eb0..89e449d53 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -49,7 +49,7 @@ func DefineRawBackupFlags(command *cobra.Command) { command.Flags().String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") command.Flags().Bool(flagRemoveSchedulers, false, - "remove some of PD schedulers to speed up backup, but will make influence to cluster") + "disable the balance, shuffle and region-merge schedulers in PD to speed up backup") } // ParseFromFlags parses the raw kv backup&restore common flags from the flag set. From 892a7124dcbba2b4883048aa290b3b63186d03f7 Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 17 Jul 2020 12:02:24 +0800 Subject: [PATCH 6/7] conn, backup: run undo anyway. Signed-off-by: Hillium --- pkg/conn/scheduler_utils.go | 7 +------ pkg/task/backup.go | 6 +++--- pkg/task/backup_raw.go | 6 +++--- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/conn/scheduler_utils.go b/pkg/conn/scheduler_utils.go index 31aeaf2eb..56b9e12b5 100644 --- a/pkg/conn/scheduler_utils.go +++ b/pkg/conn/scheduler_utils.go @@ -152,12 +152,7 @@ func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err limit := int(value.(float64)) scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) } - err = mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) - if err != nil { - return - } - - return + return undo, mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) } func removePDLeaderScheduler(ctx context.Context, mgr *Mgr, existSchedulers []string) ([]string, error) { diff --git a/pkg/task/backup.go b/pkg/task/backup.go index af47b70a7..6d9bc2d2f 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -171,14 +171,14 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if cfg.RemoveSchedulers { log.Debug("removing some PD schedulers") restore, e := mgr.RemoveSchedulers(ctx) - if e != nil { - return err - } defer func() { if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } }() + if e != nil { + return err + } } req := kvproto.BackupRequest{ diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 89e449d53..19d55aa9c 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -139,14 +139,14 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf if cfg.RemoveSchedulers { restore, e := mgr.RemoveSchedulers(ctx) - if e != nil { - return err - } defer func() { if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } }() + if e != nil { + return err + } } // The number of regions need to backup From 3fd8aa3a034365b73cb900f788c3aec3f5b0f5f9 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 21 Jul 2020 10:14:54 +0800 Subject: [PATCH 7/7] backup: hide remove schedulers flag Signed-off-by: Hillium --- pkg/task/backup.go | 2 ++ pkg/task/backup_raw.go | 2 ++ tests/br_other/run.sh | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 6d9bc2d2f..83a16f713 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -69,6 +69,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.Bool(flagRemoveSchedulers, false, "disable the balance, shuffle and region-merge schedulers in PD to speed up backup") + // This flag can impact the online cluster, so hide it in case of abuse. + _ = flags.MarkHidden(flagRemoveSchedulers) } // ParseFromFlags parses the backup-related flags from the flag set. diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 19d55aa9c..833477f90 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -50,6 +50,8 @@ func DefineRawBackupFlags(command *cobra.Command) { "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") command.Flags().Bool(flagRemoveSchedulers, false, "disable the balance, shuffle and region-merge schedulers in PD to speed up backup") + // This flag can impact the online cluster, so hide it in case of abuse. + _ = command.Flags().MarkHidden(flagRemoveSchedulers) } // ParseFromFlags parses the raw kv backup&restore common flags from the flag set. diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index 1aa36e42e..7d968610e 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -77,7 +77,7 @@ curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true' backup_fail=0 echo "another backup start expect to fail due to last backup add a lockfile" -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1 +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1 if [ "$backup_fail" -ne "1" ];then echo "TEST: [$TEST_NAME] test backup lock file failed!" exit 1