Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
168 changes: 168 additions & 0 deletions pkg/conn/scheduler_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package conn

import (
"context"
"math"

"github.com/pingcap/errors"

"github.com/pingcap/br/pkg/utils"
)

// clusterConfig records what RemoveSchedulers changed in PD so that the
// change can be reverted later by restoreSchedulers: the schedulers that were
// removed, along with the schedule configuration as it was before modification.
type clusterConfig struct {
	// scheduler holds the names of the PD schedulers that were removed;
	// restoreSchedulers adds each of them back.
	scheduler []string
	// scheduleCfg is the original schedule configuration fetched from PD.
	// It may be nil when only schedulers were removed; restoreSchedulers
	// ignores keys that have no saved value.
	scheduleCfg map[string]interface{}
}

var (
	// schedulers is the set of PD scheduler names that RemoveSchedulers
	// removes when they are present in the cluster.
	schedulers = map[string]struct{}{
		"balance-leader-scheduler": {},
		"balance-hot-region-scheduler": {},
		"balance-region-scheduler": {},

		"shuffle-leader-scheduler": {},
		"shuffle-region-scheduler": {},
		"shuffle-hot-region-scheduler": {},
	}
	// pdRegionMergeCfg lists the PD schedule config keys that
	// RemoveSchedulers sets to 0 in order to disable region merge, and that
	// restoreSchedulers resets to their saved values.
	pdRegionMergeCfg = []string{
		"max-merge-region-keys",
		"max-merge-region-size",
	}
	// pdScheduleLimitCfg lists the PD schedule config keys that
	// RemoveSchedulers multiplies by the store count (capped at 40), and
	// that restoreSchedulers resets to their saved values.
	pdScheduleLimitCfg = []string{
		"leader-schedule-limit",
		"region-schedule-limit",
		"max-snapshot-count",
	}
)

// addPDLeaderScheduler adds every scheduler named in removedSchedulers back
// to PD through mgr, in slice order. It stops at, and returns, the first
// error; schedulers after the failing one are not attempted.
func addPDLeaderScheduler(ctx context.Context, mgr *Mgr, removedSchedulers []string) error {
	for i := range removedSchedulers {
		if err := mgr.AddScheduler(ctx, removedSchedulers[i]); err != nil {
			return err
		}
	}
	return nil
}

// restoreSchedulers reverts the PD changes recorded in clusterCfg.
//
// It first re-adds the removed schedulers, then writes back the saved values
// of the region-merge keys, and finally the saved values of the schedule-limit
// keys. The first failing step aborts the sequence and its error is returned
// with an annotation naming the step.
func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) error {
	// savedValues picks the given keys out of the saved schedule config,
	// leaving out every key that has no saved (non-nil) value.
	savedValues := func(keys []string) map[string]interface{} {
		picked := make(map[string]interface{})
		for _, key := range keys {
			if v := clusterCfg.scheduleCfg[key]; v != nil {
				picked[key] = v
			}
		}
		return picked
	}

	if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil {
		return errors.Annotate(err, "fail to add PD schedulers")
	}
	if err := mgr.UpdatePDScheduleConfig(ctx, savedValues(pdRegionMergeCfg)); err != nil {
		return errors.Annotate(err, "fail to update PD merge config")
	}
	if err := mgr.UpdatePDScheduleConfig(ctx, savedValues(pdScheduleLimitCfg)); err != nil {
		return errors.Annotate(err, "fail to update PD schedule config")
	}
	return nil
}

// makeUndoFunctionByConfig builds an undo function bound to mgr and to the
// given snapshot: calling it runs restoreSchedulers with that snapshot.
func (mgr *Mgr) makeUndoFunctionByConfig(config clusterConfig) utils.UndoFunc {
	return func(ctx context.Context) error {
		return restoreSchedulers(ctx, mgr, config)
	}
}

