diff --git a/pkg/backup/client.go b/pkg/backup/client.go
index 15be96bf4..13236388b 100644
--- a/pkg/backup/client.go
+++ b/pkg/backup/client.go
@@ -31,6 +31,7 @@ import (
 	"github.com/pingcap/tidb/util/ranger"
 	"go.uber.org/zap"
 
+	"github.com/pingcap/br/pkg/conn"
 	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/br/pkg/summary"
@@ -395,7 +396,7 @@ func (bc *Client) BackupRange(
 	defer cancel()
 
 	var allStores []*metapb.Store
-	allStores, err = bc.mgr.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
+	allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go
index 66c68dc49..cdfc78168 100644
--- a/pkg/conn/conn.go
+++ b/pkg/conn/conn.go
@@ -92,6 +92,54 @@ func pdRequest(
 	return r, nil
 }
 
+// UnexpectedStoreBehavior is the action to do in GetAllTiKVStores when a
+// non-TiKV store (e.g. TiFlash store) is found.
+type UnexpectedStoreBehavior uint8
+
+const (
+	// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
+	// found to be a TiFlash node.
+	ErrorOnTiFlash UnexpectedStoreBehavior = 0
+	// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
+	// be a TiFlash node.
+	SkipTiFlash UnexpectedStoreBehavior = 1
+)
+
+// GetAllTiKVStores returns all non-tombstone TiKV stores registered to the PD
+// client. A store carrying the label `engine=tiflash` is treated as a TiFlash
+// node: it is dropped from the result when unexpectedStoreBehavior is
+// SkipTiFlash, and makes the whole call fail otherwise.
+func GetAllTiKVStores(
+	ctx context.Context,
+	pdClient pd.Client,
+	unexpectedStoreBehavior UnexpectedStoreBehavior,
+) ([]*metapb.Store, error) {
+	// get all live stores.
+	stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	// filter out all stores which are TiFlash, compacting the survivors to the
+	// front of the slice returned by PD instead of building a second slice.
+	j := 0
+skipStore:
+	for _, store := range stores {
+		for _, label := range store.Labels {
+			if label.Key == "engine" && label.Value == "tiflash" {
+				if unexpectedStoreBehavior == SkipTiFlash {
+					continue skipStore
+				}
+				return nil, errors.Errorf(
+					"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
+			}
+		}
+		stores[j] = store
+		j++
+	}
+	return stores[:j], nil
+}
+
 // NewMgr creates a new Mgr.
 func NewMgr(
 	ctx context.Context,
@@ -99,7 +147,9 @@ func NewMgr(
 	pdAddrs string,
 	storage tikv.Storage,
 	tlsConf *tls.Config,
-	securityOption pd.SecurityOption) (*Mgr, error) {
+	securityOption pd.SecurityOption,
+	unexpectedStoreBehavior UnexpectedStoreBehavior,
+) (*Mgr, error) {
 	addrs := strings.Split(pdAddrs, ",")
 
 	failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
@@ -143,7 +193,7 @@ func NewMgr(
 	log.Info("new mgr", zap.String("pdAddrs", pdAddrs))
 
 	// Check live tikv.
-	stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
+	stores, err := GetAllTiKVStores(ctx, pdClient, unexpectedStoreBehavior)
 	if err != nil {
 		log.Error("fail to get store", zap.Error(err))
 		return nil, err
diff --git a/pkg/conn/conn_test.go b/pkg/conn/conn_test.go
index 9cbb963e7..572798e23 100644
--- a/pkg/conn/conn_test.go
+++ b/pkg/conn/conn_test.go
@@ -15,6 +15,7 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	pd "github.com/pingcap/pd/v4/client"
 	"github.com/pingcap/pd/v4/server/core"
 	"github.com/pingcap/pd/v4/server/statistics"
 	"github.com/pingcap/tidb/util/codec"
@@ -149,3 +150,94 @@ func (s *testClientSuite) TestRegionCount(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(resp, Equals, 2)
 }
+
+// fakePDClient embeds pd.Client and overrides only GetAllStores.
+type fakePDClient struct {
+	pd.Client
+	stores []*metapb.Store
+}
+
+// GetAllStores returns a copy of the configured stores, so that in-place
+// filtering done on the returned slice does not modify fpdc.stores.
+func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
+	return append([]*metapb.Store{}, fpdc.stores...), nil
+}
+
+func (s *testClientSuite) TestGetAllTiKVStores(c *C) {
+	testCases := []struct {
+		stores                  []*metapb.Store
+		unexpectedStoreBehavior UnexpectedStoreBehavior
+		expectedStores          map[uint64]int
+		expectedError           string
+	}{
+		{
+			stores: []*metapb.Store{
+				{Id: 1},
+			},
+			unexpectedStoreBehavior: SkipTiFlash,
+			expectedStores:          map[uint64]int{1: 1},
+		},
+		{
+			stores: []*metapb.Store{
+				{Id: 1},
+			},
+			unexpectedStoreBehavior: ErrorOnTiFlash,
+			expectedStores:          map[uint64]int{1: 1},
+		},
+		{
+			stores: []*metapb.Store{
+				{Id: 1},
+				{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
+			},
+			unexpectedStoreBehavior: SkipTiFlash,
+			expectedStores:          map[uint64]int{1: 1},
+		},
+		{
+			stores: []*metapb.Store{
+				{Id: 1},
+				{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
+			},
+			unexpectedStoreBehavior: ErrorOnTiFlash,
+			expectedError:           "cannot restore to a cluster with active TiFlash stores.*",
+		},
+		{
+			stores: []*metapb.Store{
+				{Id: 1},
+				{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
+				{Id: 3},
+				{Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}},
+				{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
+				{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
+			},
+			unexpectedStoreBehavior: SkipTiFlash,
+			expectedStores:          map[uint64]int{1: 1, 3: 1, 4: 1, 6: 1},
+		},
+		{
+			stores: []*metapb.Store{
+				{Id: 1},
+				{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
+				{Id: 3},
+				{Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}},
+				{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
+				{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
+			},
+			unexpectedStoreBehavior: ErrorOnTiFlash,
+			expectedError:           "cannot restore to a cluster with active TiFlash stores.*",
+		},
+	}
+
+	for _, testCase := range testCases {
+		pdClient := fakePDClient{stores: testCase.stores}
+		stores, err := GetAllTiKVStores(context.Background(), pdClient, testCase.unexpectedStoreBehavior)
+		if len(testCase.expectedError) != 0 {
+			c.Assert(err, ErrorMatches, testCase.expectedError)
+			continue
+		}
+		c.Assert(err, IsNil)
+		foundStores := make(map[uint64]int)
+		for _, store := range stores {
+			foundStores[store.Id]++
+		}
+		c.Assert(foundStores, DeepEquals, testCase.expectedStores)
+	}
+}
diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index 93cc97567..914b94607 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -34,6 +34,7 @@ import (
 	"google.golang.org/grpc/keepalive"
 
 	"github.com/pingcap/br/pkg/checksum"
+	"github.com/pingcap/br/pkg/conn"
 	"github.com/pingcap/br/pkg/glue"
 	"github.com/pingcap/br/pkg/summary"
 	"github.com/pingcap/br/pkg/utils"
@@ -261,7 +262,7 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
 
 func (rc *Client) setSpeedLimit() error {
 	if !rc.hasSpeedLimited && rc.rateLimit != 0 {
-		stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone())
+		stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient, conn.ErrorOnTiFlash)
 		if err != nil {
 			return err
 		}
@@ -345,7 +346,7 @@ func (rc *Client) SwitchToNormalMode(ctx context.Context) error {
 }
 
 func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMode) error {
-	stores, err := rc.pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
+	stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.ErrorOnTiFlash)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/pkg/task/backup.go b/pkg/task/backup.go
index ab22c9039..8d9613047 100644
--- a/pkg/task/backup.go
+++ b/pkg/task/backup.go
@@ -20,6 +20,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/pingcap/br/pkg/backup"
+	"github.com/pingcap/br/pkg/conn"
 	"github.com/pingcap/br/pkg/glue"
 	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/br/pkg/summary"
@@ -101,7 +102,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 	if err != nil {
 		return err
 	}
-	mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
+	mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go
index aa980b1a0..a51e80e95 100644
--- a/pkg/task/backup_raw.go
+++ b/pkg/task/backup_raw.go
@@ -12,6 +12,7 @@ import (
 	"github.com/spf13/pflag"
 
 	"github.com/pingcap/br/pkg/backup"
+	"github.com/pingcap/br/pkg/conn"
 	"github.com/pingcap/br/pkg/glue"
 	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/storage"
@@ -89,7 +90,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *BackupRaw
 	if err != nil {
 		return err
 	}
-	mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
+	mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/task/common.go b/pkg/task/common.go
index 80f5eb258..859c4206d 100644
--- a/pkg/task/common.go
+++ b/pkg/task/common.go
@@ -208,7 +208,13 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
 }
 
 // newMgr creates a new mgr at the given PD address.
-func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig) (*conn.Mgr, error) {
+func newMgr(
+	ctx context.Context,
+	g glue.Glue,
+	pds []string,
+	tlsConfig TLSConfig,
+	unexpectedStoreBehavior conn.UnexpectedStoreBehavior,
+) (*conn.Mgr, error) {
 	var (
 		tlsConf *tls.Config
 		err     error
@@ -234,7 +240,7 @@ func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig)
 	if err != nil {
 		return nil, err
 	}
-	return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption)
+	return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption, unexpectedStoreBehavior)
 }
 
 // GetStorage gets the storage backend from the config.
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index a02e49cf1..c443b4c0f 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -75,7 +75,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 	ctx, cancel := context.WithCancel(c)
 	defer cancel()
 
-	mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
+	mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.ErrorOnTiFlash)
 	if err != nil {
 		return err
 	}