Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -395,7 +396,7 @@ func (bc *Client) BackupRange(
defer cancel()

var allStores []*metapb.Store
allStores, err = bc.mgr.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
51 changes: 49 additions & 2 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,61 @@ func pdRequest(
return r, nil
}

// UnexpectedStoreBehavior is the action to do in GetAllTiKVStores when a
// non-TiKV store (e.g. TiFlash store) is found.
type UnexpectedStoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash UnexpectedStoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash UnexpectedStoreBehavior = 1
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
unexpectedStoreBehavior UnexpectedStoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, err
}

// filter out all stores which are TiFlash.
j := 0
skipStore:
for _, store := range stores {
for _, label := range store.Labels {
if label.Key == "engine" && label.Value == "tiflash" {
if unexpectedStoreBehavior == SkipTiFlash {
continue skipStore
}
return nil, errors.Errorf(
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
}
stores[j] = store
j++
}
return stores[:j], nil
}

// NewMgr creates a new Mgr.
func NewMgr(
ctx context.Context,
g glue.Glue,
pdAddrs string,
storage tikv.Storage,
tlsConf *tls.Config,
securityOption pd.SecurityOption) (*Mgr, error) {
securityOption pd.SecurityOption,
unexpectedStoreBehavior UnexpectedStoreBehavior,
) (*Mgr, error) {
addrs := strings.Split(pdAddrs, ",")

failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
Expand Down Expand Up @@ -143,7 +190,7 @@ func NewMgr(
log.Info("new mgr", zap.String("pdAddrs", pdAddrs))

// Check live tikv.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := GetAllTiKVStores(ctx, pdClient, unexpectedStoreBehavior)
if err != nil {
log.Error("fail to get store", zap.Error(err))
return nil, err
Expand Down
88 changes: 88 additions & 0 deletions pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/statistics"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -149,3 +150,90 @@ func (s *testClientSuite) TestRegionCount(c *C) {
c.Assert(err, IsNil)
c.Assert(resp, Equals, 2)
}

type fakePDClient struct {
pd.Client
stores []*metapb.Store
}

func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

func (s *testClientSuite) TestGetAllTiKVStores(c *C) {
testCases := []struct {
stores []*metapb.Store
unexpectedStoreBehavior UnexpectedStoreBehavior
expectedStores map[uint64]int
expectedError string
}{
{
stores: []*metapb.Store{
{Id: 1},
},
unexpectedStoreBehavior: SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
},
unexpectedStoreBehavior: ErrorOnTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
unexpectedStoreBehavior: SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
unexpectedStoreBehavior: ErrorOnTiFlash,
expectedError: "cannot restore to a cluster with active TiFlash stores.*",
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
{Id: 3},
{Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}},
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
unexpectedStoreBehavior: SkipTiFlash,
expectedStores: map[uint64]int{1: 1, 3: 1, 4: 1, 6: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
{Id: 3},
{Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}},
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
unexpectedStoreBehavior: ErrorOnTiFlash,
expectedError: "cannot restore to a cluster with active TiFlash stores.*",
},
}

for _, testCase := range testCases {
pdClient := fakePDClient{stores: testCase.stores}
stores, err := GetAllTiKVStores(context.Background(), pdClient, testCase.unexpectedStoreBehavior)
if len(testCase.expectedError) != 0 {
c.Assert(err, ErrorMatches, testCase.expectedError)
continue
}
foundStores := make(map[uint64]int)
for _, store := range stores {
foundStores[store.Id]++
}
c.Assert(foundStores, DeepEquals, testCase.expectedStores)
}
}
5 changes: 3 additions & 2 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
Expand Down Expand Up @@ -261,7 +262,7 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {

func (rc *Client) setSpeedLimit() error {
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone())
stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient, conn.ErrorOnTiFlash)
if err != nil {
return err
}
Expand Down Expand Up @@ -345,7 +346,7 @@ func (rc *Client) SwitchToNormalMode(ctx context.Context) error {
}

func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMode) error {
stores, err := rc.pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.ErrorOnTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -101,7 +102,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/spf13/pflag"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
Expand Down Expand Up @@ -89,7 +90,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *BackupRaw
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash)
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,13 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}

// newMgr creates a new mgr at the given PD address.
func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig) (*conn.Mgr, error) {
func newMgr(
ctx context.Context,
g glue.Glue,
pds []string,
tlsConfig TLSConfig,
unexpectedStoreBehavior conn.UnexpectedStoreBehavior,
) (*conn.Mgr, error) {
var (
tlsConf *tls.Config
err error
Expand All @@ -234,7 +240,7 @@ func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig)
if err != nil {
return nil, err
}
return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption)
return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption, unexpectedStoreBehavior)
}

// GetStorage gets the storage backend from the config.
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.ErrorOnTiFlash)
if err != nil {
return err
}
Expand Down