// RemoveSchedulers removes the schedulers that may slow down BR speed.
//
// It applies three changes to PD, in order:
//  1. removes every running scheduler whose name is listed in schedulers;
//  2. sets each existing key of pdRegionMergeCfg to 0 to disable region merge;
//  3. multiplies each existing key of pdScheduleLimitCfg by the number of
//     stores, capped at 40.
//
// The returned undo function is never nil. It starts as utils.Nop and is
// replaced as soon as there is something to revert, so callers should run it
// even when err is non-nil: a failure in a later step still leaves the earlier
// steps applied.
func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) {
	undo = utils.Nop

	// Remove default PD scheduler that may affect restore process.
	existSchedulers, err := mgr.ListSchedulers(ctx)
	if err != nil {
		return undo, err
	}
	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
	for _, s := range existSchedulers {
		if _, ok := schedulers[s]; ok {
			needRemoveSchedulers = append(needRemoveSchedulers, s)
		}
	}
	scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
	if err != nil {
		// Removal may have failed part-way through. If any schedulers are
		// reported as already removed, make sure the caller can restore them
		// instead of handing back a no-op.
		if len(scheduler) > 0 {
			undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler})
		}
		return undo, err
	}

	// From here on the schedulers are gone, so undo must at least re-add them.
	undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler})

	stores, err := mgr.GetPDClient().GetAllStores(ctx)
	if err != nil {
		return undo, err
	}
	scheduleCfg, err := mgr.GetPDScheduleConfig(ctx)
	if err != nil {
		return undo, err
	}

	// The original config is now known; undo restores it as well.
	undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg})

	disableMergeCfg := make(map[string]interface{})
	for _, cfgKey := range pdRegionMergeCfg {
		if scheduleCfg[cfgKey] == nil {
			// Ignore non-exist config.
			continue
		}
		// Disable region merge by setting config to 0.
		disableMergeCfg[cfgKey] = 0
	}
	if err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg); err != nil {
		return undo, err
	}

	scheduleLimitCfg := make(map[string]interface{})
	for _, cfgKey := range pdScheduleLimitCfg {
		value := scheduleCfg[cfgKey]
		if value == nil {
			// Ignore non-exist config.
			continue
		}

		// A checked assertion: panicking here would leave PD modified
		// without ever handing undo to the caller.
		limit, ok := value.(float64)
		if !ok {
			return undo, errors.New("unexpected non-numeric value of PD schedule config " + cfgKey)
		}

		// Speed up PD scheduling by enlarging the scheduling limits.
		// Multiply limits by store count but no more than 40.
		// Larger limit may make cluster unstable.
		scheduleLimitCfg[cfgKey] = math.Min(40, float64(int(limit)*len(stores)))
	}
	return undo, mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
}

// removePDLeaderScheduler removes each scheduler named in existSchedulers from
// PD through mgr, in slice order, and returns the names it removed.
//
// It stops at the first failure. In that case the returned slice still lists
// the schedulers removed before the failure (possibly none), so that the
// caller is able to add them back; the failing scheduler and the ones after it
// are not included.
func removePDLeaderScheduler(ctx context.Context, mgr *Mgr, existSchedulers []string) ([]string, error) {
	removedSchedulers := make([]string, 0, len(existSchedulers))
	for _, scheduler := range existSchedulers {
		if err := mgr.RemoveScheduler(ctx, scheduler); err != nil {
			// Do not drop the partial result: these schedulers are already
			// gone from PD and would otherwise be impossible to restore.
			return removedSchedulers, err
		}
		removedSchedulers = append(removedSchedulers, scheduler)
	}
	return removedSchedulers, nil
}
41 changes: 31 additions & 10 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
)

const (
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagCompressionType = "compression"
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagCompressionType = "compression"
flagRemoveSchedulers = "remove-schedulers"

flagGCTTL = "gcttl"

Expand All @@ -43,11 +44,12 @@ const (
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
}

// DefineBackupFlags defines common flags for the backup command.
Expand All @@ -64,6 +66,11 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
flags.String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")

flags.Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should hide it, or let users know that this will impact the online cluster.

// This flag can impact the online cluster, so hide it in case of abuse.
_ = flags.MarkHidden(flagRemoveSchedulers)
}

// ParseFromFlags parses the backup-related flags from the flag set.
Expand Down Expand Up @@ -113,7 +120,8 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if cfg.Config.Concurrency > maxBackupConcurrency {
cfg.Config.Concurrency = maxBackupConcurrency
}
return nil
cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
return err
}

// RunBackup starts a backup task inside the current goroutine.
Expand Down Expand Up @@ -162,6 +170,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig

isIncrementalBackup := cfg.LastBackupTS > 0

if cfg.RemoveSchedulers {
log.Debug("removing some PD schedulers")
restore, e := mgr.RemoveSchedulers(ctx)
defer func() {
if restoreE := restore(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
}()
if e != nil {
return err
}
}

req := kvproto.BackupRequest{
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
Expand Down
32 changes: 27 additions & 5 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/conn"
Expand All @@ -31,10 +33,11 @@ const (
type RawKvConfig struct {
Config

StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
}

// DefineRawBackupFlags defines common flags for the backup command.
Expand All @@ -45,6 +48,10 @@ func DefineRawBackupFlags(command *cobra.Command) {
command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive")
command.Flags().String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
command.Flags().Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup")
// This flag can impact the online cluster, so hide it in case of abuse.
_ = command.Flags().MarkHidden(flagRemoveSchedulers)
}

// ParseFromFlags parses the raw kv backup&restore common flags from the flag set.
Expand Down Expand Up @@ -73,7 +80,6 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if bytes.Compare(cfg.StartKey, cfg.EndKey) >= 0 {
return errors.New("endKey must be greater than startKey")
}

cfg.CF, err = flags.GetString(flagTiKVColumnFamily)
if err != nil {
return err
Expand All @@ -100,6 +106,10 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}
cfg.CompressionType = compressionType
cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
}
return nil
}

Expand Down Expand Up @@ -129,6 +139,18 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf

backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey}

if cfg.RemoveSchedulers {
restore, e := mgr.RemoveSchedulers(ctx)
defer func() {
if restoreE := restore(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
}()
if e != nil {
return err
}
}

// The number of regions need to backup
approximateRegions, err := mgr.GetRegionCount(ctx, backupRange.StartKey, backupRange.EndKey)
if err != nil {
Expand Down
Loading