diff --git a/go.mod b/go.mod index 263dea93a..418ceca68 100644 --- a/go.mod +++ b/go.mod @@ -15,11 +15,11 @@ require ( github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 github.com/pingcap/errors v0.11.5-0.20200729012136-4e113ddee29e github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c - github.com/pingcap/kvproto v0.0.0-20200715040832-c3e2e0b163ee + github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f github.com/pingcap/log v0.0.0-20200511115504-543df19646ad github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200730093003-dc8c75cf7ca0 - github.com/pingcap/tidb v1.1.0-beta.0.20200805053026-cd3e5ed82671 + github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d github.com/pingcap/tidb-tools v4.0.1+incompatible github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 github.com/prometheus/client_golang v1.5.1 diff --git a/go.sum b/go.sum index d4aa48e44..65a06664a 100644 --- a/go.sum +++ b/go.sum @@ -420,6 +420,8 @@ github.com/ngaut/unistore v0.0.0-20200604061006-d8e9dc0ad154/go.mod h1:YGQzxn9cV github.com/ngaut/unistore v0.0.0-20200630072006-0c4035925f69/go.mod h1:Hxlp5VAoPOHwcXLUw/E+P3XjJX1EP38NWjXPpc4nuOE= github.com/ngaut/unistore v0.0.0-20200803051709-607d96233b1d h1:5wZOROIRh9/E4kyX6pXT7OtYyQtS9iUNcQtyt0Agil4= github.com/ngaut/unistore v0.0.0-20200803051709-607d96233b1d/go.mod h1:2QAH8tXCjeHuCSLEWKLYAzHPz2dB59VnhpPA2IDVeW4= +github.com/ngaut/unistore v0.0.0-20200806113332-5b9f73333a19 h1:N8pW0PTJEGTyHzZuN7sofxVsFmuvR+vFD0BNJ243k2o= +github.com/ngaut/unistore v0.0.0-20200806113332-5b9f73333a19/go.mod h1:RtZJKyiaHRiII+b9/g/4339rSikSvfrUJmIbrUkYVi4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -466,6 +468,7 @@ 
github.com/pingcap/br v0.0.0-20200610051721-b057d65ff579/go.mod h1:Gq6o66nDReG0f github.com/pingcap/br v0.0.0-20200617120402-56e151ad8b67/go.mod h1:/3QzpDG7YTPrDsrg8i1lwdYUrplJ0jVD+9pxhh19+k4= github.com/pingcap/br v0.0.0-20200727092753-a475692725db/go.mod h1:4iTqZAMbEPmjBggYixqIg2FwIHBQtyImTM/QYlpTBGk= github.com/pingcap/br v0.0.0-20200803052654-e6f63fc1807a/go.mod h1:8j7vGUfHCETYbeBfASLTDywC3NFSx90z9nuk0PV9rpo= +github.com/pingcap/br v0.0.0-20200805121136-181c081ba6ac/go.mod h1:9P24mNzNmXjggYBm4pnb08slSbua8FA6QIyg68GpuhQ= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 h1:iRtOAQ6FXkY/BGvst3CDfTva4nTqh6CL8WXvanLdbu0= @@ -505,8 +508,9 @@ github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200701055533-4ef28cac01f8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200715040832-c3e2e0b163ee h1:DVIvoPsg68XMQje9ivq7l8Iz8sTCBZGrHeaSwg+mwjw= github.com/pingcap/kvproto v0.0.0-20200715040832-c3e2e0b163ee/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f h1:y247vlXfpe8MYGt6pkVKqIoYmx2KoeF8jmBLbAlse6Y= +github.com/pingcap/kvproto v0.0.0-20200803054707-ebd5de15093f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= 
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= @@ -545,10 +549,11 @@ github.com/pingcap/tidb v1.1.0-beta.0.20200604055950-efc1c154d098/go.mod h1:UMxs github.com/pingcap/tidb v1.1.0-beta.0.20200610060912-f12cdc42010f/go.mod h1:jyXOvS9k0PTxYHju2OgySOe9FtydA52TiQ5bXAaKyQE= github.com/pingcap/tidb v1.1.0-beta.0.20200721005019-f5c6e59f0daf/go.mod h1:dYCOFXJsoqBumpxAyBqCG3WZriIY7JgeBZHgvfARDa8= github.com/pingcap/tidb v1.1.0-beta.0.20200803035726-41c23700d8d1/go.mod h1:YFuuPMuceYoXIr4sCrtv1FUyJLvtYp9KisDsTk5dxlE= -github.com/pingcap/tidb v1.1.0-beta.0.20200803051932-e291f8fbd1e0 h1:Ev/VdmPfujNcagxqe79cvbd1RlGJrDcPXGgA/IO6/0k= github.com/pingcap/tidb v1.1.0-beta.0.20200803051932-e291f8fbd1e0/go.mod h1:YFuuPMuceYoXIr4sCrtv1FUyJLvtYp9KisDsTk5dxlE= -github.com/pingcap/tidb v1.1.0-beta.0.20200805053026-cd3e5ed82671 h1:COnAO+eDmGdFtOSDGoI89fiF5KV326bqT8MpwNxG5hk= github.com/pingcap/tidb v1.1.0-beta.0.20200805053026-cd3e5ed82671/go.mod h1:+r9tlyUKG2zYzs2ajvEHiQlTx6WM0K2L1yabCHZwgGw= +github.com/pingcap/tidb v1.1.0-beta.0.20200806060043-574540aa06ba/go.mod h1:NHcZH46dkYwDd2IWUJaLOB0m54j7v2P5WdS4FvPR81w= +github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d h1:01jQWk7xGtZCQWmu1d0uaVO6gq+nDbSC2YjMWdlBnIA= +github.com/pingcap/tidb v1.1.0-beta.0.20200810064414-d81150394f9d/go.mod h1:vLYo4E7Q6kzKYTskhP2MHBsodmZIRRUU63qdiFjlULA= github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200514040632-f76b3e428e19+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 718451e22..6bfa57827 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -507,7 +507,7 @@ func (bc 
*Client) BackupRange( // Find and backup remaining ranges. // TODO: test fine grained backup. err = bc.fineGrainedBackup( - ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, + ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel, req.RateLimit, req.Concurrency, results, updateCh) if err != nil { return nil, err @@ -569,6 +569,7 @@ func (bc *Client) fineGrainedBackup( lastBackupTS uint64, backupTS uint64, compressType kvproto.CompressionType, + compressLevel int32, rateLimit uint64, concurrency uint32, rangeTree rtree.RangeTree, @@ -599,7 +600,8 @@ func (bc *Client) fineGrainedBackup( defer wg.Done() for rg := range retry { backoffMs, err := - bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, compressType, rateLimit, concurrency, respCh) + bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, + compressType, compressLevel, rateLimit, concurrency, respCh) if err != nil { errCh <- err return @@ -734,6 +736,7 @@ func (bc *Client) handleFineGrained( lastBackupTS uint64, backupTS uint64, compressType kvproto.CompressionType, + compressionLevel int32, rateLimit uint64, concurrency uint32, respCh chan<- *kvproto.BackupResponse, @@ -746,15 +749,16 @@ func (bc *Client) handleFineGrained( max := 0 req := kvproto.BackupRequest{ - ClusterId: bc.clusterID, - StartKey: rg.StartKey, // TODO: the range may cross region. - EndKey: rg.EndKey, - StartVersion: lastBackupTS, - EndVersion: backupTS, - StorageBackend: bc.backend, - RateLimit: rateLimit, - Concurrency: concurrency, - CompressionType: compressType, + ClusterId: bc.clusterID, + StartKey: rg.StartKey, // TODO: the range may cross region. 
+ EndKey: rg.EndKey, + StartVersion: lastBackupTS, + EndVersion: backupTS, + StorageBackend: bc.backend, + RateLimit: rateLimit, + Concurrency: concurrency, + CompressionType: compressType, + CompressionLevel: compressionLevel, } lockResolver := bc.mgr.GetLockResolver() client, err := bc.mgr.GetBackupClient(ctx, storeID) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index b33a0f5d3..7083c229a 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -32,6 +32,7 @@ const ( flagBackupTS = "backupts" flagLastBackupTS = "lastbackupts" flagCompressionType = "compression" + flagCompressionLevel = "compression-level" flagRemoveSchedulers = "remove-schedulers" flagGCTTL = "gcttl" @@ -40,16 +41,22 @@ const ( maxBackupConcurrency = 256 ) +// CompressionConfig is the configuration for sst file compression. +type CompressionConfig struct { + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` + CompressionLevel int32 `json:"compression-level" toml:"compression-level"` +} + // BackupConfig is the configuration specific for backup tasks. type BackupConfig struct { Config - TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` - BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` - LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` - GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` - CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` - RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` + TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` + BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` + LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` + GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` + RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` + CompressionConfig } // DefineBackupFlags defines common flags for the backup command. 
@@ -66,6 +73,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint") flags.String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") + flags.Int32(flagCompressionLevel, 0, "compression level used for sst file compression") flags.Bool(flagRemoveSchedulers, false, "disable the balance, shuffle and region-merge schedulers in PD to speed up backup") @@ -101,15 +109,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { } cfg.GCTTL = gcTTL - compressionStr, err := flags.GetString(flagCompressionType) + compressionCfg, err := parseCompressionFlags(flags) if err != nil { return errors.Trace(err) } - compressionType, err := parseCompressionType(compressionStr) - if err != nil { - return errors.Trace(err) - } - cfg.CompressionType = compressionType + cfg.CompressionConfig = *compressionCfg if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) @@ -119,6 +123,26 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { return err } +// parseCompressionFlags parses the compression-related flags (type and level) from the flag set. +func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) { + compressionStr, err := flags.GetString(flagCompressionType) + if err != nil { + return nil, errors.Trace(err) + } + compressionType, err := parseCompressionType(compressionStr) + if err != nil { + return nil, errors.Trace(err) + } + level, err := flags.GetInt32(flagCompressionLevel) + if err != nil { + return nil, errors.Trace(err) + } + return &CompressionConfig{ + CompressionLevel: level, + CompressionType: compressionType, + }, nil +} + // adjustBackupConfig is use for BR(binary) and BR in TiDB. // When new config was add and not included in parser. // we should set proper value in this function. 
@@ -206,11 +230,12 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } req := kvproto.BackupRequest{ - StartVersion: cfg.LastBackupTS, - EndVersion: backupTS, - RateLimit: cfg.RateLimit, - Concurrency: defaultBackupConcurrency, - CompressionType: cfg.CompressionType, + StartVersion: cfg.LastBackupTS, + EndVersion: backupTS, + RateLimit: cfg.RateLimit, + Concurrency: defaultBackupConcurrency, + CompressionType: cfg.CompressionType, + CompressionLevel: cfg.CompressionLevel, } ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema( diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 833477f90..20f8f9ed2 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -33,11 +33,11 @@ const ( type RawKvConfig struct { Config - StartKey []byte `json:"start-key" toml:"start-key"` - EndKey []byte `json:"end-key" toml:"end-key"` - CF string `json:"cf" toml:"cf"` - CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` - RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` + StartKey []byte `json:"start-key" toml:"start-key"` + EndKey []byte `json:"end-key" toml:"end-key"` + CF string `json:"cf" toml:"cf"` + CompressionConfig + RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` } // DefineRawBackupFlags defines common flags for the backup command. 
@@ -97,19 +97,22 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error { return err } - compressionStr, err := flags.GetString(flagCompressionType) + compressionCfg, err := parseCompressionFlags(flags) if err != nil { return errors.Trace(err) } - compressionType, err := parseCompressionType(compressionStr) + cfg.CompressionConfig = *compressionCfg + + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) if err != nil { return errors.Trace(err) } - cfg.CompressionType = compressionType - cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) + level, err := flags.GetInt32(flagCompressionLevel) if err != nil { return errors.Trace(err) } + cfg.CompressionLevel = level + return nil } @@ -165,13 +168,14 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) req := kvproto.BackupRequest{ - StartVersion: 0, - EndVersion: 0, - RateLimit: cfg.RateLimit, - Concurrency: cfg.Concurrency, - IsRawKv: true, - Cf: cfg.CF, - CompressionType: cfg.CompressionType, + StartVersion: 0, + EndVersion: 0, + RateLimit: cfg.RateLimit, + Concurrency: cfg.Concurrency, + IsRawKv: true, + Cf: cfg.CF, + CompressionType: cfg.CompressionType, + CompressionLevel: cfg.CompressionLevel, } files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh) if err != nil { diff --git a/tests/br_full/run.sh b/tests/br_full/run.sh index 6e5fe44a4..6100c980d 100755 --- a/tests/br_full/run.sh +++ b/tests/br_full/run.sh @@ -28,36 +28,48 @@ for i in $(seq $DB_COUNT); do done # backup full -echo "backup start..." -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 +echo "backup with lz4 start..." 
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --ratelimit 5 --concurrency 4 --compression lz4 +size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}') -for i in $(seq $DB_COUNT); do - run_sql "DROP DATABASE $DB${i};" -done +echo "backup with zstd start..." +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --ratelimit 5 --concurrency 4 --compression zstd --compression-level 6 +size_zstd=$(du -d 0 $TEST_DIR/$DB-zstd | awk '{print $1}') -# restore full -echo "restore start..." -run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 +if [ "$size_lz4" -le "$size_zstd" ]; then + echo "full backup lz4 size $size_lz4 is small than backup with zstd $size_zstd" + exit 1 +fi -for i in $(seq $DB_COUNT); do - row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') -done +for ct in lz4 zstd; do + for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i};" + done -fail=false -for i in $(seq $DB_COUNT); do - if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then - fail=true - echo "TEST: [$TEST_NAME] fail on database $DB${i}" - fi - echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" -done + # restore full + echo "restore with $ct backup start..." + run_br restore full -s "local://$TEST_DIR/$DB-$ct" --pd $PD_ADDR --ratelimit 1024 -if $fail; then - echo "TEST: [$TEST_NAME] failed!" - exit 1 -else - echo "TEST: [$TEST_NAME] successed!" -fi + for i in $(seq $DB_COUNT); do + row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') + done + + fail=false + for i in $(seq $DB_COUNT); do + if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then + fail=true + echo "TEST: [$TEST_NAME] fail on database $DB${i}" + fi + echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" + done + + if $fail; then + echo "TEST: [$TEST_NAME] failed!" 
+ exit 1 + else + echo "TEST: [$TEST_NAME] successed!" + fi +done for i in $(seq $DB_COUNT); do run_sql "DROP DATABASE $DB${i};"