From f8c54cbcd12b2feec5663160b7190b86f344b446 Mon Sep 17 00:00:00 2001 From: kennytm Date: Wed, 11 Mar 2020 18:27:31 +0800 Subject: [PATCH 1/3] conn: ignore nodes with label engine=tiflash --- pkg/backup/client.go | 3 ++- pkg/conn/conn.go | 26 +++++++++++++++++++- pkg/conn/conn_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++ pkg/restore/client.go | 5 ++-- 4 files changed, 85 insertions(+), 4 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index aa16d072f..c0cfecfd7 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/util/ranger" "go.uber.org/zap" + "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" @@ -380,7 +381,7 @@ func (bc *Client) BackupRange( defer cancel() var allStores []*metapb.Store - allStores, err = bc.mgr.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) + allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient()) if err != nil { return errors.Trace(err) } diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 4e38a0499..92c351e50 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -92,6 +92,30 @@ func pdRequest( return r, nil } +// GetAllTiKVStores returns all TiKV stores registered to the PD client. The +// stores must not be a tombstone and must never contain a label `engine=tiflash`. +func GetAllTiKVStores(ctx context.Context, pdClient pd.Client) ([]*metapb.Store, error) { + // get all live stores. + stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone()) + if err != nil { + return nil, err + } + + // filter out all stores which are TiFlash. + j := 0 +skipStore: + for _, store := range stores { + for _, label := range store.Labels { + if label.Key == "engine" && label.Value == "tiflash" { + continue skipStore + } + } + stores[j] = store + j++ + } + return stores[:j], nil +} + // NewMgr creates a new Mgr. func NewMgr( ctx context.Context, @@ -143,7 +167,7 @@ func NewMgr( log.Info("new mgr", zap.String("pdAddrs", pdAddrs)) // Check live tikv. - stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := GetAllTiKVStores(ctx, pdClient) if err != nil { log.Error("fail to get store", zap.Error(err)) return nil, err diff --git a/pkg/conn/conn_test.go b/pkg/conn/conn_test.go index c120697dd..f4a775303 100644 --- a/pkg/conn/conn_test.go +++ b/pkg/conn/conn_test.go @@ -15,6 +15,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + pd "github.com/pingcap/pd/client" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/statistics" "github.com/pingcap/tidb/util/codec" @@ -149,3 +150,57 @@ func (s *testClientSuite) TestRegionCount(c *C) { c.Assert(err, IsNil) c.Assert(resp, Equals, 2) } + +type fakePDClient struct { + pd.Client +} + +func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { + return []*metapb.Store{ + { + Id: 1, + }, + { + Id: 2, + Labels: []*metapb.StoreLabel{ + {Key: "engine", Value: "tiflash"}, + }, + }, + { + Id: 3, + Labels: []*metapb.StoreLabel{ + {Key: "engine", Value: "tikv"}, + }, + }, + { + Id: 4, + Labels: []*metapb.StoreLabel{ + {Key: "engine", Value: "tiflash"}, + {Key: "else", Value: "tiflash"}, + }, + }, + { + Id: 5, + Labels: []*metapb.StoreLabel{ + {Key: "else", Value: "tiflash"}, + {Key: "engine", Value: "tikv"}, + }, + }, + }, nil +} + +func (s *testClientSuite) TestGetAllTiKVStores(c *C) { + var pdClient fakePDClient + stores, err := GetAllTiKVStores(context.Background(), pdClient) + c.Assert(err, IsNil) + + foundStores := make(map[uint64]int) + for _, store := range stores { + foundStores[store.Id]++ + } + c.Assert(foundStores, DeepEquals, map[uint64]int{ + 1: 1, + 3: 1, + 5: 1, + }) +} diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 38cf2d5a6..07be7fbde 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc/keepalive" "github.com/pingcap/br/pkg/checksum" + "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" @@ -250,7 +251,7 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { func (rc *Client) setSpeedLimit() error { if !rc.hasSpeedLimited && rc.rateLimit != 0 { - stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone()) + stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient) if err != nil { return err } @@ -334,7 +335,7 @@ func (rc *Client) SwitchToNormalMode(ctx context.Context) error { } func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMode) error { - stores, err := rc.pdClient.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient) if err != nil { return errors.Trace(err) } From 92d5b1612262633e55720b166de8d5784ca70661 Mon Sep 17 00:00:00 2001 From: kennytm Date: Wed, 11 Mar 2020 21:53:27 +0800 Subject: [PATCH 2/3] conn: disallow TiFlash on restore, only skip TiFlash on backup --- pkg/backup/client.go | 2 +- pkg/conn/conn.go | 30 +++++++++++-- pkg/conn/conn_test.go | 95 ++++++++++++++++++++++++++++-------------- pkg/restore/client.go | 4 +- pkg/task/backup.go | 3 +- pkg/task/backup_raw.go | 3 +- pkg/task/common.go | 10 ++++- pkg/task/restore.go | 2 +- 8 files changed, 106 insertions(+), 43 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index c0cfecfd7..de97feb70 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -381,7 +381,7 @@ func (bc *Client) BackupRange( defer cancel() var allStores []*metapb.Store - allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient()) + allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash) if err != nil { return errors.Trace(err) } diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 92c351e50..60f1b56dc 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -92,9 +92,26 @@ func pdRequest( return r, nil } +// UnexpectedStoreBehavior is the action to do in GetAllTiKVStores when a +// non-TiKV store (e.g. TiFlash store) is found. +type UnexpectedStoreBehavior uint8 + +const ( + // ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is + // found to be a TiFlash node. + ErrorOnTiFlash UnexpectedStoreBehavior = 0 + // SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to + // be a TiFlash node. + SkipTiFlash UnexpectedStoreBehavior = 1 +) + // GetAllTiKVStores returns all TiKV stores registered to the PD client. The // stores must not be a tombstone and must never contain a label `engine=tiflash`. -func GetAllTiKVStores(ctx context.Context, pdClient pd.Client) ([]*metapb.Store, error) { +func GetAllTiKVStores( + ctx context.Context, + pdClient pd.Client, + unexpectedStoreBehavior UnexpectedStoreBehavior, +) ([]*metapb.Store, error) { // get all live stores. stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone()) if err != nil { @@ -107,7 +124,10 @@ skipStore: for _, store := range stores { for _, label := range store.Labels { if label.Key == "engine" && label.Value == "tiflash" { - continue skipStore + if unexpectedStoreBehavior == SkipTiFlash { + continue skipStore + } + return nil, errors.Errorf("cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address) } } stores[j] = store @@ -123,7 +143,9 @@ func NewMgr( pdAddrs string, storage tikv.Storage, tlsConf *tls.Config, - securityOption pd.SecurityOption) (*Mgr, error) { + securityOption pd.SecurityOption, + unexpectedStoreBehavior UnexpectedStoreBehavior, +) (*Mgr, error) { addrs := strings.Split(pdAddrs, ",") failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs) @@ -167,7 +189,7 @@ func NewMgr( log.Info("new mgr", zap.String("pdAddrs", pdAddrs)) // Check live tikv. - stores, err := GetAllTiKVStores(ctx, pdClient) + stores, err := GetAllTiKVStores(ctx, pdClient, unexpectedStoreBehavior) if err != nil { log.Error("fail to get store", zap.Error(err)) return nil, err diff --git a/pkg/conn/conn_test.go b/pkg/conn/conn_test.go index f4a775303..7e918d91b 100644 --- a/pkg/conn/conn_test.go +++ b/pkg/conn/conn_test.go @@ -153,54 +153,87 @@ func (s *testClientSuite) TestRegionCount(c *C) { type fakePDClient struct { pd.Client + stores []*metapb.Store } func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { - return []*metapb.Store{ + return append([]*metapb.Store{}, fpdc.stores...), nil +} + +func (s *testClientSuite) TestGetAllTiKVStores(c *C) { + testCases := []struct { + stores []*metapb.Store + unexpectedStoreBehavior UnexpectedStoreBehavior + expectedStores map[uint64]int + expectedError string + }{ { - Id: 1, + stores: []*metapb.Store{ + {Id: 1}, + }, + unexpectedStoreBehavior: SkipTiFlash, + expectedStores: map[uint64]int{1: 1}, }, { - Id: 2, - Labels: []*metapb.StoreLabel{ - {Key: "engine", Value: "tiflash"}, + stores: []*metapb.Store{ + {Id: 1}, }, + unexpectedStoreBehavior: ErrorOnTiFlash, + expectedStores: map[uint64]int{1: 1}, }, { - Id: 3, - Labels: []*metapb.StoreLabel{ - {Key: "engine", Value: "tikv"}, + stores: []*metapb.Store{ + {Id: 1}, + {Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}}, }, + unexpectedStoreBehavior: SkipTiFlash, + expectedStores: map[uint64]int{1: 1}, }, { - Id: 4, - Labels: []*metapb.StoreLabel{ - {Key: "engine", Value: "tiflash"}, - {Key: "else", Value: "tiflash"}, + stores: []*metapb.Store{ + {Id: 1}, + {Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}}, }, + unexpectedStoreBehavior: ErrorOnTiFlash, + expectedError: "cannot restore to a cluster with active TiFlash stores.*", }, { - Id: 5, - Labels: []*metapb.StoreLabel{ - {Key: "else", Value: "tiflash"}, - {Key: "engine", Value: "tikv"}, + stores: []*metapb.Store{ + {Id: 1}, + {Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}}, + {Id: 3}, + {Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}}, + {Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}}, + {Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}}, }, + unexpectedStoreBehavior: SkipTiFlash, + expectedStores: map[uint64]int{1: 1, 3: 1, 4: 1, 6: 1}, }, - }, nil -} - -func (s *testClientSuite) TestGetAllTiKVStores(c *C) { - var pdClient fakePDClient - stores, err := GetAllTiKVStores(context.Background(), pdClient) - c.Assert(err, IsNil) + { + stores: []*metapb.Store{ + {Id: 1}, + {Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}}, + {Id: 3}, + {Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}}, + {Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}}, + {Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}}, + }, + unexpectedStoreBehavior: ErrorOnTiFlash, + expectedError: "cannot restore to a cluster with active TiFlash stores.*", + }, + } - foundStores := make(map[uint64]int) - for _, store := range stores { - foundStores[store.Id]++ + for _, testCase := range testCases { + pdClient := fakePDClient{stores: testCase.stores} + stores, err := GetAllTiKVStores(context.Background(), pdClient, testCase.unexpectedStoreBehavior) + if len(testCase.expectedError) != 0 { + c.Assert(err, ErrorMatches, testCase.expectedError) + continue + } + foundStores := make(map[uint64]int) + for _, store := range stores { + foundStores[store.Id]++ + } + c.Assert(foundStores, DeepEquals, testCase.expectedStores) } - c.Assert(foundStores, DeepEquals, map[uint64]int{ - 1: 1, - 3: 1, - 5: 1, - }) } diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 07be7fbde..1d90268f8 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -251,7 +251,7 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { func (rc *Client) setSpeedLimit() error { if !rc.hasSpeedLimited && rc.rateLimit != 0 { - stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient) + stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient, conn.ErrorOnTiFlash) if err != nil { return err } @@ -335,7 +335,7 @@ func (rc *Client) SwitchToNormalMode(ctx context.Context) error { } func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMode) error { - stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient) + stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.ErrorOnTiFlash) if err != nil { return errors.Trace(err) } diff --git a/pkg/task/backup.go b/pkg/task/backup.go index b4ece838d..571492dce 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" + "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" @@ -82,7 +83,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err != nil { return err } - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash) if err != nil { return err } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index aa980b1a0..a51e80e95 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -12,6 +12,7 @@ import ( "github.com/spf13/pflag" "github.com/pingcap/br/pkg/backup" + "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/storage" @@ -89,7 +90,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *BackupRaw if err != nil { return err } - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash) if err != nil { return err } diff --git a/pkg/task/common.go b/pkg/task/common.go index 57134c60a..a3cbf726a 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -208,7 +208,13 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { } // newMgr creates a new mgr at the given PD address. -func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig) (*conn.Mgr, error) { +func newMgr( + ctx context.Context, + g glue.Glue, + pds []string, + tlsConfig TLSConfig, + unexpectedStoreBehavior conn.UnexpectedStoreBehavior, +) (*conn.Mgr, error) { var ( tlsConf *tls.Config err error @@ -234,7 +240,7 @@ func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig) if err != nil { return nil, err } - return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption) + return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption, unexpectedStoreBehavior) } // GetStorage gets the storage backend from the config. diff --git a/pkg/task/restore.go b/pkg/task/restore.go index f8333d7ff..56e22a5a3 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -75,7 +75,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS) + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.ErrorOnTiFlash) if err != nil { return err } From 8db3fa0439848fe99ced9895b146da5777142793 Mon Sep 17 00:00:00 2001 From: kennytm Date: Thu, 12 Mar 2020 09:42:14 +0800 Subject: [PATCH 3/3] conn: fix lint --- pkg/conn/conn.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 60f1b56dc..6dc7c4d10 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -127,7 +127,8 @@ skipStore: if unexpectedStoreBehavior == SkipTiFlash { continue skipStore } - return nil, errors.Errorf("cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address) + return nil, errors.Errorf( + "cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address) } } stores[j] = store