diff --git a/cmd/backup.go b/cmd/backup.go index 39aa4fd28..8ae45270c 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -15,7 +15,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error { if err := cfg.ParseFromFlags(command.Flags()); err != nil { return err } - return task.RunBackup(GetDefaultContext(), cmdName, &cfg) + return task.RunBackup(GetDefaultContext(), tidbGlue, cmdName, &cfg) } // NewBackupCommand return a full backup subcommand. diff --git a/cmd/cmd.go b/cmd/cmd.go index fdadaa6f8..83355e5dd 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -13,6 +13,7 @@ import ( "github.com/spf13/cobra" "go.uber.org/zap" + "github.com/pingcap/br/pkg/gluetidb" "github.com/pingcap/br/pkg/task" "github.com/pingcap/br/pkg/utils" ) @@ -21,6 +22,7 @@ var ( initOnce = sync.Once{} defaultContext context.Context hasLogFile uint64 + tidbGlue = gluetidb.Glue{} ) const ( diff --git a/cmd/restore.go b/cmd/restore.go index 2dfec9846..9f7c47bdb 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -14,7 +14,7 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error { if err := cfg.ParseFromFlags(command.Flags()); err != nil { return err } - return task.RunRestore(GetDefaultContext(), cmdName, &cfg) + return task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg) } // NewRestoreCommand returns a restore subcommand diff --git a/pkg/backup/safe_point_test.go b/pkg/backup/safe_point_test.go index 1bea9e210..5a4939191 100644 --- a/pkg/backup/safe_point_test.go +++ b/pkg/backup/safe_point_test.go @@ -8,18 +8,18 @@ import ( pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb/util/testleak" - "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/br/pkg/mock" ) var _ = Suite(&testSaftPointSuite{}) type testSaftPointSuite struct { - mock *utils.MockCluster + mock *mock.Cluster } func (s *testSaftPointSuite) SetUpSuite(c *C) { var err error - s.mock, err = utils.NewMockCluster() + s.mock, err = mock.NewCluster() c.Assert(err, IsNil) } diff --git a/pkg/backup/schema_test.go b/pkg/backup/schema_test.go index f657310bf..a1514ba4a 100644 --- a/pkg/backup/schema_test.go +++ b/pkg/backup/schema_test.go @@ -9,18 +9,18 @@ import ( "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" - "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/br/pkg/mock" ) var _ = Suite(&testBackupSchemaSuite{}) type testBackupSchemaSuite struct { - mock *utils.MockCluster + mock *mock.Cluster } func (s *testBackupSchemaSuite) SetUpSuite(c *C) { var err error - s.mock, err = utils.NewMockCluster() + s.mock, err = mock.NewCluster() c.Assert(err, IsNil) } @@ -77,7 +77,7 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { <-updateCh c.Assert(err, IsNil) c.Assert(len(schemas), Equals, 1) - // MockCluster returns a dummy checksum (all fields are 1). + // Cluster returns a dummy checksum (all fields are 1). c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0])) c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0])) c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0])) @@ -97,7 +97,7 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { <-updateCh c.Assert(err, IsNil) c.Assert(len(schemas), Equals, 2) - // MockCluster returns a dummy checksum (all fields are 1). + // Cluster returns a dummy checksum (all fields are 1). c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0])) c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0])) c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0])) diff --git a/pkg/checksum/executor_test.go b/pkg/checksum/executor_test.go index 3e6d8078c..e9db6267b 100644 --- a/pkg/checksum/executor_test.go +++ b/pkg/checksum/executor_test.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tipb/go-tipb" + "github.com/pingcap/br/pkg/mock" "github.com/pingcap/br/pkg/utils" ) @@ -22,12 +23,12 @@ func TestT(t *testing.T) { var _ = Suite(&testChecksumSuite{}) type testChecksumSuite struct { - mock *utils.MockCluster + mock *mock.Cluster } func (s *testChecksumSuite) SetUpSuite(c *C) { var err error - s.mock, err = utils.NewMockCluster() + s.mock, err = mock.NewCluster() c.Assert(err, IsNil) } @@ -61,7 +62,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) { c.Assert(len(exe1.reqs), Equals, 1) resp, err := exe1.Execute(context.TODO(), s.mock.Storage.GetClient(), func() {}) c.Assert(err, IsNil) - // MockCluster returns a dummy checksum (all fields are 1). + // Cluster returns a dummy checksum (all fields are 1). c.Assert(resp.Checksum, Equals, uint64(1), Commentf("%v", resp)) c.Assert(resp.TotalKvs, Equals, uint64(1), Commentf("%v", resp)) c.Assert(resp.TotalBytes, Equals, uint64(1), Commentf("%v", resp)) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 3695a2a0c..6ef1e87eb 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -20,13 +20,14 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/keepalive" + + "github.com/pingcap/br/pkg/glue" ) const ( @@ -87,7 +88,7 @@ func pdRequest( } // NewMgr creates a new Mgr. -func NewMgr(ctx context.Context, pdAddrs string, storage tikv.Storage) (*Mgr, error) { +func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Storage) (*Mgr, error) { addrs := strings.Split(pdAddrs, ",") failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs) @@ -130,7 +131,7 @@ func NewMgr(ctx context.Context, pdAddrs string, storage tikv.Storage) (*Mgr, er return nil, errors.Errorf("tikv cluster not health %+v", stores) } - dom, err := session.BootstrapSession(storage) + dom, err := g.BootstrapSession(storage) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/glue/glue.go b/pkg/glue/glue.go new file mode 100644 index 000000000..bc6fb214e --- /dev/null +++ b/pkg/glue/glue.go @@ -0,0 +1,24 @@ +package glue + +import ( + "context" + + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" +) + +// Glue is an abstraction of TiDB function calls used in BR. +type Glue interface { + BootstrapSession(store kv.Storage) (*domain.Domain, error) + CreateSession(store kv.Storage) (Session, error) +} + +// Session is an abstraction of the session.Session interface. +type Session interface { + Execute(ctx context.Context, sql string) error + ShowCreateDatabase(schema *model.DBInfo) (string, error) + ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error) + Close() +} diff --git a/pkg/gluetidb/glue.go b/pkg/gluetidb/glue.go new file mode 100644 index 000000000..6b9f2f667 --- /dev/null +++ b/pkg/gluetidb/glue.go @@ -0,0 +1,65 @@ +package gluetidb + +import ( + "bytes" + "context" + + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/session" + + "github.com/pingcap/br/pkg/glue" +) + +// Glue is an implementation of glue.Glue using a new TiDB session. +type Glue struct{} + +type tidbSession struct { + se session.Session +} + +// BootstrapSession implements glue.Glue +func (Glue) BootstrapSession(store kv.Storage) (*domain.Domain, error) { + return session.BootstrapSession(store) +} + +// CreateSession implements glue.Glue +func (Glue) CreateSession(store kv.Storage) (glue.Session, error) { + se, err := session.CreateSession(store) + if err != nil { + return nil, err + } + return &tidbSession{se: se}, nil +} + +// Execute implements glue.Session +func (gs *tidbSession) Execute(ctx context.Context, sql string) error { + _, err := gs.se.Execute(ctx, sql) + return err +} + +// ShowCreateDatabase implements glue.Session +func (gs *tidbSession) ShowCreateDatabase(schema *model.DBInfo) (string, error) { + var buf bytes.Buffer + if err := executor.ConstructResultOfShowCreateDatabase(gs.se, schema, true, &buf); err != nil { + return "", err + } + return buf.String(), nil +} + +// ShowCreateTable implements glue.Session +func (gs *tidbSession) ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error) { + var buf bytes.Buffer + if err := executor.ConstructResultOfShowCreateTable(gs.se, table, allocator, &buf); err != nil { + return "", err + } + return buf.String(), nil +} + +// Close implements glue.Session +func (gs *tidbSession) Close() { + gs.se.Close() +} diff --git a/pkg/utils/mock_cluster.go b/pkg/mock/mock_cluster.go similarity index 93% rename from pkg/utils/mock_cluster.go rename to pkg/mock/mock_cluster.go index dc7b87c3c..aee9666ed 100644 --- a/pkg/utils/mock_cluster.go +++ b/pkg/mock/mock_cluster.go @@ -1,4 +1,4 @@ -package utils +package mock import ( "database/sql" @@ -28,8 +28,8 @@ import ( var pprofOnce sync.Once -// MockCluster is mock tidb cluster, includes tikv and pd. -type MockCluster struct { +// Cluster is mock tidb cluster, includes tikv and pd. +type Cluster struct { *server.Server *mocktikv.Cluster mocktikv.MVCCStore @@ -40,8 +40,8 @@ type MockCluster struct { PDClient pd.Client } -// NewMockCluster create a new mock cluster. -func NewMockCluster() (*MockCluster, error) { +// NewCluster create a new mock cluster. +func NewCluster() (*Cluster, error) { pprofOnce.Do(func() { go func() { // Make sure pprof is registered. @@ -72,7 +72,7 @@ func NewMockCluster() (*MockCluster, error) { if err != nil { return nil, err } - return &MockCluster{ + return &Cluster{ Cluster: cluster, MVCCStore: mvccStore, Storage: storage, @@ -82,7 +82,7 @@ func NewMockCluster() (*MockCluster, error) { } // Start runs a mock cluster -func (mock *MockCluster) Start() error { +func (mock *Cluster) Start() error { statusURL, err := url.Parse(tempurl.Alloc()) if err != nil { return err @@ -124,7 +124,7 @@ func (mock *MockCluster) Start() error { } // Stop stops a mock cluster -func (mock *MockCluster) Stop() { +func (mock *Cluster) Stop() { if mock.Domain != nil { mock.Domain.Close() } diff --git a/pkg/mock/mock_cluster_test.go b/pkg/mock/mock_cluster_test.go new file mode 100644 index 000000000..e7ffc6e85 --- /dev/null +++ b/pkg/mock/mock_cluster_test.go @@ -0,0 +1,27 @@ +package mock + +import ( + . "github.com/pingcap/check" + "github.com/pingcap/tidb/util/testleak" +) + +var _ = Suite(&testClusterSuite{}) + +type testClusterSuite struct { + mock *Cluster +} + +func (s *testClusterSuite) SetUpSuite(c *C) { + var err error + s.mock, err = NewCluster() + c.Assert(err, IsNil) +} + +func (s *testClusterSuite) TearDownSuite(c *C) { + testleak.AfterTest(c)() +} + +func (s *testClusterSuite) TestSmoke(c *C) { + c.Assert(s.mock.Start(), IsNil) + s.mock.Stop() +} diff --git a/pkg/restore/backoff_test.go b/pkg/restore/backoff_test.go index 537f0980c..73161a9f6 100644 --- a/pkg/restore/backoff_test.go +++ b/pkg/restore/backoff_test.go @@ -7,18 +7,19 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/util/testleak" + "github.com/pingcap/br/pkg/mock" "github.com/pingcap/br/pkg/utils" ) var _ = Suite(&testBackofferSuite{}) type testBackofferSuite struct { - mock *utils.MockCluster + mock *mock.Cluster } func (s *testBackofferSuite) SetUpSuite(c *C) { var err error - s.mock, err = utils.NewMockCluster() + s.mock, err = mock.NewCluster() c.Assert(err, IsNil) } diff --git a/pkg/restore/client.go b/pkg/restore/client.go index f45b3d510..a06617084 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/keepalive" "github.com/pingcap/br/pkg/checksum" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" ) @@ -53,11 +54,12 @@ type Client struct { // NewRestoreClient returns a new RestoreClient func NewRestoreClient( ctx context.Context, + g glue.Glue, pdClient pd.Client, store kv.Storage, ) (*Client, error) { ctx, cancel := context.WithCancel(ctx) - db, err := NewDB(store) + db, err := NewDB(g, store) if err != nil { cancel() return nil, errors.Trace(err) diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index 3d608b3b9..b67bbcfd7 100644 --- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -12,18 +12,20 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/testleak" + "github.com/pingcap/br/pkg/gluetidb" + "github.com/pingcap/br/pkg/mock" "github.com/pingcap/br/pkg/utils" ) var _ = Suite(&testRestoreClientSuite{}) type testRestoreClientSuite struct { - mock *utils.MockCluster + mock *mock.Cluster } func (s *testRestoreClientSuite) SetUpTest(c *C) { var err error - s.mock, err = utils.NewMockCluster() + s.mock, err = mock.NewCluster() c.Assert(err, IsNil) } @@ -36,7 +38,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { defer s.mock.Stop() client := Client{} - db, err := NewDB(s.mock.Storage) + db, err := NewDB(gluetidb.Glue{}, s.mock.Storage) c.Assert(err, IsNil) client.db = db client.ctx = context.Background() @@ -93,7 +95,7 @@ func (s *testRestoreClientSuite) TestIsOnline(c *C) { defer s.mock.Stop() client := Client{} - db, err := NewDB(s.mock.Storage) + db, err := NewDB(gluetidb.Glue{}, s.mock.Storage) c.Assert(err, IsNil) client.db = db client.ctx = context.Background() diff --git a/pkg/restore/db.go b/pkg/restore/db.go index 8c09af16f..22a1a4794 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -1,7 +1,6 @@ package restore import ( - "bytes" "context" "fmt" "sort" @@ -10,27 +9,26 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/parser/model" - "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/session" "go.uber.org/zap" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/utils" ) // DB is a TiDB instance, not thread-safe. type DB struct { - se session.Session + se glue.Session } // NewDB returns a new DB -func NewDB(store kv.Storage) (*DB, error) { - se, err := session.CreateSession(store) +func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { + se, err := g.CreateSession(store) if err != nil { return nil, errors.Trace(err) } // Set SQL mode to None for avoiding SQL compatibility problem - _, err = se.Execute(context.Background(), "set @@sql_mode=''") + err = se.Execute(context.Background(), "set @@sql_mode=''") if err != nil { return nil, errors.Trace(err) } @@ -44,7 +42,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { var err error if ddlJob.BinlogInfo.TableInfo != nil { switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName) - _, err = db.se.Execute(ctx, switchDbSQL) + err = db.se.Execute(ctx, switchDbSQL) if err != nil { log.Error("switch db failed", zap.String("query", switchDbSQL), @@ -53,7 +51,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } } - _, err = db.se.Execute(ctx, ddlJob.Query) + err = db.se.Execute(ctx, ddlJob.Query) if err != nil { log.Error("execute ddl query failed", zap.String("query", ddlJob.Query), @@ -66,14 +64,12 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - var buf bytes.Buffer - err := executor.ConstructResultOfShowCreateDatabase(db.se, schema, true, &buf) + createSQL, err := db.se.ShowCreateDatabase(schema) if err != nil { log.Error("build create database SQL failed", zap.Stringer("db", schema.Name), zap.Error(err)) return errors.Trace(err) } - createSQL := buf.String() - _, err = db.se.Execute(ctx, createSQL) + err = db.se.Execute(ctx, createSQL) if err != nil { log.Error("create database failed", zap.String("query", createSQL), zap.Error(err)) } @@ -82,9 +78,8 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { // CreateTable executes a CREATE TABLE SQL. func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { - var buf bytes.Buffer schema := table.Info - err := executor.ConstructResultOfShowCreateTable(db.se, schema, newIDAllocator(schema.AutoIncID), &buf) + createSQL, err := db.se.ShowCreateTable(schema, newIDAllocator(schema.AutoIncID)) if err != nil { log.Error( "build create table SQL failed", @@ -93,8 +88,8 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { zap.Error(err)) return errors.Trace(err) } - switchDbSQL := fmt.Sprintf("use %s;", table.Db.Name) - _, err = db.se.Execute(ctx, switchDbSQL) + switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(table.Db.Name.O)) + err = db.se.Execute(ctx, switchDbSQL) if err != nil { log.Error("switch db failed", zap.String("SQL", switchDbSQL), @@ -102,13 +97,12 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { zap.Error(err)) return errors.Trace(err) } - createSQL := buf.String() // Insert `IF NOT EXISTS` statement to skip the created tables words := strings.SplitN(createSQL, " ", 3) if len(words) > 2 && strings.ToUpper(words[0]) == "CREATE" && strings.ToUpper(words[1]) == "TABLE" { createSQL = "CREATE TABLE IF NOT EXISTS " + words[2] } - _, err = db.se.Execute(ctx, createSQL) + err = db.se.Execute(ctx, createSQL) if err != nil { log.Error("create table failed", zap.String("SQL", createSQL), @@ -119,9 +113,9 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { } alterAutoIncIDSQL := fmt.Sprintf( "alter table %s auto_increment = %d", - escapeTableName(schema.Name), + utils.EncloseName(schema.Name.O), schema.AutoIncID) - _, err = db.se.Execute(ctx, alterAutoIncIDSQL) + err = db.se.Execute(ctx, alterAutoIncIDSQL) if err != nil { log.Error("alter AutoIncID failed", zap.String("query", alterAutoIncIDSQL), diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go index 0151b4da6..b1e9e947c 100644 --- a/pkg/restore/db_test.go +++ b/pkg/restore/db_test.go @@ -13,18 +13,20 @@ import ( "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/br/pkg/backup" + "github.com/pingcap/br/pkg/gluetidb" + "github.com/pingcap/br/pkg/mock" "github.com/pingcap/br/pkg/utils" ) var _ = Suite(&testRestoreSchemaSuite{}) type testRestoreSchemaSuite struct { - mock *utils.MockCluster + mock *mock.Cluster } func (s *testRestoreSchemaSuite) SetUpSuite(c *C) { var err error - s.mock, err = utils.NewMockCluster() + s.mock, err = mock.NewCluster() c.Assert(err, IsNil) c.Assert(s.mock.Start(), IsNil) } @@ -70,7 +72,7 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { c.Assert(autoIncID, Equals, uint64(globalAutoID)) // Alter AutoIncID to the next AutoIncID + 100 table.Info.AutoIncID = globalAutoID + 100 - db, err := NewDB(s.mock.Storage) + db, err := NewDB(gluetidb.Glue{}, s.mock.Storage) c.Assert(err, IsNil, Commentf("Error create DB")) tk.MustExec("drop database if exists test;") // Test empty collate value diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 63ee92969..64ccfab19 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -324,12 +324,3 @@ func encodeKeyPrefix(key []byte) []byte { encodedPrefix = append(encodedPrefix, codec.EncodeBytes([]byte{}, key[:len(key)-ungroupedLen])...) return append(encodedPrefix[:len(encodedPrefix)-9], key[len(key)-ungroupedLen:]...) } - -// escape the identifier for pretty-printing. -// For instance, the identifier "foo `bar`" will become "`foo ``bar```". -// The sqlMode controls whether to escape with backquotes (`) or double quotes -// (`"`) depending on whether mysql.ModeANSIQuotes is enabled. -func escapeTableName(cis model.CIStr) string { - quote := "`" - return quote + strings.Replace(cis.O, quote, quote+quote, -1) + quote -} diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 240754517..ead4c2351 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" @@ -61,7 +62,7 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { } // RunBackup starts a backup task inside the current goroutine. -func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { +func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig) error { ctx, cancel := context.WithCancel(c) defer cancel() @@ -73,7 +74,7 @@ func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { if err != nil { return err } - mgr, err := newMgr(ctx, cfg.PD) + mgr, err := newMgr(ctx, g, cfg.PD) if err != nil { return err } diff --git a/pkg/task/common.go b/pkg/task/common.go index 2433d94b9..c3f866492 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/pflag" "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/utils" ) @@ -178,7 +179,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { } // newMgr creates a new mgr at the given PD address. -func newMgr(ctx context.Context, pds []string) (*conn.Mgr, error) { +func newMgr(ctx context.Context, g glue.Glue, pds []string) (*conn.Mgr, error) { pdAddress := strings.Join(pds, ",") if len(pdAddress) == 0 { return nil, errors.New("pd address can not be empty") @@ -189,7 +190,7 @@ func newMgr(ctx context.Context, pds []string) (*conn.Mgr, error) { if err != nil { return nil, err } - return conn.NewMgr(ctx, pdAddress, store.(tikv.Storage)) + return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage)) } // GetStorage gets the storage backend from the config. diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 599dcb478..bb00d189d 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" @@ -55,17 +56,17 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { } // RunRestore starts a restore task inside the current goroutine. -func RunRestore(c context.Context, cmdName string, cfg *RestoreConfig) error { +func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := newMgr(ctx, cfg.PD) + mgr, err := newMgr(ctx, g, cfg.PD) if err != nil { return err } defer mgr.Close() - client, err := restore.NewRestoreClient(ctx, mgr.GetPDClient(), mgr.GetTiKV()) + client, err := restore.NewRestoreClient(ctx, g, mgr.GetPDClient(), mgr.GetTiKV()) if err != nil { return err } diff --git a/pkg/utils/mock_cluster_test.go b/pkg/utils/mock_cluster_test.go deleted file mode 100644 index 42cacae9c..000000000 --- a/pkg/utils/mock_cluster_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package utils - -import ( - . "github.com/pingcap/check" - "github.com/pingcap/tidb/util/testleak" -) - -var _ = Suite(&testMockClusterSuite{}) - -type testMockClusterSuite struct { - mock *MockCluster -} - -func (s *testMockClusterSuite) SetUpSuite(c *C) { - var err error - s.mock, err = NewMockCluster() - c.Assert(err, IsNil) -} - -func (s *testMockClusterSuite) TearDownSuite(c *C) { - testleak.AfterTest(c)() -} - -func (s *testMockClusterSuite) TestSmoke(c *C) { - c.Assert(s.mock.Start(), IsNil) - s.mock.Stop() -} diff --git a/pkg/utils/schema.go b/pkg/utils/schema.go index 0afe98e5b..e1aec1225 100644 --- a/pkg/utils/schema.go +++ b/pkg/utils/schema.go @@ -2,16 +2,13 @@ package utils import ( "bytes" - "context" "encoding/json" "strings" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/parser/model" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/util/sqlexec" ) const ( @@ -106,36 +103,6 @@ func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) { return databases, nil } -// ResultSetToStringSlice changes the RecordSet to [][]string. port from tidb -func ResultSetToStringSlice(ctx context.Context, s session.Session, rs sqlexec.RecordSet) ([][]string, error) { - rows, err := session.GetRows4Test(ctx, s, rs) - if err != nil { - return nil, err - } - err = rs.Close() - if err != nil { - return nil, err - } - sRows := make([][]string, len(rows)) - for i := range rows { - row := rows[i] - iRow := make([]string, row.Len()) - for j := 0; j < row.Len(); j++ { - if row.IsNull(j) { - iRow[j] = "" - } else { - d := row.GetDatum(j, &rs.Fields()[j].Column.FieldType) - iRow[j], err = d.ToString() - if err != nil { - return nil, err - } - } - } - sRows[i] = iRow - } - return sRows, nil -} - // EncloseName formats name in sql func EncloseName(name string) string { return "`" + strings.ReplaceAll(name, "`", "``") + "`"