Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ require (
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/errors v0.11.5-0.20200729012136-4e113ddee29e
github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c
github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
github.com/pingcap/tidb v1.1.0-beta.0.20200716023258-b10faca6ff89
github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible
github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce
github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200730093003-dc8c75cf7ca0
github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d
github.com/pingcap/tidb-tools v4.0.1+incompatible
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/common v0.9.1
github.com/sirupsen/logrus v1.6.0
Expand Down
134 changes: 117 additions & 17 deletions go.sum

Large diffs are not rendered by default.

26 changes: 15 additions & 11 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (bc *Client) BackupRange(
// Find and backup remaining ranges.
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType,
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
req.RateLimit, req.Concurrency, results, updateCh)
if err != nil {
return nil, err
Expand Down Expand Up @@ -569,6 +569,7 @@ func (bc *Client) fineGrainedBackup(
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
compressLevel int32,
rateLimit uint64,
concurrency uint32,
rangeTree rtree.RangeTree,
Expand Down Expand Up @@ -599,7 +600,8 @@ func (bc *Client) fineGrainedBackup(
defer wg.Done()
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, compressType, rateLimit, concurrency, respCh)
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
compressType, compressLevel, rateLimit, concurrency, respCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -734,6 +736,7 @@ func (bc *Client) handleFineGrained(
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
respCh chan<- *kvproto.BackupResponse,
Expand All @@ -746,15 +749,16 @@ func (bc *Client) handleFineGrained(
max := 0

req := kvproto.BackupRequest{
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
CompressionType: compressType,
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
CompressionType: compressType,
CompressionLevel: compressionLevel,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
Expand Down
3 changes: 2 additions & 1 deletion pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestT(t *testing.T) {
}

func (r *testBackup) SetUpSuite(c *C) {
r.mockPDClient = mocktikv.NewPDClient(mocktikv.NewCluster())
store := mocktikv.MustNewMVCCStore()
r.mockPDClient = mocktikv.NewPDClient(mocktikv.NewCluster(store))
r.ctx, r.cancel = context.WithCancel(context.Background())
mockMgr := &conn.Mgr{}
mockMgr.SetPDClient(r.mockPDClient)
Expand Down
7 changes: 2 additions & 5 deletions pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ func NewCluster() (*Cluster, error) {
}()
})

cluster := mocktikv.NewCluster()
client, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
mocktikv.BootstrapWithSingleStore(cluster)
mvccStore := mocktikv.MustNewMVCCStore()

client, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
if err != nil {
return nil, err
}
Expand All @@ -76,7 +73,7 @@ func NewCluster() (*Cluster, error) {
}
return &Cluster{
Cluster: cluster,
MVCCStore: mvccStore,
MVCCStore: client.MvccStore,
Storage: storage,
Domain: dom,
PDClient: pdClient,
Expand Down
35 changes: 28 additions & 7 deletions pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,35 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);",
utils.EncloseName(table.Db.Name.O),
utils.EncloseName(table.Info.Name.O))
var setValSQL string
if increment < 0 {
restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue)
setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue)
} else {
restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue)
setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue)
}
err = db.se.Execute(ctx, setValSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", setValSQL),
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}

// trigger cycle round > 0
restoreMetaSQL += nextSeqSQL
restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.AutoIncID)
} else {
restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID)
err = db.se.Execute(ctx, nextSeqSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", nextSeqSQL),
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}
}
restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID)
err = db.se.Execute(ctx, restoreMetaSQL)
} else {
var alterAutoIncIDFormat string
switch {
Expand All @@ -146,15 +164,18 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
utils.EncloseName(table.Db.Name.O),
utils.EncloseName(table.Info.Name.O),
table.Info.AutoIncID)
if utils.NeedAutoID(table.Info) {
err = db.se.Execute(ctx, restoreMetaSQL)
}
}

err = db.se.Execute(ctx, restoreMetaSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", restoreMetaSQL),
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}
if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() {
// this table has auto random id, we need rebase it
Expand Down
8 changes: 4 additions & 4 deletions pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,15 @@ func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetO
}

func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
regions, leaders, err := c.client.ScanRegions(ctx, key, endKey, limit)
regions, err := c.client.ScanRegions(ctx, key, endKey, limit)
if err != nil {
return nil, err
}
regionInfos := make([]*RegionInfo, 0, len(regions))
for i := range regions {
for _, r := range regions {
regionInfos = append(regionInfos, &RegionInfo{
Region: regions[i],
Leader: leaders[i],
Region: r.Meta,
Leader: r.Leader,
})
}
return regionInfos, nil
Expand Down
59 changes: 42 additions & 17 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagCompressionType = "compression"
flagCompressionLevel = "compression-level"
flagRemoveSchedulers = "remove-schedulers"

flagGCTTL = "gcttl"
Expand All @@ -40,16 +41,22 @@ const (
maxBackupConcurrency = 256
)

// CompressionConfig is the configuration for sst file compression.
type CompressionConfig struct {
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
CompressionLevel int32 `json:"compression-level" toml:"compression-level"`
}

// BackupConfig is the configuration specific for backup tasks.
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
CompressionConfig
}

// DefineBackupFlags defines common flags for the backup command.
Expand All @@ -66,6 +73,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
flags.String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
flags.Int32(flagCompressionLevel, 0, "compression level used for sst file compression")

flags.Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup")
Expand Down Expand Up @@ -101,15 +109,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.GCTTL = gcTTL

compressionStr, err := flags.GetString(flagCompressionType)
compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
return errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionType = compressionType
cfg.CompressionConfig = *compressionCfg

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
Expand All @@ -119,6 +123,26 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
return err
}

// ParseFromFlags parses the backup-related flags from the flag set.
func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) {
compressionStr, err := flags.GetString(flagCompressionType)
if err != nil {
return nil, errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return nil, errors.Trace(err)
}
level, err := flags.GetInt32(flagCompressionLevel)
if err != nil {
return nil, errors.Trace(err)
}
return &CompressionConfig{
CompressionLevel: level,
CompressionType: compressionType,
}, nil
}

// adjustBackupConfig is use for BR(binary) and BR in TiDB.
// When new config was add and not included in parser.
// we should set proper value in this function.
Expand Down Expand Up @@ -206,11 +230,12 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}

req := kvproto.BackupRequest{
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
CompressionType: cfg.CompressionType,
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
}

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
Expand Down
36 changes: 20 additions & 16 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ const (
type RawKvConfig struct {
Config

StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionConfig
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
}

// DefineRawBackupFlags defines common flags for the backup command.
Expand Down Expand Up @@ -97,19 +97,22 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
return err
}

compressionStr, err := flags.GetString(flagCompressionType)
compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
return errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
cfg.CompressionConfig = *compressionCfg

cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionType = compressionType
cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
level, err := flags.GetInt32(flagCompressionLevel)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionLevel = level

return nil
}

Expand Down Expand Up @@ -165,13 +168,14 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
CompressionType: cfg.CompressionType,
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
}
files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func (tbl *Table) NoChecksum() bool {
return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
}

// NeedAutoID checks whether the table needs backing up with an autoid.
func NeedAutoID(tblInfo *model.TableInfo) bool {
hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
return hasRowID || hasAutoIncID
}

// Database wraps the schema and tables of a database.
type Database struct {
Info *model.DBInfo
Expand Down
Loading