From 1679ab5cf14d93e461faef45ff9f21968c77339e Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 10 Dec 2019 20:45:40 +0800 Subject: [PATCH 01/10] Update kvproto --- go.mod | 2 +- go.sum | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index fc062140a..1eafddd8c 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/onsi/gomega v1.7.1 // indirect github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 github.com/pingcap/errors v0.11.4 - github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8 + github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 github.com/pingcap/pd v1.1.0-beta.0.20191115131715-6b7dc037010e diff --git a/go.sum b/go.sum index 9bdae7d24..450264be3 100644 --- a/go.sum +++ b/go.sum @@ -210,6 +210,7 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8 h1:P9jGgwVkLHlbEGtgGKrY0k/yy6N8L8Gdj8dliFswllU= github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 h1:KrJorS9gGYMhsQjENNWAeB5ho28xbowZ74pfJWkOmFc= From ee18eaa41116517f133a6a1c8b440a0de708a861 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 12 Dec 2019 11:55:45 +0800 Subject: [PATCH 02/10] Implement raw restore --- cmd/restore.go | 148 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 + go.sum | 7 ++ pkg/restore/client.go | 99 ++++++++++++++++++++++++++-- pkg/utils/keys.go | 20 ++++++ 5 files changed, 272 insertions(+), 4 deletions(-) create mode 100644 pkg/utils/keys.go diff --git a/cmd/restore.go b/cmd/restore.go index 8c8dab8d4..868906617 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -39,6 +39,7 @@ func NewRestoreCommand() *cobra.Command { newFullRestoreCommand(), newDbRestoreCommand(), newTableRestoreCommand(), + newRawRestoreCommand(), ) command.PersistentFlags().Uint("concurrency", 128, @@ -78,6 +79,10 @@ func newFullRestoreCommand() *cobra.Command { return errors.Trace(err) } + if client.IsRawKvMode() { + return errors.New("cannot do full restore from raw kv data") + } + tableRules := make([]*import_sstpb.RewriteRule, 0) dataRules := make([]*import_sstpb.RewriteRule, 0) files := make([]*backup.File, 0) @@ -188,6 +193,10 @@ func newDbRestoreCommand() *cobra.Command { return errors.Trace(err) } + if client.IsRawKvMode() { + return errors.New("cannot do db restore from raw kv data") + } + dbName, err := cmd.Flags().GetString("db") if err != nil { return errors.Trace(err) @@ -294,6 +303,10 @@ func newTableRestoreCommand() *cobra.Command { return errors.Trace(err) } + if client.IsRawKvMode() { + return errors.New("cannot do table restore from raw kv data") + } + dbName, err := cmd.Flags().GetString("db") if err != nil { return errors.Trace(err) @@ -381,6 +394,141 @@ func newTableRestoreCommand() *cobra.Command { return command } +func newRawRestoreCommand() *cobra.Command { + command := &cobra.Command{ + Use: "raw", + Short: "restore a raw kv range", + RunE: func(cmd *cobra.Command, _ []string) error { + pdAddr, err := cmd.Flags().GetString(FlagPD) + if err != nil { + return errors.Trace(err) + } + ctx, cancel := context.WithCancel(GetDefaultContext()) + defer cancel() + + mgr, err := GetDefaultMgr() + if err != nil { + return err + } + defer mgr.Close() + + client, err := restore.NewRestoreClient( + ctx, mgr.GetPDClient(), mgr.GetTiKV()) + if err != nil { + return errors.Trace(err) + } + defer client.Close() + err = initRestoreClient(client, cmd.Flags()) + if err != nil { + return errors.Trace(err) + } + + if client.IsRawKvMode() { + return errors.New("cannot do raw restore from transactional data") + } + + startKey, err := cmd.Flags().GetBytesHex("start") + if err != nil { + return errors.Trace(err) + } + + endKey, err := cmd.Flags().GetBytesHex("end") + if err != nil { + return errors.Trace(err) + } + + ////* + //dbName, err := cmd.Flags().GetString("db") + //if err != nil { + // return errors.Trace(err) + //} + //db := client.GetDatabase(dbName) + //if db == nil { + // return errors.New("not exists database") + //} + //err = client.CreateDatabase(db.Schema) + //if err != nil { + // return errors.Trace(err) + //} + // + //tableName, err := cmd.Flags().GetString("table") + //if err != nil { + // return errors.Trace(err) + //} + //table := db.GetTable(tableName) + //if table == nil { + // return errors.New("not exists table") + //} + //// The rules here is raw key. + //rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), []*utils.Table{table}) + //if err != nil { + // return errors.Trace(err) + //} + //ranges := restore.GetRanges(table.Files) + ////*/ + + files, err := client.GetFilesInRawRange(startKey, endKey) + if err != nil { + return errors.Trace(err) + } + + ranges := restore.GetRanges(files) + + // Empty rewrite rules + rewriteRules := &restore_util.RewriteRules{} + + // Redirect to log if there is no log file to avoid unreadable output. + // TODO: How to show progress? + updateCh := utils.StartProgress( + ctx, + "Table Restore", + // Split/Scatter + Download/Ingest + int64(len(ranges)+len(files)), + !HasLogFile()) + + err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh) + if err != nil { + return errors.Trace(err) + } + pdAddrs := strings.Split(pdAddr, ",") + err = client.ResetTS(pdAddrs) + if err != nil { + return errors.Trace(err) + } + err = client.SwitchToImportMode(ctx) + if err != nil { + return errors.Trace(err) + } + err = client.RestoreRaw(startKey, endKey, files, updateCh) + if err != nil { + return errors.Trace(err) + } + err = client.SwitchToNormalMode(ctx) + if err != nil { + return errors.Trace(err) + } + // Restore has finished. + close(updateCh) + + // Checksum + //updateCh = utils.StartProgress( + // ctx, "Checksum", int64(len(newTables)), !HasLogFile()) + //err = client.ValidateChecksum( + // ctx, mgr.GetTiKV().GetClient(), []*utils.Table{table}, newTables, updateCh) + if err != nil { + return err + } + close(updateCh) + + return nil + }, + } + + //command.Flags().StringP("start", "s", "", "restore raw kv start key") + //command.Flags().StringP("end", "e", "", "restore raw kv end key") + return command +} + func initRestoreClient(client *restore.Client, flagSet *flag.FlagSet) error { u, err := flagSet.GetString(FlagStorage) if err != nil { diff --git a/go.mod b/go.mod index 1eafddd8c..ad1c48668 100644 --- a/go.mod +++ b/go.mod @@ -39,3 +39,5 @@ require ( ) replace github.com/golang/lint => golang.org/x/lint v0.0.0-20190930215403-16217165b5de + +replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec diff --git a/go.sum b/go.sum index 450264be3..79e8c61c5 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,12 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/MyonKeminta/kvproto v0.0.0-20191211100751-7b213f7034d3 h1:UDNhngHgEk6bZWW2Tbz3S4R1UA/8vIq5jJXYfzHWswA= +github.com/MyonKeminta/kvproto v0.0.0-20191211100751-7b213f7034d3/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/MyonKeminta/kvproto v0.0.0-20191211111614-fc82e473d8d2 h1:i+n3Uq1maMobTENW/h9l1oSIjzVMN6vB3wwMHLM4FQk= +github.com/MyonKeminta/kvproto v0.0.0-20191211111614-fc82e473d8d2/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec h1:yZwFX4POusoAWJEGUwK2YbqSsYQ2+D8z9bXtWa9cB8g= +github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= @@ -210,6 +216,7 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8 h1:P9jGgwVkLHlbEGtgGKrY0k/yy6N8L8Gdj8dliFswllU= github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868 h1:4FIcMXOHW2CwuTHJFuU/nLQ9KH2YYIoizuWfh8cQKk8= github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 9e0b8f61b..79aef0d1f 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -1,7 +1,9 @@ package restore import ( + "bytes" "context" + "encoding/hex" "math" "sort" "sync" @@ -86,11 +88,13 @@ func (rc *Client) Close() { // InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, storagePath string) error { - databases, err := utils.LoadBackupTables(backupMeta) - if err != nil { - return errors.Trace(err) + if !backupMeta.IsRawKv { + databases, err := utils.LoadBackupTables(backupMeta) + if err != nil { + return errors.Trace(err) + } + rc.databases = databases } - rc.databases = databases rc.backupMeta = backupMeta metaClient := restore_util.NewClient(rc.pdClient) @@ -99,6 +103,41 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, storagePath stri return nil } +// IsRawKvMode checks whether the backup data is in raw kv format, in which case transactional recover is forbidden. +func (rc *Client) IsRawKvMode() bool { + return rc.backupMeta.IsRawKv +} + +// GetFilesInRawRange gets all files that are in the given range or intersects with the given range. +func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte) ([]*backup.File, error) { + if !rc.IsRawKvMode() { + return nil, errors.New("the backup data is not in raw kv mode") + } + + if bytes.Compare(startKey, rc.backupMeta.RawStartKey) < 0 || + utils.CompareEndKey(endKey, rc.backupMeta.RawEndKey) > 0 { + return nil, errors.New("restoring range exceeds backup data's range") + } + + files := make([]*backup.File, 0) + + for _, file := range rc.backupMeta.Files { + if len(file.EndKey) > 0 && bytes.Compare(file.EndKey, startKey) < 0 { + // The file is before the range to be restored. + continue + } + if len(endKey) > 0 && bytes.Compare(endKey, file.StartKey) >= 0 { + // The file is after the range to be restored. + // The specified endKey is exclusive, so when it equals to a file's startKey, the file is still skipped. + continue + } + + files = append(files, file) + } + + return files, nil +} + // SetConcurrency sets the concurrency of dbs tables files func (rc *Client) SetConcurrency(c uint) { rc.workerPool = utils.NewWorkerPool(c, "file") @@ -341,6 +380,58 @@ func (rc *Client) RestoreAll( return nil } +// RestoreRaw tries to restore raw keys in the specified range. +func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh chan<- struct{}) error { + start := time.Now() + defer func() { + elapsed := time.Since(start) + log.Info("Restore Raw", + zap.String("startKey", hex.EncodeToString(startKey)), + zap.String("endKey", hex.EncodeToString(endKey)), + zap.Duration("take", elapsed)) + }() + errCh := make(chan error, len(rc.databases)) + wg := new(sync.WaitGroup) + defer close(errCh) + + // TODO: Fix file borders + + emptyRules := &restore_util.RewriteRules{} + for _, file := range files { + wg.Add(1) + fileReplica := file + rc.workerPool.Apply( + func() { + defer wg.Done() + select { + case <-rc.ctx.Done(): + errCh <- nil + case errCh <- rc.fileImporter.Import(fileReplica, emptyRules): + updateCh <- struct{}{} + } + }) + } + for range files { + err := <-errCh + if err != nil { + rc.cancel() + wg.Wait() + log.Error( + "restore raw range failed", + zap.String("startKey", hex.EncodeToString(startKey)), + zap.String("endKey", hex.EncodeToString(endKey)), + zap.Error(err), + ) + return err + } + } + log.Info( + "finish to restore raw range", + zap.String("startKey", hex.EncodeToString(startKey)), + zap.String("endKey", hex.EncodeToString(endKey)), + ) +} + //SwitchToImportMode switch tikv cluster to import mode func (rc *Client) SwitchToImportMode(ctx context.Context) error { return rc.switchTiKVMode(ctx, import_sstpb.SwitchMode_Import) diff --git a/pkg/utils/keys.go b/pkg/utils/keys.go new file mode 100644 index 000000000..c5f69a8ac --- /dev/null +++ b/pkg/utils/keys.go @@ -0,0 +1,20 @@ +package utils + +import "bytes" + +// CompareEndKey compared two keys that BOTH represent the exclusive end of some range. An empty end key is the very +// end, so an empty key is greater than any other keys. +func CompareEndKey(a, b []byte) int { + if len(a) == 0 { + if len(b) == 0 { + return 0 + } + return 1 + } + + if len(b) == 0 { + return -1 + } + + return bytes.Compare(a, b) +} From 838fe4c95b4fb712b267dccc157d1456ffafee98 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 12 Dec 2019 13:05:30 +0800 Subject: [PATCH 03/10] fix build --- cmd/restore.go | 12 ++++++------ go.sum | 9 --------- pkg/restore/client.go | 1 + 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 468dc574d..5e6e2a94a 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -498,7 +498,7 @@ func newRawRestoreCommand() *cobra.Command { if err != nil { return errors.Trace(err) } - err = client.SwitchToImportMode(ctx) + err = client.SwitchToImportModeIfOffline(ctx) if err != nil { return errors.Trace(err) } @@ -506,7 +506,7 @@ func newRawRestoreCommand() *cobra.Command { if err != nil { return errors.Trace(err) } - err = client.SwitchToNormalMode(ctx) + err = client.SwitchToNormalModeIfOffline(ctx) if err != nil { return errors.Trace(err) } @@ -518,10 +518,10 @@ func newRawRestoreCommand() *cobra.Command { // ctx, "Checksum", int64(len(newTables)), !HasLogFile()) //err = client.ValidateChecksum( // ctx, mgr.GetTiKV().GetClient(), []*utils.Table{table}, newTables, updateCh) - if err != nil { - return err - } - close(updateCh) + //if err != nil { + // return err + //} + //close(updateCh) return nil }, diff --git a/go.sum b/go.sum index fcefebef9..e3d75e8fc 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/MyonKeminta/kvproto v0.0.0-20191211100751-7b213f7034d3 h1:UDNhngHgEk6bZWW2Tbz3S4R1UA/8vIq5jJXYfzHWswA= -github.com/MyonKeminta/kvproto v0.0.0-20191211100751-7b213f7034d3/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/MyonKeminta/kvproto v0.0.0-20191211111614-fc82e473d8d2 h1:i+n3Uq1maMobTENW/h9l1oSIjzVMN6vB3wwMHLM4FQk= -github.com/MyonKeminta/kvproto v0.0.0-20191211111614-fc82e473d8d2/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec h1:yZwFX4POusoAWJEGUwK2YbqSsYQ2+D8z9bXtWa9cB8g= github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY= @@ -213,11 +209,6 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7x github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8 h1:P9jGgwVkLHlbEGtgGKrY0k/yy6N8L8Gdj8dliFswllU= -github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54 h1:T8myp+i7bPLy/W4rEjtsAZgjGTqQ0rnLu9xQ4YAfXJU= -github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 h1:KrJorS9gGYMhsQjENNWAeB5ho28xbowZ74pfJWkOmFc= diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 5b3928012..1d79d8837 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -436,6 +436,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), ) + return nil } //SwitchToImportModeIfOffline switch tikv cluster to import mode From 501eb9bd97a4ad5b105612c3437db046e94b44a6 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 12 Dec 2019 14:40:33 +0800 Subject: [PATCH 04/10] Set range for file importer Signed-off-by: MyonKeminta --- pkg/restore/client.go | 8 ++++++-- pkg/restore/import.go | 30 ++++++++++++++++++++++++++++++ pkg/utils/keys.go | 3 ++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 1d79d8837..e3c33d176 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -100,7 +100,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup. metaClient := restore_util.NewClient(rc.pdClient) importClient := NewImportClient(metaClient) - rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend) + rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, backupMeta.IsRawKv) return nil } @@ -400,7 +400,11 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil wg := new(sync.WaitGroup) defer close(errCh) - // TODO: Fix file borders + err := rc.fileImporter.SetRawRange(startKey, endKey) + if err != nil { + + return errors.Trace(err) + } emptyRules := &restore_util.RewriteRules{} for _, file := range files { diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 9bc236dcf..df320c4f1 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -1,6 +1,7 @@ package restore import ( + "bytes" "context" "sync" "time" @@ -116,6 +117,10 @@ type FileImporter struct { importClient ImporterClient backend *backup.StorageBackend + isRawKvMode bool + rawStartKey []byte + rawEndKey []byte + ctx context.Context cancel context.CancelFunc } @@ -126,6 +131,7 @@ func NewFileImporter( metaClient restore_util.Client, importClient ImporterClient, backend *backup.StorageBackend, + isRawKvMode bool, ) FileImporter { ctx, cancel := context.WithCancel(ctx) return FileImporter{ @@ -134,9 +140,20 @@ func NewFileImporter( ctx: ctx, cancel: cancel, importClient: importClient, + isRawKvMode: isRawKvMode, } } +// SetRawRange sets the range to be restored in raw kv mode. +func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error { + if !importer.isRawKvMode { + return errors.New("file importer is not in raw kv mode") + } + importer.rawStartKey = startKey + importer.rawEndKey = endKey + return nil +} + // Import tries to import a file. // All rules must contain encoded keys. func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_util.RewriteRules) error { @@ -233,6 +250,19 @@ func (importer *FileImporter) downloadSST( return nil, true, errRewriteRuleNotFound } sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, regionRule) + // For raw kv mode, cut the SST file's range to fit in the restoring range. + if importer.isRawKvMode { + if bytes.Compare(importer.rawStartKey, sstMeta.Range.GetStart()) > 0 { + sstMeta.Range.Start = importer.rawStartKey + } + // TODO: importer.RawEndKey is exclusive but sstMeta.Range.End is inclusive. How to exclude importer.RawEndKey? + if len(importer.rawEndKey) > 0 && bytes.Compare(importer.rawEndKey, sstMeta.Range.GetEnd()) < 0 { + sstMeta.Range.End = importer.rawEndKey + } + if bytes.Compare(sstMeta.Range.GetStart(), sstMeta.Range.GetEnd()) > 0 { + return &sstMeta, true, nil + } + } sstMeta.RegionId = regionInfo.Region.GetId() sstMeta.RegionEpoch = regionInfo.Region.GetRegionEpoch() req := &import_sstpb.DownloadRequest{ diff --git a/pkg/utils/keys.go b/pkg/utils/keys.go index c5f69a8ac..f03a21d25 100644 --- a/pkg/utils/keys.go +++ b/pkg/utils/keys.go @@ -2,8 +2,9 @@ package utils import "bytes" -// CompareEndKey compared two keys that BOTH represent the exclusive end of some range. An empty end key is the very +// CompareEndKey compared two keys that BOTH represent the EXCLUSIVE ending of some range. An empty end key is the very // end, so an empty key is greater than any other keys. +// Please note that this function is not applicable if any one argument is not an EXCLUSIVE ending of a range. func CompareEndKey(a, b []byte) int { if len(a) == 0 { if len(b) == 0 { From f1c803d50a21e1e9833317f462eebba1aee930d1 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 12 Dec 2019 14:45:42 +0800 Subject: [PATCH 05/10] Remove unnecessary comments Signed-off-by: MyonKeminta --- cmd/restore.go | 44 ++------------------------------------------ 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 5e6e2a94a..97e5d34da 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -440,36 +440,6 @@ func newRawRestoreCommand() *cobra.Command { return errors.Trace(err) } - ////* - //dbName, err := cmd.Flags().GetString("db") - //if err != nil { - // return errors.Trace(err) - //} - //db := client.GetDatabase(dbName) - //if db == nil { - // return errors.New("not exists database") - //} - //err = client.CreateDatabase(db.Schema) - //if err != nil { - // return errors.Trace(err) - //} - // - //tableName, err := cmd.Flags().GetString("table") - //if err != nil { - // return errors.Trace(err) - //} - //table := db.GetTable(tableName) - //if table == nil { - // return errors.New("not exists table") - //} - //// The rules here is raw key. - //rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), []*utils.Table{table}) - //if err != nil { - // return errors.Trace(err) - //} - //ranges := restore.GetRanges(table.Files) - ////*/ - files, err := client.GetFilesInRawRange(startKey, endKey) if err != nil { return errors.Trace(err) @@ -513,22 +483,12 @@ func newRawRestoreCommand() *cobra.Command { // Restore has finished. close(updateCh) - // Checksum - //updateCh = utils.StartProgress( - // ctx, "Checksum", int64(len(newTables)), !HasLogFile()) - //err = client.ValidateChecksum( - // ctx, mgr.GetTiKV().GetClient(), []*utils.Table{table}, newTables, updateCh) - //if err != nil { - // return err - //} - //close(updateCh) - return nil }, } - //command.Flags().StringP("start", "s", "", "restore raw kv start key") - //command.Flags().StringP("end", "e", "", "restore raw kv end key") + command.Flags().StringP("start", "s", "", "restore raw kv start key") + command.Flags().StringP("end", "e", "", "restore raw kv end key") return command } From cdd7449711a9f659d99251461d7d2a06225e9f20 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 13 Dec 2019 13:41:19 +0800 Subject: [PATCH 06/10] check cf and support multi ranges in BackupMeta Signed-off-by: MyonKeminta --- cmd/restore.go | 12 ++++++++--- go.mod | 4 +--- go.sum | 6 ++++-- pkg/restore/client.go | 50 +++++++++++++++++++++++++++++-------------- 4 files changed, 48 insertions(+), 24 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 97e5d34da..43301e891 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -440,7 +440,12 @@ func newRawRestoreCommand() *cobra.Command { return errors.Trace(err) } - files, err := client.GetFilesInRawRange(startKey, endKey) + cf, err := cmd.Flags().GetString("cf") + if err != nil { + return errors.Trace(err) + } + + files, err := client.GetFilesInRawRange(startKey, endKey, cf) if err != nil { return errors.Trace(err) } @@ -487,8 +492,9 @@ func newRawRestoreCommand() *cobra.Command { }, } - command.Flags().StringP("start", "s", "", "restore raw kv start key") - command.Flags().StringP("end", "e", "", "restore raw kv end key") + command.Flags().StringP("start", "", "", "restore raw kv start key") + command.Flags().StringP("end", "", "", "restore raw kv end key") + command.Flags().StringP("cf", "", "default", "the cf to restore raw keys") return command } diff --git a/go.mod b/go.mod index bedc81d49..27a6bb6b5 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/onsi/gomega v1.7.1 // indirect github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 github.com/pingcap/errors v0.11.4 - github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54 + github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 github.com/pingcap/pd v1.1.0-beta.0.20191115131715-6b7dc037010e @@ -39,5 +39,3 @@ require ( ) replace github.com/golang/lint => golang.org/x/lint v0.0.0-20190930215403-16217165b5de - -replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec diff --git a/go.sum b/go.sum index e3d75e8fc..140452271 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec h1:yZwFX4POusoAWJEGUwK2YbqSsYQ2+D8z9bXtWa9cB8g= -github.com/MyonKeminta/kvproto v0.0.0-20191212035151-97954de3fbec/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= @@ -209,6 +207,10 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7x github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b h1:TcrATUpJ9EADLXKmnREh+haj6GXY8sAkRFuqoIfVRUQ= +github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 h1:KrJorS9gGYMhsQjENNWAeB5ho28xbowZ74pfJWkOmFc= diff --git a/pkg/restore/client.go b/pkg/restore/client.go index e3c33d176..41d25ec5d 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -110,33 +110,51 @@ func (rc *Client) IsRawKvMode() bool { } // GetFilesInRawRange gets all files that are in the given range or intersects with the given range. -func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte) ([]*backup.File, error) { +func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backup.File, error) { if !rc.IsRawKvMode() { return nil, errors.New("the backup data is not in raw kv mode") } - if bytes.Compare(startKey, rc.backupMeta.RawStartKey) < 0 || - utils.CompareEndKey(endKey, rc.backupMeta.RawEndKey) > 0 { - return nil, errors.New("restoring range exceeds backup data's range") - } - - files := make([]*backup.File, 0) - - for _, file := range rc.backupMeta.Files { - if len(file.EndKey) > 0 && bytes.Compare(file.EndKey, startKey) < 0 { - // The file is before the range to be restored. + for _, rawRange := range rc.backupMeta.RawRanges { + // First check whether the given range is backup-ed. If not, we cannot perform the restore. + if rawRange.Cf != cf { continue } - if len(endKey) > 0 && bytes.Compare(endKey, file.StartKey) >= 0 { - // The file is after the range to be restored. - // The specified endKey is exclusive, so when it equals to a file's startKey, the file is still skipped. + + if (len(rawRange.EndKey) > 0 && bytes.Compare(startKey, rawRange.EndKey) >= 0) || + (len(endKey) > 0 && bytes.Compare(rawRange.StartKey, endKey) >= 0) { + // The restoring range is totally out of the current range. Skip it. continue } - files = append(files, file) + if bytes.Compare(startKey, rawRange.StartKey) < 0 || + utils.CompareEndKey(endKey, rawRange.EndKey) > 0 { + // Only partial of the restoring range is in the current backup-ed range. So the given range can't be fully + // restored. + return nil, errors.New("no backup data in the range") + } + + // We have found the range that contains the given range. Find all necessary files. + files := make([]*backup.File, 0) + + for _, file := range rc.backupMeta.Files { + if len(file.EndKey) > 0 && bytes.Compare(file.EndKey, startKey) < 0 { + // The file is before the range to be restored. + continue + } + if len(endKey) > 0 && bytes.Compare(endKey, file.StartKey) >= 0 { + // The file is after the range to be restored. + // The specified endKey is exclusive, so when it equals to a file's startKey, the file is still skipped. + continue + } + + files = append(files, file) + } + + return files, nil } - return files, nil + return nil, errors.New("no backup data in the range") } // SetConcurrency sets the concurrency of dbs tables files From da2c5cd2782afaa2600976f97b1d78b3842368d6 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 13 Dec 2019 17:19:03 +0800 Subject: [PATCH 07/10] Check files' cf; address comments --- cmd/restore.go | 11 +---------- go.mod | 2 ++ go.sum | 6 ++---- pkg/restore/client.go | 4 ++++ 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 43301e891..28afd9037 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -402,10 +402,6 @@ func newRawRestoreCommand() *cobra.Command { Use: "raw", Short: "restore a raw kv range", RunE: func(cmd *cobra.Command, _ []string) error { - pdAddr, err := cmd.Flags().GetString(FlagPD) - if err != nil { - return errors.Trace(err) - } ctx, cancel := context.WithCancel(GetDefaultContext()) defer cancel() @@ -426,7 +422,7 @@ func newRawRestoreCommand() *cobra.Command { return errors.Trace(err) } - if client.IsRawKvMode() { + if !client.IsRawKvMode() { return errors.New("cannot do raw restore from transactional data") } @@ -468,11 +464,6 @@ func newRawRestoreCommand() *cobra.Command { if err != nil { return errors.Trace(err) } - pdAddrs := strings.Split(pdAddr, ",") - err = client.ResetTS(pdAddrs) - if err != nil { - return errors.Trace(err) - } err = client.SwitchToImportModeIfOffline(ctx) if err != nil { return errors.Trace(err) diff --git a/go.mod b/go.mod index 27a6bb6b5..6c3cdbd18 100644 --- a/go.mod +++ b/go.mod @@ -39,3 +39,5 @@ require ( ) replace github.com/golang/lint => golang.org/x/lint v0.0.0-20190930215403-16217165b5de + +replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20191213083213-4b45d2635872 diff --git a/go.sum b/go.sum index 140452271..c5a288f0d 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/MyonKeminta/kvproto v0.0.0-20191213083213-4b45d2635872 h1:4AYlfooonOhknrteoDfqUgTPuAdJ5o2kR8d/nRpiBWo= +github.com/MyonKeminta/kvproto v0.0.0-20191213083213-4b45d2635872/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= @@ -207,10 +209,6 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7x github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b h1:TcrATUpJ9EADLXKmnREh+haj6GXY8sAkRFuqoIfVRUQ= -github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 h1:KrJorS9gGYMhsQjENNWAeB5ho28xbowZ74pfJWkOmFc= diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 41d25ec5d..93741c91b 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -138,6 +138,10 @@ func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) files := make([]*backup.File, 0) for _, file := range rc.backupMeta.Files { + if file.Cf != cf { + continue + } + if len(file.EndKey) > 0 && bytes.Compare(file.EndKey, startKey) < 0 { // The file is before the range to be restored. continue From 925f0f8d279a776435a9ed930e9e814b19aaab1d Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 11 Dec 2019 11:34:32 +0800 Subject: [PATCH 08/10] backup: add raw backup command --- cmd/backup.go | 130 +++++++++++++++++++++++++++++++----------- go.mod | 2 +- go.sum | 4 +- pkg/backup/client.go | 24 ++++++-- pkg/utils/key.go | 70 +++++++++++++++++++++++ pkg/utils/key_test.go | 41 +++++++++++++ 6 files changed, 232 insertions(+), 39 deletions(-) create mode 100644 pkg/utils/key.go create mode 100644 pkg/utils/key_test.go diff --git a/cmd/backup.go b/cmd/backup.go index fb0c37b22..a9a7bdb00 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -1,6 +1,7 @@ package cmd import ( + "bytes" "context" "github.com/pingcap/errors" @@ -23,6 +24,16 @@ const ( flagBackupChecksum = "checksum" ) +type backupContext struct { + db string + table string + + isRawKv bool + startKey []byte + endKey []byte + cf string +} + func defineBackupFlags(flagSet *pflag.FlagSet) { flagSet.StringP( flagBackupTimeago, "", "", @@ -35,7 +46,7 @@ func defineBackupFlags(flagSet *pflag.FlagSet) { "Run checksum after backup") } -func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { +func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error { ctx, cancel := context.WithCancel(defaultContext) defer cancel() @@ -91,10 +102,18 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { defer summary.Summary(cmdName) - ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema( - mgr.GetDomain(), mgr.GetTiKV(), backupTS, db, table) - if err != nil { - return err + var ( + ranges []backup.Range + backupSchemas *backup.Schemas + ) + if bc.isRawKv { + ranges = []backup.Range{{StartKey: bc.startKey, EndKey: bc.endKey}} + } else { + ranges, backupSchemas, err = backup.BuildBackupRangeAndSchema( + mgr.GetDomain(), mgr.GetTiKV(), backupTS, bc.db, bc.table) + if err != nil { + return err + } } // The number of regions need to backup @@ -114,38 +133,39 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error { updateCh := utils.StartProgress( ctx, cmdName, int64(approximateRegions), !HasLogFile()) err = client.BackupRanges( - ctx, ranges, backupTS, ratelimit, concurrency, updateCh) + ctx, ranges, backupTS, ratelimit, concurrency, updateCh, bc.isRawKv, bc.cf) if err != nil { return err } // Backup has finished close(updateCh) - // Checksum - backupSchemasConcurrency := backup.DefaultSchemaConcurrency - if backupSchemas.Len() < backupSchemasConcurrency { - backupSchemasConcurrency = backupSchemas.Len() - } - updateCh = utils.StartProgress( - ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile()) - backupSchemas.SetSkipChecksum(!checksum) - backupSchemas.Start( - ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh) - - err = client.CompleteMeta(backupSchemas) - if err != nil { - return err - } + if backupSchemas != nil { + // Checksum + backupSchemasConcurrency := backup.DefaultSchemaConcurrency + if backupSchemas.Len() < backupSchemasConcurrency { + backupSchemasConcurrency = backupSchemas.Len() + } + updateCh = utils.StartProgress( + ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile()) + backupSchemas.SetSkipChecksum(!checksum) + backupSchemas.Start( + ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh) - valid, err := client.FastChecksum() - if err != nil { - return err - } - if !valid { - log.Error("backup FastChecksum failed!") + err = client.CompleteMeta(backupSchemas) + if err != nil { + return err + } + valid, err := client.FastChecksum() + if err != nil { + return err + } + if !valid { + log.Error("backup FastChecksum failed!") + } + // Checksum has finished + close(updateCh) } - // Checksum has finished - close(updateCh) err = client.SaveBackupMeta(ctx) if err != nil { @@ -179,6 +199,7 @@ func NewBackupCommand() *cobra.Command { newFullBackupCommand(), newDbBackupCommand(), newTableBackupCommand(), + newRawBackupCommand(), ) defineBackupFlags(command.PersistentFlags()) @@ -192,7 +213,8 @@ func newFullBackupCommand() *cobra.Command { Short: "backup all database", RunE: func(command *cobra.Command, _ []string) error { // empty db/table means full backup. - return runBackup(command.Flags(), "Full backup", "", "") + bc := backupContext{db: "", table: "", isRawKv: false} + return runBackup(command.Flags(), "Full backup", bc) }, } return command @@ -211,7 +233,8 @@ func newDbBackupCommand() *cobra.Command { if len(db) == 0 { return errors.Errorf("empty database name is not allowed") } - return runBackup(command.Flags(), "Database backup", db, "") + bc := backupContext{db: db, table: "", isRawKv: false} + return runBackup(command.Flags(), "Database backup", bc) }, } command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db") @@ -240,7 +263,8 @@ func newTableBackupCommand() *cobra.Command { if len(table) == 0 { return errors.Errorf("empty table name is not allowed") } - return runBackup(command.Flags(), "Table backup", db, table) + bc := backupContext{db: db, table: table, isRawKv: false} + return runBackup(command.Flags(), "Table backup", bc) }, } command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db") @@ -249,3 +273,45 @@ func newTableBackupCommand() *cobra.Command { _ = command.MarkFlagRequired(flagTable) return command } + +// newRawBackupCommand return a raw kv range backup subcommand. +func newRawBackupCommand() *cobra.Command { + command := &cobra.Command{ + Use: "raw", + Short: "backup a raw kv range from TiKV cluster", + RunE: func(command *cobra.Command, _ []string) error { + start, err := command.Flags().GetString("start") + if err != nil { + return err + } + startKey, err := utils.ParseKey(command.Flags(), start) + if err != nil { + return err + } + end, err := command.Flags().GetString("end") + if err != nil { + return err + } + endKey, err := utils.ParseKey(command.Flags(), end) + if err != nil { + return err + } + + cf, err := command.Flags().GetString("cf") + if err != nil { + return err + } + + if bytes.Compare(startKey, endKey) > 0 { + return errors.New("input endKey must greater or equal than startKey") + } + bc := backupContext{startKey: startKey, endKey: endKey, isRawKv: true, cf: cf} + return runBackup(command.Flags(), "Raw Backup", bc) + }, + } + command.Flags().StringP("format", "", "raw", "raw key format") + command.Flags().StringP("cf", "", "default", "backup raw kv cf") + command.Flags().StringP("start", "", "", "backup raw kv start key") + command.Flags().StringP("end", "", "", "backup raw kv end key") + return command +} diff --git a/go.mod b/go.mod index 8f96a6a2e..916c180d1 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/onsi/gomega v1.7.1 // indirect github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 github.com/pingcap/errors v0.11.4 - github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c + github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01 github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5 diff --git a/go.sum b/go.sum index 02c1d157c..7fecef7b2 100644 --- a/go.sum +++ b/go.sum @@ -267,8 +267,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191030021250-51b332bcb20b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191121022655-4c654046831d/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c h1:CwVCq7XA/NvTQ6X9ZAhZlvcEvseUsHiPFQf2mL3LVl4= -github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7 h1:thLL2vFObG8vxBCkAmfAbLVBPfXUkBSXaVxppStCrL0= +github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 4d3550a7a..feccecc2a 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -283,6 +283,8 @@ func (bc *Client) BackupRanges( rate uint64, concurrency uint32, updateCh chan<- struct{}, + isRawKv bool, + cf string, ) error { start := time.Now() defer func() { @@ -296,7 +298,7 @@ func (bc *Client) BackupRanges( go func() { for _, r := range ranges { err := bc.backupRange( - ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh) + ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh, isRawKv, cf) if err != nil { errCh <- err return @@ -343,6 +345,8 @@ func (bc *Client) backupRange( rateMBs uint64, concurrency uint32, updateCh chan<- struct{}, + isRawKv bool, + cf string, ) (err error) { start := time.Now() defer func() { @@ -379,6 +383,8 @@ func (bc *Client) backupRange( StorageBackend: bc.backend, RateLimit: rateLimit, Concurrency: concurrency, + IsRawKv: isRawKv, + Cf: cf, } push := newPushDown(ctx, bc.mgr, len(allStores)) @@ -400,9 +406,19 @@ func (bc *Client) backupRange( bc.backupMeta.StartVersion = backupTS bc.backupMeta.EndVersion = backupTS - log.Info("backup time range", - zap.Reflect("StartVersion", backupTS), - zap.Reflect("EndVersion", backupTS)) + bc.backupMeta.IsRawKv = isRawKv + if bc.backupMeta.IsRawKv { + bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges, + &backup.RawRange{StartKey: startKey, EndKey: endKey, Cf: cf}) + log.Info("backup raw ranges", + zap.ByteString("startKey", startKey), + zap.ByteString("endKey", endKey), + zap.String("cf", cf)) + } else { + log.Info("backup time range", + zap.Reflect("StartVersion", backupTS), + zap.Reflect("EndVersion", backupTS)) + } results.tree.Ascend(func(i btree.Item) bool { r := i.(*Range) diff --git a/pkg/utils/key.go b/pkg/utils/key.go new file mode 100644 index 000000000..59a35f743 --- /dev/null +++ b/pkg/utils/key.go @@ -0,0 +1,70 @@ +package utils + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" + "strings" + + "github.com/pingcap/errors" + "github.com/spf13/pflag" +) + +// ParseKey parse key by given format +func ParseKey(flags *pflag.FlagSet, key string) ([]byte, error) { + switch flags.Lookup("format").Value.String() { + case "raw": + return []byte(key), nil + case "escaped": + return unescapedKey(key) + case "hex": + key, err := hex.DecodeString(key) + if err != nil { + return nil, errors.WithStack(err) + } + return key, nil + } + return nil, errors.New("unknown format") +} + +func unescapedKey(text string) ([]byte, error) { + var buf []byte + r := bytes.NewBuffer([]byte(text)) + for { + c, err := r.ReadByte() + if err != nil { + if err != io.EOF { + return nil, errors.WithStack(err) + } + break + } + if c != '\\' { + buf = append(buf, c) + continue + } + n := r.Next(1) + if len(n) == 0 { + return nil, io.EOF + } + // See: https://golang.org/ref/spec#Rune_literals + if idx := strings.IndexByte(`abfnrtv\'"`, n[0]); idx != -1 { + buf = append(buf, []byte("\a\b\f\n\r\t\v\\'\"")[idx]) + continue + } + + switch n[0] { + case 'x': + fmt.Sscanf(string(r.Next(2)), "%02x", &c) + buf = append(buf, c) + default: + n = append(n, r.Next(2)...) + _, err := fmt.Sscanf(string(n), "%03o", &c) + if err != nil { + return nil, errors.WithStack(err) + } + buf = append(buf, c) + } + } + return buf, nil +} diff --git a/pkg/utils/key_test.go b/pkg/utils/key_test.go new file mode 100644 index 000000000..c983d34e5 --- /dev/null +++ b/pkg/utils/key_test.go @@ -0,0 +1,41 @@ +package utils + +import ( + "encoding/hex" + + . "github.com/pingcap/check" + "github.com/spf13/pflag" +) + +type testKeySuite struct{} + +var _ = Suite(&testKeySuite{}) + +func (r *testKeySuite) TestParseKey(c *C) { + flagSet := &pflag.FlagSet{} + flagSet.String("format", "raw", "") + rawKey := "1234" + parsedKey, err := ParseKey(flagSet, rawKey) + c.Assert(err, IsNil) + c.Assert(parsedKey, BytesEquals, []byte(rawKey)) + + flagSet = &pflag.FlagSet{} + flagSet.String("format", "escaped", "") + escapedKey := "\\a\\x1" + parsedKey, err = ParseKey(flagSet, escapedKey) + c.Assert(err, IsNil) + c.Assert(parsedKey, BytesEquals, []byte("\a\x01")) + + flagSet = &pflag.FlagSet{} + flagSet.String("format", "hex", "") + hexKey := hex.EncodeToString([]byte("1234")) + parsedKey, err = ParseKey(flagSet, hexKey) + c.Assert(err, IsNil) + c.Assert(parsedKey, BytesEquals, []byte("1234")) + + flagSet = &pflag.FlagSet{} + flagSet.String("format", "notSupport", "") + _, err = ParseKey(flagSet, rawKey) + c.Assert(err, ErrorMatches, "*unknown format*") + +} From c7c72435a192a5c203c494e4476d1a9eb5f5c1df Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 25 Dec 2019 15:33:25 +0800 Subject: [PATCH 09/10] adjust structure to keep consistent with master --- cmd/restore.go | 143 +++++++++++++++++++++++++------------------------ 1 file changed, 73 insertions(+), 70 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 3bcd57e94..2cf945b4a 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -202,6 +202,78 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error return nil } +func runRawRestore(flagSet *flag.FlagSet, startKey, endKey []byte, cf string) error { + ctx, cancel := context.WithCancel(GetDefaultContext()) + defer cancel() + + mgr, err := GetDefaultMgr() + if err != nil { + return err + } + defer mgr.Close() + + client, err := restore.NewRestoreClient( + ctx, mgr.GetPDClient(), mgr.GetTiKV()) + if err != nil { + return errors.Trace(err) + } + defer client.Close() + err = initRestoreClient(ctx, client, flagSet) + if err != nil { + return errors.Trace(err) + } + + if !client.IsRawKvMode() { + return errors.New("cannot do raw restore from transactional data") + } + + files, err := client.GetFilesInRawRange(startKey, endKey, cf) + if err != nil { + return errors.Trace(err) + } + + // Empty rewrite rules + rewriteRules := &restore_util.RewriteRules{} + + ranges, err := restore.ValidateFileRanges(files, rewriteRules) + if err != nil { + return errors.Trace(err) + } + + // Redirect to log if there is no log file to avoid unreadable output. + // TODO: How to show progress? + updateCh := utils.StartProgress( + ctx, + "Table Restore", + // Split/Scatter + Download/Ingest + int64(len(ranges)+len(files)), + !HasLogFile()) + + err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh) + if err != nil { + return errors.Trace(err) + } + + removedSchedulers, err := RestorePrepareWork(ctx, client, mgr) + if err != nil { + return errors.Trace(err) + } + + err = client.RestoreRaw(startKey, endKey, files, updateCh) + if err != nil { + return errors.Trace(err) + } + + err = RestorePostWork(ctx, client, mgr, removedSchedulers) + if err != nil { + return errors.Trace(err) + } + // Restore has finished. + close(updateCh) + + return nil +} + func newFullRestoreCommand() *cobra.Command { command := &cobra.Command{ Use: "full", @@ -269,30 +341,6 @@ func newRawRestoreCommand() *cobra.Command { Use: "raw", Short: "restore a raw kv range", RunE: func(cmd *cobra.Command, _ []string) error { - ctx, cancel := context.WithCancel(GetDefaultContext()) - defer cancel() - - mgr, err := GetDefaultMgr() - if err != nil { - return err - } - defer mgr.Close() - - client, err := restore.NewRestoreClient( - ctx, mgr.GetPDClient(), mgr.GetTiKV()) - if err != nil { - return errors.Trace(err) - } - defer client.Close() - err = initRestoreClient(ctx, client, cmd.Flags()) - if err != nil { - return errors.Trace(err) - } - - if !client.IsRawKvMode() { - return errors.New("cannot do raw restore from transactional data") - } - startKey, err := cmd.Flags().GetBytesHex("start") if err != nil { return errors.Trace(err) @@ -307,52 +355,7 @@ func newRawRestoreCommand() *cobra.Command { if err != nil { return errors.Trace(err) } - - files, err := client.GetFilesInRawRange(startKey, endKey, cf) - if err != nil { - return errors.Trace(err) - } - - // Empty rewrite rules - rewriteRules := &restore_util.RewriteRules{} - - ranges, err := restore.ValidateFileRanges(files, rewriteRules) - if err != nil { - return errors.Trace(err) - } - - // Redirect to log if there is no log file to avoid unreadable output. - // TODO: How to show progress? - updateCh := utils.StartProgress( - ctx, - "Table Restore", - // Split/Scatter + Download/Ingest - int64(len(ranges)+len(files)), - !HasLogFile()) - - err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh) - if err != nil { - return errors.Trace(err) - } - - removedSchedulers, err := RestorePrepareWork(ctx, client, mgr) - if err != nil { - return errors.Trace(err) - } - - err = client.RestoreRaw(startKey, endKey, files, updateCh) - if err != nil { - return errors.Trace(err) - } - - err = RestorePostWork(ctx, client, mgr, removedSchedulers) - if err != nil { - return errors.Trace(err) - } - // Restore has finished. - close(updateCh) - - return nil + return runRawRestore(cmd.Flags(), startKey, endKey, cf) }, } From 60bd5e54a061cce681a7cd66a3da6d794f33df80 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 25 Dec 2019 15:42:37 +0800 Subject: [PATCH 10/10] Use ParseKey instead of GetHex --- cmd/restore.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 2cf945b4a..a6a4827e9 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -341,14 +341,21 @@ func newRawRestoreCommand() *cobra.Command { Use: "raw", Short: "restore a raw kv range", RunE: func(cmd *cobra.Command, _ []string) error { - startKey, err := cmd.Flags().GetBytesHex("start") + start, err := cmd.Flags().GetString("start") if err != nil { - return errors.Trace(err) + return err } - - endKey, err := cmd.Flags().GetBytesHex("end") + startKey, err := utils.ParseKey(cmd.Flags(), start) if err != nil { - return errors.Trace(err) + return err + } + end, err := cmd.Flags().GetString("end") + if err != nil { + return err + } + endKey, err := utils.ParseKey(cmd.Flags(), end) + if err != nil { + return err } cf, err := cmd.Flags().GetString("cf") @@ -359,6 +366,7 @@ func newRawRestoreCommand() *cobra.Command { }, } + command.Flags().StringP("format", "", "raw", "format of raw keys in arguments") command.Flags().StringP("start", "", "", "restore raw kv start key") command.Flags().StringP("end", "", "", "restore raw kv end key") command.Flags().StringP("cf", "", "default", "the cf to restore raw keys")