diff --git a/cmd/backup.go b/cmd/backup.go index 971d4ef08..948249671 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -30,7 +30,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error { func runBackupRawCommand(command *cobra.Command, cmdName string) error { cfg := task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { + if err := cfg.ParseBackupConfigFromFlags(command.Flags()); err != nil { command.SilenceUsage = false return err } diff --git a/cmd/cmd.go b/cmd/cmd.go index 689f69453..ddf1d9096 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -25,7 +25,7 @@ var ( initOnce = sync.Once{} defaultContext context.Context hasLogFile uint64 - tidbGlue = gluetidb.Glue{} + tidbGlue = gluetidb.New() envLogToTermKey = "BR_LOG_TO_TERM" ) diff --git a/go.mod b/go.mod index 4c8e237d0..9e4aaa0f8 100644 --- a/go.mod +++ b/go.mod @@ -15,13 +15,13 @@ require ( github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 github.com/pingcap/failpoint v0.0.0-20200603062251-b230c36c413c - github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 + github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c github.com/pingcap/log v0.0.0-20200511115504-543df19646ad - github.com/pingcap/parser v0.0.0-20200603032439-c4ecb4508d2f + github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 - github.com/pingcap/tidb v1.1.0-beta.0.20200606093724-b5b4da0e6a90 + github.com/pingcap/tidb v1.1.0-beta.0.20200715100003-b4da443a3c4c github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible - github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee + github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce github.com/prometheus/client_golang v1.5.1 github.com/prometheus/common v0.9.1 github.com/sirupsen/logrus v1.6.0 diff --git a/go.sum b/go.sum index c5f9bf39c..56f9ff90c 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk= +github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= @@ -290,6 +292,8 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hypnoglow/gormzap v0.3.0/go.mod h1:5Wom8B7Jl2oK0Im9hs6KQ+Kl92w4Y7gKCrj66rhyvw0= +github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334 h1:VHgatEHNcBFEB7inlalqfNqw65aNkM1lGX2yt3NmbS8= +github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -427,6 +431,7 @@ github.com/pingcap-incubator/tidb-dashboard v0.0.0-20200407064406-b2b8ad403d01/g github.com/pingcap-incubator/tidb-dashboard v0.0.0-20200514075710-eecc9a4525b5/go.mod h1:8q+yDx0STBPri8xS4A2duS1dAf+xO0cMtjwe0t6MWJk= github.com/pingcap/br v0.0.0-20200426093517-dd11ae28b885/go.mod h1:4w3meMnk7HDNpNgjuRAxavruTeKJvUiXxoEWTjzXPnA= github.com/pingcap/br v0.0.0-20200521085655-53201addd4ad/go.mod h1:SlSUHWY7QUoooiYxOKuJ8kUh2KjI29ogBh89YXz2dLA= +github.com/pingcap/br v0.0.0-20200623060633-439a1c2b2bfd/go.mod h1:NGee2H9vXLunFIBXGb3uFsWRpw3BBo822sY4dyXepqo= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 h1:iRtOAQ6FXkY/BGvst3CDfTva4nTqh6CL8WXvanLdbu0= @@ -462,6 +467,8 @@ github.com/pingcap/kvproto v0.0.0-20200423020121-038e31959c2a/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4= github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c h1:VnLpCAxMAeDxc7HXTetwDQB+/MtDQjHAOBsd4QnGVwA= +github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= @@ -473,6 +480,8 @@ github.com/pingcap/parser v0.0.0-20200424075042-8222d8b724a4/go.mod h1:9v0Edh8Ib github.com/pingcap/parser v0.0.0-20200507022230-f3bf29096657/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4= github.com/pingcap/parser v0.0.0-20200603032439-c4ecb4508d2f h1:tdjuYfon3J2Wae1g6ya72No+x+329SiWoENoVIS/HJY= github.com/pingcap/parser v0.0.0-20200603032439-c4ecb4508d2f/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4= +github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb h1:v9iX5qIr8nG3QxMtlcTT+1DI0YD4HqABy7tuohbp28E= +github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2 h1:JTzYYukREvxVSKW/ncrzNjFitd8snoQ/Xz32pw8i+s8= github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s= github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 h1:FM+PzdoR3fmWAJx3ug+p5aOgs5aZYwFkoDL7Potdsz0= @@ -485,6 +494,8 @@ github.com/pingcap/tidb v1.1.0-beta.0.20200424154252-5ede18f10eed/go.mod h1:m2VD github.com/pingcap/tidb v1.1.0-beta.0.20200509133407-a9dc72cf2558/go.mod h1:cXNbVSQAkwwmjFQmEnEPI00Z2/Y/KOhouttUPERiInE= github.com/pingcap/tidb v1.1.0-beta.0.20200606093724-b5b4da0e6a90 h1:fH0MbXxR4HbEyDmVxDqZmw0ouV5kFGWb3EXQGPX6Tns= github.com/pingcap/tidb v1.1.0-beta.0.20200606093724-b5b4da0e6a90/go.mod h1:aaBBi3OJmYjENWY31YYOY8K6UoZZYgjZVZH56D0QIdE= +github.com/pingcap/tidb v1.1.0-beta.0.20200715100003-b4da443a3c4c h1:nLnjPQ4cx5Dm0QWq9Nt9A+NG5F3Abq87VrJ8JY6pZvg= +github.com/pingcap/tidb v1.1.0-beta.0.20200715100003-b4da443a3c4c/go.mod h1:TplKBs1sevRvK11aT7ro0ntTCalyh1fMaWACp03dQf4= github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible h1:84F7MFMfdAYObrznvRslmVu43aoihrlL+7mMyMlOi0o= github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible h1:+K5bqDYG5HT+GqLdx4GH5VmS84+xHgpHbGg6Xt6qQec= @@ -496,6 +507,8 @@ github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompat github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee h1:XJQ6/LGzOSc/jo33AD8t7jtc4GohxcyODsYnb+kZXJM= github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce h1:LDyY6Xh/Z/SHVQ10erWtoOwIxHSTtlpPQ9cvS+BfRMY= +github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -952,6 +965,8 @@ honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXe honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3 h1:sXmLre5bzIR6ypkjXCDI3jHPssRhc8KD/Ome589sc3U= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8= +honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index c05bfb8a2..cb1d4dbf1 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -406,13 +406,14 @@ func (bc *Client) BackupRanges( allFilesCollected := make(chan struct{}, 1) go func() { init := time.Now() - start, cur := init, init + // nolint:ineffassign + lastBackupStart, currentBackupStart := init, init for files := range filesCh { - cur, start = start, time.Now() + lastBackupStart, currentBackupStart = currentBackupStart, time.Now() allFiles = append(allFiles, files...) - summary.CollectSuccessUnit("backup ranges", 1, cur.Sub(start)) + summary.CollectSuccessUnit("backup ranges", 1, currentBackupStart.Sub(lastBackupStart)) } - log.Info("Backup Ranges", zap.Duration("take", cur.Sub(init))) + log.Info("Backup Ranges", zap.Duration("take", currentBackupStart.Sub(init))) allFilesCollected <- struct{}{} }() @@ -503,8 +504,8 @@ func (bc *Client) BackupRange( // Find and backup remaining ranges. // TODO: test fine grained backup. err = bc.fineGrainedBackup( - ctx, startKey, endKey, req.StartVersion, - req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh) + ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, + req.RateLimit, req.Concurrency, results, updateCh) if err != nil { return nil, err } @@ -564,6 +565,7 @@ func (bc *Client) fineGrainedBackup( startKey, endKey []byte, lastBackupTS uint64, backupTS uint64, + compressType kvproto.CompressionType, rateLimit uint64, concurrency uint32, rangeTree rtree.RangeTree, @@ -594,7 +596,7 @@ func (bc *Client) fineGrainedBackup( defer wg.Done() for rg := range retry { backoffMs, err := - bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, rateLimit, concurrency, respCh) + bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS, compressType, rateLimit, concurrency, respCh) if err != nil { errCh <- err return @@ -728,6 +730,7 @@ func (bc *Client) handleFineGrained( rg rtree.Range, lastBackupTS uint64, backupTS uint64, + compressType kvproto.CompressionType, rateLimit uint64, concurrency uint32, respCh chan<- *kvproto.BackupResponse, @@ -740,14 +743,15 @@ func (bc *Client) handleFineGrained( max := 0 req := kvproto.BackupRequest{ - ClusterId: bc.clusterID, - StartKey: rg.StartKey, // TODO: the range may cross region. - EndKey: rg.EndKey, - StartVersion: lastBackupTS, - EndVersion: backupTS, - StorageBackend: bc.backend, - RateLimit: rateLimit, - Concurrency: concurrency, + ClusterId: bc.clusterID, + StartKey: rg.StartKey, // TODO: the range may cross region. + EndKey: rg.EndKey, + StartVersion: lastBackupTS, + EndVersion: backupTS, + StorageBackend: bc.backend, + RateLimit: rateLimit, + Concurrency: concurrency, + CompressionType: compressType, } lockResolver := bc.mgr.GetLockResolver() client, err := bc.mgr.GetBackupClient(ctx, storeID) @@ -812,7 +816,7 @@ func SendBackup( // TODO: handle errors in the resp. log.Info("range backuped", zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())), - zap.Stringer("EndKey", utils.WrapKey(req.GetEndKey()))) + zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey()))) err = respFn(resp) if err != nil { return err diff --git a/pkg/gluetidb/glue.go b/pkg/gluetidb/glue.go index 7db995d83..487a5d628 100644 --- a/pkg/gluetidb/glue.go +++ b/pkg/gluetidb/glue.go @@ -3,20 +3,42 @@ package gluetidb import ( + "bytes" "context" + "github.com/pingcap/log" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" pd "github.com/pingcap/pd/v4/client" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/gluetikv" ) +const ( + defaultCapOfCreateTable = 512 + defaultCapOfCreateDatabase = 64 + brComment = `/*from(br)*/` +) + +// New makes a new tidb glue. +func New() Glue { + log.Debug("enabling no register config") + // TODO use config.UpdateGlobalConfig here if TiDB merge it to 4.0 + conf := *config.GetGlobalConfig() + conf.SkipRegisterToDashboard = true + config.StoreGlobalConfig(&conf) + return Glue{} +} + // Glue is an implementation of glue.Glue using a new TiDB session. type Glue struct { tikvGlue gluetikv.Glue @@ -37,7 +59,10 @@ func (Glue) CreateSession(store kv.Storage) (glue.Session, error) { if err != nil { return nil, err } - return &tidbSession{se: se}, nil + tiSession := &tidbSession{ + se: se, + } + return tiSession, nil } // Open implements glue.Glue. @@ -68,6 +93,11 @@ func (gs *tidbSession) Execute(ctx context.Context, sql string) error { // CreateDatabase implements glue.Session. func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { d := domain.GetDomain(gs.se).DDL() + query, err := gs.showCreateDatabase(schema) + if err != nil { + return err + } + gs.se.SetValue(sessionctx.QueryString, query) schema = schema.Clone() if len(schema.Charset) == 0 { schema.Charset = mysql.DefaultCharset @@ -78,7 +108,11 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) // CreateTable implements glue.Session. func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { d := domain.GetDomain(gs.se).DDL() - + query, err := gs.showCreateTable(table) + if err != nil { + return err + } + gs.se.SetValue(sessionctx.QueryString, query) // Clone() does not clone partitions yet :( table = table.Clone() if table.Partition != nil { @@ -86,7 +120,6 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) table.Partition = &newPartition } - return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true) } @@ -94,3 +127,27 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl func (gs *tidbSession) Close() { gs.se.Close() } + +// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo. +func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) { + table := tbl.Clone() + table.AutoIncID = 0 + result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable)) + // this can never fail. + _, _ = result.WriteString(brComment) + if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil { + return "", err + } + return result.String(), nil +} + +// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo. +func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { + result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase)) + // this can never fail. + _, _ = result.WriteString(brComment) + if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil { + return "", err + } + return result.String(), nil +} diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index 96425ffb4..d61a74b46 100644 --- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -40,7 +40,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { c.Assert(s.mock.Start(), IsNil) defer s.mock.Stop() - client, err := restore.NewRestoreClient(context.Background(), gluetidb.Glue{}, s.mock.PDClient, s.mock.Storage, nil) + client, err := restore.NewRestoreClient(context.Background(), gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil) c.Assert(err, IsNil) info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxInt64) @@ -98,7 +98,7 @@ func (s *testRestoreClientSuite) TestIsOnline(c *C) { c.Assert(s.mock.Start(), IsNil) defer s.mock.Stop() - client, err := restore.NewRestoreClient(context.Background(), gluetidb.Glue{}, s.mock.PDClient, s.mock.Storage, nil) + client, err := restore.NewRestoreClient(context.Background(), gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil) c.Assert(err, IsNil) c.Assert(client.IsOnline(), IsFalse) diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go index 8e1807be7..fd3cde484 100644 --- a/pkg/restore/db_test.go +++ b/pkg/restore/db_test.go @@ -75,7 +75,7 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { c.Assert(autoIncID, Equals, uint64(globalAutoID)) // Alter AutoIncID to the next AutoIncID + 100 table.Info.AutoIncID = globalAutoID + 100 - db, err := restore.NewDB(gluetidb.Glue{}, s.mock.Storage) + db, err := restore.NewDB(gluetidb.New(), s.mock.Storage) c.Assert(err, IsNil, Commentf("Error create DB")) tk.MustExec("drop database if exists test;") // Test empty collate value diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 0956df988..b79f33d35 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -28,9 +28,10 @@ import ( ) const ( - flagBackupTimeago = "timeago" - flagBackupTS = "backupts" - flagLastBackupTS = "lastbackupts" + flagBackupTimeago = "timeago" + flagBackupTS = "backupts" + flagLastBackupTS = "lastbackupts" + flagCompressionType = "compression" flagGCTTL = "gcttl" @@ -42,10 +43,11 @@ const ( type BackupConfig struct { Config - TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` - BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` - LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` - GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` + TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` + BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` + LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` + GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` } // DefineBackupFlags defines common flags for the backup command. @@ -60,6 +62,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+ " e.g. '400036290571534337', '2018-05-11 01:42:23'") flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint") + flags.String(flagCompressionType, "zstd", + "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") } // ParseFromFlags parses the backup-related flags from the flag set. @@ -90,6 +94,16 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { } cfg.GCTTL = gcTTL + compressionStr, err := flags.GetString(flagCompressionType) + if err != nil { + return errors.Trace(err) + } + compressionType, err := parseCompressionType(compressionStr) + if err != nil { + return errors.Trace(err) + } + cfg.CompressionType = compressionType + if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) } @@ -149,10 +163,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig isIncrementalBackup := cfg.LastBackupTS > 0 req := kvproto.BackupRequest{ - StartVersion: cfg.LastBackupTS, - EndVersion: backupTS, - RateLimit: cfg.RateLimit, - Concurrency: defaultBackupConcurrency, + StartVersion: cfg.LastBackupTS, + EndVersion: backupTS, + RateLimit: cfg.RateLimit, + Concurrency: defaultBackupConcurrency, + CompressionType: cfg.CompressionType, } ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema( @@ -304,3 +319,18 @@ func parseTSString(ts string) (uint64, error) { } return variable.GoTimeToTS(t1), nil } + +func parseCompressionType(s string) (kvproto.CompressionType, error) { + var ct kvproto.CompressionType + switch s { + case "lz4": + ct = kvproto.CompressionType_LZ4 + case "snappy": + ct = kvproto.CompressionType_SNAPPY + case "zstd": + ct = kvproto.CompressionType_ZSTD + default: + return kvproto.CompressionType_UNKNOWN, errors.Errorf("invalid compression type '%s'", s) + } + return ct, nil +} diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index dfc8d5f67..4d237848b 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -31,9 +31,10 @@ const ( type RawKvConfig struct { Config - StartKey []byte `json:"start-key" toml:"start-key"` - EndKey []byte `json:"end-key" toml:"end-key"` - CF string `json:"cf" toml:"cf"` + StartKey []byte `json:"start-key" toml:"start-key"` + EndKey []byte `json:"end-key" toml:"end-key"` + CF string `json:"cf" toml:"cf"` + CompressionType kvproto.CompressionType `json:"compression-type" toml:"compression-type"` } // DefineRawBackupFlags defines common flags for the backup command. @@ -42,9 +43,11 @@ func DefineRawBackupFlags(command *cobra.Command) { command.Flags().StringP(flagTiKVColumnFamily, "", "default", "backup specify cf, correspond to tikv cf") command.Flags().StringP(flagStartKey, "", "", "backup raw kv start key, key is inclusive") command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive") + command.Flags().String(flagCompressionType, "zstd", + "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") } -// ParseFromFlags parses the backup-related flags from the flag set. +// ParseFromFlags parses the raw kv backup&restore common flags from the flag set. func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { format, err := flags.GetString(flagKeyFormat) if err != nil { @@ -81,6 +84,25 @@ func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error { return nil } +// ParseBackupConfigFromFlags parses the backup-related flags from the flag set. +func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error { + err := cfg.ParseFromFlags(flags) + if err != nil { + return err + } + + compressionStr, err := flags.GetString(flagCompressionType) + if err != nil { + return errors.Trace(err) + } + compressionType, err := parseCompressionType(compressionStr) + if err != nil { + return errors.Trace(err) + } + cfg.CompressionType = compressionType + return nil +} + // RunBackupRaw starts a backup task inside the current goroutine. func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConfig) error { defer summary.Summary(cmdName) @@ -121,12 +143,13 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) req := kvproto.BackupRequest{ - StartVersion: 0, - EndVersion: 0, - RateLimit: cfg.RateLimit, - Concurrency: cfg.Concurrency, - IsRawKv: true, - Cf: cfg.CF, + StartVersion: 0, + EndVersion: 0, + RateLimit: cfg.RateLimit, + Concurrency: cfg.Concurrency, + IsRawKv: true, + Cf: cfg.CF, + CompressionType: cfg.CompressionType, } files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh) if err != nil { diff --git a/tests/br_db/run.sh b/tests/br_db/run.sh index 31389fa28..4ea4a2d39 100755 --- a/tests/br_db/run.sh +++ b/tests/br_db/run.sh @@ -51,4 +51,9 @@ if [ "$table_count" -ne "2" ];then exit 1 fi +# Test BR DDL query string +echo "testing DDL query..." +curl 127.0.0.1:10080/ddl/history | grep -E '/\*from\(br\)\*/CREATE TABLE' +curl 127.0.0.1:10080/ddl/history | grep -E '/\*from\(br\)\*/CREATE DATABASE' + run_sql "DROP DATABASE $DB;"