From e3ce68d26a883a03486ab78e7d1c329fb18fa7f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Thu, 3 Sep 2020 02:49:47 +0800 Subject: [PATCH] *: cherry-pick #483 Signed-off-by: Hillium --- cmd/restore.go | 2 +- pkg/restore/client.go | 12 ++------ pkg/restore/import.go | 19 ------------ pkg/restore/pipeline_items.go | 27 ++++------------- pkg/restore/util.go | 56 ----------------------------------- pkg/task/backup.go | 3 +- pkg/task/backup_raw.go | 3 +- pkg/task/common.go | 25 +++++++--------- pkg/task/restore.go | 36 +++------------------- pkg/task/restore_raw.go | 3 +- pkg/utils/pd.go | 12 ++++++++ pkg/utils/version.go | 36 ++++++++++++++++++++++ pkg/utils/version_test.go | 33 +++++++++++++++++++++ 13 files changed, 108 insertions(+), 159 deletions(-) diff --git a/cmd/restore.go b/cmd/restore.go index 5d0ddb3c3..2eea2aed9 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -127,7 +127,7 @@ func newTableRestoreCommand() *cobra.Command { func newTiflashReplicaRestoreCommand() *cobra.Command { command := &cobra.Command{ Use: "tiflash-replica", - Short: "restore the tiflash replica before the last restore, it must only be used after the last restore failed", + Short: "restore the tiflash replica removed by a failed restore of the older version BR", RunE: func(cmd *cobra.Command, _ []string) error { return runRestoreTiflashReplicaCommand(cmd, "Restore TiFlash Replica") }, diff --git a/pkg/restore/client.go b/pkg/restore/client.go index b6af2b758..c12771267 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -9,7 +9,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "math" "sort" "strconv" "time" @@ -318,10 +317,7 @@ func (rc *Client) GetTableSchema( dbName model.CIStr, tableName model.CIStr, ) (*model.TableInfo, error) { - info, err := dom.GetSnapshotInfoSchema(math.MaxInt64) - if err != nil { - return nil, errors.Trace(err) - } + info := dom.InfoSchema() table, err := info.TableByName(dbName, tableName) if err != nil { return nil, errors.Trace(err) @@ -626,7 +622,6 @@ func (rc *Client) setSpeedLimit() error { func (rc *Client) RestoreFiles( files []*backup.File, rewriteRules *RewriteRules, - rejectStoreMap map[uint64]bool, updateCh glue.Progress, ) (err error) { start := time.Now() @@ -653,7 +648,7 @@ func (rc *Client) RestoreFiles( rc.workerPool.ApplyOnErrorGroup(eg, func() error { defer updateCh.Inc() - return rc.fileImporter.Import(ectx, fileReplica, rejectStoreMap, rewriteRules) + return rc.fileImporter.Import(ectx, fileReplica, rewriteRules) }) } if err := eg.Wait(); err != nil { @@ -686,13 +681,12 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil return errors.Trace(err) } - emptyRules := &RewriteRules{} for _, file := range files { fileReplica := file rc.workerPool.ApplyOnErrorGroup(eg, func() error { defer updateCh.Inc() - return rc.fileImporter.Import(ectx, fileReplica, nil, emptyRules) + return rc.fileImporter.Import(ectx, fileReplica, EmptyRewriteRule()) }) } if err := eg.Wait(); err != nil { diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 662014b83..4b2fec8e4 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -176,7 +176,6 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error { func (importer *FileImporter) Import( ctx context.Context, file *backup.File, - rejectStoreMap map[uint64]bool, rewriteRules *RewriteRules, ) error { log.Debug("import file", utils.ZapFile(file)) @@ -201,8 +200,6 @@ func (importer *FileImporter) Import( zap.Stringer("startKey", utils.WrapKey(startKey)), zap.Stringer("endKey", utils.WrapKey(endKey))) - needReject := len(rejectStoreMap) > 0 - err = utils.WithRetry(ctx, func() error { tctx, cancel := context.WithTimeout(ctx, importScanRegionTime) defer cancel() @@ -213,22 +210,6 @@ func (importer *FileImporter) Import( return errors.Trace(errScanRegion) } - if needReject { - // TODO remove when TiFlash support restore - startTime := time.Now() - log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap)) - for _, region := range regionInfos { - if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) { - log.Error("waiting for removing rejected stores failed", - utils.ZapRegion(region.Region)) - return errors.New("waiting for removing rejected stores failed") - } - } - log.Info("waiting for removing rejected stores done", - zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime))) - needReject = false - } - log.Debug("scan regions", utils.ZapFile(file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region regionLoop: diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index d99f5bd8e..83242cb2b 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -11,7 +11,6 @@ import ( "github.com/pingcap/parser/model" "go.uber.org/zap" - "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/utils" @@ -135,9 +134,8 @@ type BatchSender interface { } type tikvSender struct { - client *Client - updateCh glue.Progress - rejectStoreMap map[uint64]bool + client *Client + updateCh glue.Progress } // NewTiKVSender make a sender that send restore requests to TiKV. @@ -145,25 +143,10 @@ func NewTiKVSender( ctx context.Context, cli *Client, updateCh glue.Progress, - // TODO remove this field after we support TiFlash. - removeTiFlash bool, ) (BatchSender, error) { - rejectStoreMap := make(map[uint64]bool) - if removeTiFlash { - tiflashStores, err := conn.GetAllTiKVStores(ctx, cli.GetPDClient(), conn.TiFlashOnly) - if err != nil { - log.Error("failed to get and remove TiFlash replicas", zap.Error(err)) - return nil, err - } - for _, store := range tiflashStores { - rejectStoreMap[store.GetId()] = true - } - } - return &tikvSender{ - client: cli, - updateCh: updateCh, - rejectStoreMap: rejectStoreMap, + client: cli, + updateCh: updateCh, }, nil } @@ -181,7 +164,7 @@ func (b *tikvSender) RestoreBatch(ctx context.Context, ranges []rtree.Range, rew files = append(files, fs.Files...) } - if err := b.client.RestoreFiles(files, rewriteRules, b.rejectStoreMap, b.updateCh); err != nil { + if err := b.client.RestoreFiles(files, rewriteRules, b.updateCh); err != nil { return err } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index cfb991459..2a239f133 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -497,62 +497,6 @@ func PaginateScanRegion( return regions, nil } -func hasRejectStorePeer( - ctx context.Context, - client SplitClient, - regionID uint64, - rejectStores map[uint64]bool, -) (bool, error) { - regionInfo, err := client.GetRegionByID(ctx, regionID) - if err != nil { - return false, err - } - if regionInfo == nil { - return false, nil - } - for _, peer := range regionInfo.Region.GetPeers() { - if rejectStores[peer.GetStoreId()] { - return true, nil - } - } - retryTimes := ctx.Value(retryTimes).(int) - if retryTimes > 10 { - log.Warn("get region info", utils.ZapRegion(regionInfo.Region)) - } - return false, nil -} - -func waitForRemoveRejectStores( - ctx context.Context, - client SplitClient, - regionInfo *RegionInfo, - rejectStores map[uint64]bool, -) bool { - interval := RejectStoreCheckInterval - regionID := regionInfo.Region.GetId() - for i := 0; i < RejectStoreCheckRetryTimes; i++ { - ctx1 := context.WithValue(ctx, retryTimes, i) - ok, err := hasRejectStorePeer(ctx1, client, regionID, rejectStores) - if err != nil { - log.Warn("wait for rejecting store failed", - utils.ZapRegion(regionInfo.Region), - zap.Error(err)) - return false - } - // Do not have any peer in the rejected store, return true - if !ok { - return true - } - interval = 2 * interval - if interval > RejectStoreMaxCheckInterval { - interval = RejectStoreMaxCheckInterval - } - time.Sleep(interval) - } - - return false -} - // ZapTables make zap field of table for debuging, including table names. func ZapTables(tables []CreatedTable) zapcore.Field { tableNames := make([]string, 0, len(tables)) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 7083c229a..0f04cc2fa 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" - "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" @@ -176,7 +175,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err != nil { return err } - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 20f8f9ed2..fa6a15e72 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -14,7 +14,6 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" - "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/storage" @@ -126,7 +125,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf if err != nil { return err } - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/task/common.go b/pkg/task/common.go index a61cfde5c..c931501f9 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -110,7 +110,6 @@ type Config struct { Filter filter.MySQLReplicationRules TableFilter filter.Filter `json:"-" toml:"-"` - RemoveTiFlash bool `json:"remove-tiflash" toml:"remove-tiflash"` CheckRequirements bool `json:"check-requirements" toml:"check-requirements"` SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"` } @@ -137,6 +136,8 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.Uint64(flagRateLimitUnit, utils.MB, "The unit of rate limit") _ = flags.MarkHidden(flagRateLimitUnit) + _ = flags.MarkDeprecated(flagRemoveTiFlash, + "TiFlash is fully supported by BR now, removing TiFlash isn't needed any more. This flag would be ignored.") flags.Bool(flagCheckRequirement, true, "Whether start version check before execute command") @@ -221,11 +222,6 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { } cfg.RateLimit = rateLimit * rateLimitUnit - cfg.RemoveTiFlash, err = flags.GetBool(flagRemoveTiFlash) - if err != nil { - return errors.Trace(err) - } - var caseSensitive bool if filterFlag := flags.Lookup(flagFilter); filterFlag != nil { f, err := filter.Parse(filterFlag.Value.(pflag.SliceValue).GetSlice()) @@ -282,14 +278,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { } // newMgr creates a new mgr at the given PD address. -func newMgr( - ctx context.Context, - g glue.Glue, - pds []string, +func newMgr(ctx context.Context, + g glue.Glue, pds []string, tlsConfig TLSConfig, - storeBehavior conn.StoreBehavior, - checkRequirements bool, -) (*conn.Mgr, error) { + checkRequirements bool) (*conn.Mgr, error) { var ( tlsConf *tls.Config err error @@ -315,7 +307,12 @@ func newMgr( if err != nil { return nil, err } - return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption, storeBehavior, checkRequirements) + + // Is it necessary to remove `StoreBehavior`? + return conn.NewMgr(ctx, g, + pdAddress, store.(tikv.Storage), + tlsConf, securityOption, + conn.SkipTiFlash, checkRequirements) } // GetStorage gets the storage backend from the config. diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 4209d299c..3cb5e867b 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -61,10 +61,6 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - cfg.RemoveTiFlash, err = flags.GetBool(flagRemoveTiFlash) - if err != nil { - return errors.Trace(err) - } err = cfg.Config.ParseFromFlags(flags) if err != nil { return errors.Trace(err) @@ -97,7 +93,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) if err != nil { return err } @@ -250,7 +246,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf int64(rangeSize+len(files)+len(tables)), !cfg.LogProgress) defer updateCh.Close() - sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.RemoveTiFlash) + sender, err := restore.NewTiKVSender(ctx, client, updateCh) if err != nil { return err } @@ -258,7 +254,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh) batcher.SetThreshold(batchSize) batcher.EnableAutoCommit(ctx, time.Second) - go restoreTableStream(ctx, rangeStream, cfg.RemoveTiFlash, cfg.PD, client, batcher, errCh) + go restoreTableStream(ctx, rangeStream, batcher, errCh) var finish <-chan struct{} // Checksum @@ -372,7 +368,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash, cfg.CheckRequirements) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) if err != nil { return err } @@ -439,10 +435,6 @@ func enableTiDBConfig() { func restoreTableStream( ctx context.Context, inputCh <-chan restore.TableWithRange, - // TODO: remove this field and rules field after we support TiFlash - removeTiFlashReplica bool, - pdAddr []string, - client *restore.Client, batcher *restore.Batcher, errCh chan<- error, ) { @@ -454,10 +446,6 @@ func restoreTableStream( log.Info("doing postwork", zap.Int("table count", len(oldTables)), ) - if err := client.RecoverTiFlashReplica(oldTables); err != nil { - log.Error("failed on recover TiFlash replicas", zap.Error(err)) - errCh <- err - } }() for { @@ -469,22 +457,6 @@ func restoreTableStream( if !ok { return } - if removeTiFlashReplica { - rules, err := client.GetPlacementRules(pdAddr) - if err != nil { - errCh <- err - return - } - log.Debug("get rules", zap.Any("rules", rules), zap.Strings("pd", pdAddr)) - log.Debug("try to remove tiflash of table", zap.Stringer("table name", t.Table.Name)) - tiFlashRep, err := client.RemoveTiFlashOfTable(t.CreatedTable, rules) - if err != nil { - log.Error("failed on remove TiFlash replicas", zap.Error(err)) - errCh <- err - return - } - t.OldTable.TiFlashReplicas = tiFlashRep - } oldTables = append(oldTables, t.OldTable) batcher.Add(t) diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 4bd867086..a1a6f0513 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -9,7 +9,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/summary" @@ -51,7 +50,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.ErrorOnTiFlash, cfg.CheckRequirements) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/utils/pd.go b/pkg/utils/pd.go index 1aadbdbcc..43c7b3547 100644 --- a/pkg/utils/pd.go +++ b/pkg/utils/pd.go @@ -13,6 +13,8 @@ import ( "strings" "time" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/errors" "github.com/pingcap/pd/v4/pkg/codec" "github.com/pingcap/pd/v4/server/schedule/placement" @@ -113,3 +115,13 @@ func SearchPlacementRule(tableID int64, placementRules []placement.Rule, role pl } return nil } + +// IsTiFlash tests whether the store is based on tiflash engine. +func IsTiFlash(store *metapb.Store) bool { + for _, label := range store.Labels { + if label.Key == "engine" && label.Value == "tiflash" { + return true + } + } + return false +} diff --git a/pkg/utils/version.go b/pkg/utils/version.go index f479454a3..961ad54fe 100644 --- a/pkg/utils/version.go +++ b/pkg/utils/version.go @@ -10,6 +10,8 @@ import ( "runtime" "strings" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -54,6 +56,8 @@ func BRInfo() string { var minTiKVVersion *semver.Version = semver.New("3.1.0-beta.2") var incompatibleTiKVMajor3 *semver.Version = semver.New("3.1.0") var incompatibleTiKVMajor4 *semver.Version = semver.New("4.0.0-rc.1") +var compatibleTiFlashMajor3 = semver.New("3.1.0") +var compatibleTiFlashMajor4 = semver.New("4.0.0") func removeVAndHash(v string) string { v = VersionHash.ReplaceAllLiteralString(v, "") @@ -61,6 +65,25 @@ func removeVAndHash(v string) string { return strings.TrimPrefix(v, "v") } +func checkTiFlashVersion(store *metapb.Store) error { + flash, err := semver.NewVersion(removeVAndHash(store.Version)) + if err != nil { + return errors.Annotatef(err, "failed to parse TiFlash@[%s] version %s", store.GetPeerAddress(), store.Version) + } + + if flash.Major == 3 && flash.LessThan(*compatibleTiFlashMajor3) { + return errors.Errorf("incompatible TiFlash@[%s] version %s, try update it to %s", + store.GetPeerAddress(), store.Version, compatibleTiFlashMajor3) + } + + if flash.Major == 4 && flash.LessThan(*compatibleTiFlashMajor4) { + return errors.Errorf("incompatible TiFlash@[%s] version %s, try update it to %s", + store.GetPeerAddress(), store.Version, compatibleTiFlashMajor4) + } + + return nil +} + // CheckClusterVersion check TiKV version. func CheckClusterVersion(ctx context.Context, client pd.Client) error { BRVersion, err := semver.NewVersion(removeVAndHash(BRReleaseVersion)) @@ -72,6 +95,19 @@ func CheckClusterVersion(ctx context.Context, client pd.Client) error { return errors.Trace(err) } for _, s := range stores { + isTiFlash := IsTiFlash(s) + log.Debug("checking compatibility of store in cluster", + zap.Uint64("ID", s.GetId()), + zap.Bool("TiFlash?", isTiFlash), + zap.String("address", s.GetAddress()), + zap.String("version", s.GetVersion()), + ) + if isTiFlash { + if err := checkTiFlashVersion(s); err != nil { + return err + } + } + tikvVersionString := removeVAndHash(s.Version) tikvVersion, err := semver.NewVersion(tikvVersionString) if err != nil { diff --git a/pkg/utils/version_test.go b/pkg/utils/version_test.go index 4a4e0b011..8062c768f 100644 --- a/pkg/utils/version_test.go +++ b/pkg/utils/version_test.go @@ -25,11 +25,44 @@ func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOpti return []*metapb.Store{}, nil } +func tiflash(version string) []*metapb.Store { + return []*metapb.Store{ + {Version: version, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}}, + } +} + func (s *versionSuite) TestCheckClusterVersion(c *check.C) { mock := mockPDClient{ Client: nil, } + { + BRReleaseVersion = "v4.0.5" + mock.getAllStores = func() []*metapb.Store { + return tiflash("v4.0.0-rc.1") + } + err := CheckClusterVersion(context.Background(), &mock) + c.Assert(err, check.ErrorMatches, `incompatible.*version v4.0.0-rc.1, try update it to 4.0.0`) + } + + { + BRReleaseVersion = "v3.0.14" + mock.getAllStores = func() []*metapb.Store { + return tiflash("v3.1.0-beta.1") + } + err := CheckClusterVersion(context.Background(), &mock) + c.Assert(err, check.ErrorMatches, `incompatible.*version v3.1.0-beta.1, try update it to 3.1.0`) + } + + { + BRReleaseVersion = "v3.1.1" + mock.getAllStores = func() []*metapb.Store { + return tiflash("v3.0.15") + } + err := CheckClusterVersion(context.Background(), &mock) + c.Assert(err, check.ErrorMatches, `incompatible.*version v3.0.15, try update it to 3.1.0`) + } + { BRReleaseVersion = "v3.1.0-beta.2" mock.getAllStores = func() []*metapb.Store {