Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200729012136-4e113ddee29e
github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c
github.com/pingcap/kvproto v0.0.0-20200715040832-c3e2e0b163ee
github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200730093003-dc8c75cf7ca0
github.com/pingcap/tidb v1.1.0-beta.0.20200805053026-cd3e5ed82671
github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d
github.com/pingcap/tidb-tools v4.0.1+incompatible
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
Expand Down
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ github.com/ngaut/unistore v0.0.0-20200604061006-d8e9dc0ad154/go.mod h1:YGQzxn9cV
github.com/ngaut/unistore v0.0.0-20200630072006-0c4035925f69/go.mod h1:Hxlp5VAoPOHwcXLUw/E+P3XjJX1EP38NWjXPpc4nuOE=
github.com/ngaut/unistore v0.0.0-20200803051709-607d96233b1d h1:5wZOROIRh9/E4kyX6pXT7OtYyQtS9iUNcQtyt0Agil4=
github.com/ngaut/unistore v0.0.0-20200803051709-607d96233b1d/go.mod h1:2QAH8tXCjeHuCSLEWKLYAzHPz2dB59VnhpPA2IDVeW4=
github.com/ngaut/unistore v0.0.0-20200806113332-5b9f73333a19 h1:N8pW0PTJEGTyHzZuN7sofxVsFmuvR+vFD0BNJ243k2o=
github.com/ngaut/unistore v0.0.0-20200806113332-5b9f73333a19/go.mod h1:RtZJKyiaHRiII+b9/g/4339rSikSvfrUJmIbrUkYVi4=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
Expand Down Expand Up @@ -466,6 +468,7 @@ github.com/pingcap/br v0.0.0-20200610051721-b057d65ff579/go.mod h1:Gq6o66nDReG0f
github.com/pingcap/br v0.0.0-20200617120402-56e151ad8b67/go.mod h1:/3QzpDG7YTPrDsrg8i1lwdYUrplJ0jVD+9pxhh19+k4=
github.com/pingcap/br v0.0.0-20200727092753-a475692725db/go.mod h1:4iTqZAMbEPmjBggYixqIg2FwIHBQtyImTM/QYlpTBGk=
github.com/pingcap/br v0.0.0-20200803052654-e6f63fc1807a/go.mod h1:8j7vGUfHCETYbeBfASLTDywC3NFSx90z9nuk0PV9rpo=
github.com/pingcap/br v0.0.0-20200805121136-181c081ba6ac/go.mod h1:9P24mNzNmXjggYBm4pnb08slSbua8FA6QIyg68GpuhQ=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 h1:iRtOAQ6FXkY/BGvst3CDfTva4nTqh6CL8WXvanLdbu0=
Expand Down Expand Up @@ -505,8 +508,9 @@ github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200701055533-4ef28cac01f8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200715040832-c3e2e0b163ee h1:DVIvoPsg68XMQje9ivq7l8Iz8sTCBZGrHeaSwg+mwjw=
github.com/pingcap/kvproto v0.0.0-20200715040832-c3e2e0b163ee/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f h1:y247vlXfpe8MYGt6pkVKqIoYmx2KoeF8jmBLbAlse6Y=
github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down Expand Up @@ -545,10 +549,11 @@ github.com/pingcap/tidb v1.1.0-beta.0.20200604055950-efc1c154d098/go.mod h1:UMxs
github.com/pingcap/tidb v1.1.0-beta.0.20200610060912-f12cdc42010f/go.mod h1:jyXOvS9k0PTxYHju2OgySOe9FtydA52TiQ5bXAaKyQE=
github.com/pingcap/tidb v1.1.0-beta.0.20200721005019-f5c6e59f0daf/go.mod h1:dYCOFXJsoqBumpxAyBqCG3WZriIY7JgeBZHgvfARDa8=
github.com/pingcap/tidb v1.1.0-beta.0.20200803035726-41c23700d8d1/go.mod h1:YFuuPMuceYoXIr4sCrtv1FUyJLvtYp9KisDsTk5dxlE=
github.com/pingcap/tidb v1.1.0-beta.0.20200803051932-e291f8fbd1e0 h1:Ev/VdmPfujNcagxqe79cvbd1RlGJrDcPXGgA/IO6/0k=
github.com/pingcap/tidb v1.1.0-beta.0.20200803051932-e291f8fbd1e0/go.mod h1:YFuuPMuceYoXIr4sCrtv1FUyJLvtYp9KisDsTk5dxlE=
github.com/pingcap/tidb v1.1.0-beta.0.20200805053026-cd3e5ed82671 h1:COnAO+eDmGdFtOSDGoI89fiF5KV326bqT8MpwNxG5hk=
github.com/pingcap/tidb v1.1.0-beta.0.20200805053026-cd3e5ed82671/go.mod h1:+r9tlyUKG2zYzs2ajvEHiQlTx6WM0K2L1yabCHZwgGw=
github.com/pingcap/tidb v1.1.0-beta.0.20200806060043-574540aa06ba/go.mod h1:NHcZH46dkYwDd2IWUJaLOB0m54j7v2P5WdS4FvPR81w=
github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d h1:01jQWk7xGtZCQWmu1d0uaVO6gq+nDbSC2YjMWdlBnIA=
github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d/go.mod h1:vLYo4E7Q6kzKYTskhP2MHBsodmZIRRUU63qdiFjlULA=
github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200514040632-f76b3e428e19+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
Expand Down
26 changes: 15 additions & 11 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (bc *Client) BackupRange(
// Find and backup remaining ranges.
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType,
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
req.RateLimit, req.Concurrency, results, updateCh)
if err != nil {
return nil, err
Expand Down Expand Up @@ -569,6 +569,7 @@ func (bc *Client) fineGrainedBackup(
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
compressLevel int32,
rateLimit uint64,
concurrency uint32,
rangeTree rtree.RangeTree,
Expand Down Expand Up @@ -599,7 +600,8 @@ func (bc *Client) fineGrainedBackup(
defer wg.Done()
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, compressType, rateLimit, concurrency, respCh)
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
compressType, compressLevel, rateLimit, concurrency, respCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -734,6 +736,7 @@ func (bc *Client) handleFineGrained(
lastBackupTS uint64,
backupTS uint64,
compressType kvproto.CompressionType,
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
respCh chan<- *kvproto.BackupResponse,
Expand All @@ -746,15 +749,16 @@ func (bc *Client) handleFineGrained(
max := 0

req := kvproto.BackupRequest{
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
CompressionType: compressType,
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
CompressionType: compressType,
CompressionLevel: compressionLevel,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
Expand Down
59 changes: 42 additions & 17 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagCompressionType = "compression"
flagCompressionLevel = "compression-level"
flagRemoveSchedulers = "remove-schedulers"

flagGCTTL = "gcttl"
Expand All @@ -40,16 +41,22 @@ const (
maxBackupConcurrency = 256
)

// CompressionConfig is the configuration for sst file compression.
type CompressionConfig struct {
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
CompressionLevel int32 `json:"compression-level" toml:"compression-level"`
}

// BackupConfig is the configuration specific for backup tasks.
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
CompressionConfig
}

// DefineBackupFlags defines common flags for the backup command.
Expand All @@ -66,6 +73,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
flags.String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
flags.Int32(flagCompressionLevel, 0, "compression level used for sst file compression")

flags.Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup")
Expand Down Expand Up @@ -101,15 +109,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.GCTTL = gcTTL

compressionStr, err := flags.GetString(flagCompressionType)
compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
return errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionType = compressionType
cfg.CompressionConfig = *compressionCfg

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
Expand All @@ -119,6 +123,26 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
return err
}

// ParseFromFlags parses the backup-related flags from the flag set.
func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) {
compressionStr, err := flags.GetString(flagCompressionType)
if err != nil {
return nil, errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return nil, errors.Trace(err)
}
level, err := flags.GetInt32(flagCompressionLevel)
if err != nil {
return nil, errors.Trace(err)
}
return &CompressionConfig{
CompressionLevel: level,
CompressionType: compressionType,
}, nil
}

// adjustBackupConfig is use for BR(binary) and BR in TiDB.
// When new config was add and not included in parser.
// we should set proper value in this function.
Expand Down Expand Up @@ -206,11 +230,12 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}

req := kvproto.BackupRequest{
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
CompressionType: cfg.CompressionType,
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: defaultBackupConcurrency,
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
}

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
Expand Down
36 changes: 20 additions & 16 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ const (
type RawKvConfig struct {
Config

StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionConfig
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
}

// DefineRawBackupFlags defines common flags for the backup command.
Expand Down Expand Up @@ -97,19 +97,22 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
return err
}

compressionStr, err := flags.GetString(flagCompressionType)
compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
return errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
cfg.CompressionConfig = *compressionCfg

cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionType = compressionType
cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
level, err := flags.GetInt32(flagCompressionLevel)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionLevel = level

return nil
}

Expand Down Expand Up @@ -165,13 +168,14 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
CompressionType: cfg.CompressionType,
StartVersion: 0,
EndVersion: 0,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
}
files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh)
if err != nil {
Expand Down
62 changes: 37 additions & 25 deletions tests/br_full/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,48 @@ for i in $(seq $DB_COUNT); do
done

# backup full
echo "backup start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4
echo "backup with lz4 start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --ratelimit 5 --concurrency 4 --compression lz4
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')

for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done
echo "backup with zstd start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --ratelimit 5 --concurrency 4 --compression zstd --compression-level 6
size_zstd=$(du -d 0 $TEST_DIR/$DB-zstd | awk '{print $1}')

# restore full
echo "restore start..."
run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024
if [ "$size_lz4" -le "$size_zstd" ]; then
echo "full backup lz4 size $size_lz4 is small than backup with zstd $size_zstd"
exit -1
fi

for i in $(seq $DB_COUNT); do
row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done
for ct in lz4 zstd; do
for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done

fail=false
for i in $(seq $DB_COUNT); do
if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
fail=true
echo "TEST: [$TEST_NAME] fail on database $DB${i}"
fi
echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
done
# restore full
echo "restore with $ct backup start..."
run_br restore full -s "local://$TEST_DIR/$DB-$ct" --pd $PD_ADDR --ratelimit 1024

if $fail; then
echo "TEST: [$TEST_NAME] failed!"
exit 1
else
echo "TEST: [$TEST_NAME] successed!"
fi
for i in $(seq $DB_COUNT); do
row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done

fail=false
for i in $(seq $DB_COUNT); do
if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
fail=true
echo "TEST: [$TEST_NAME] fail on database $DB${i}"
fi
echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
done

if $fail; then
echo "TEST: [$TEST_NAME] failed!"
exit 1
else
echo "TEST: [$TEST_NAME] successed!"
fi
done

for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
Expand Down