Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}
return task.RunBackup(GetDefaultContext(), cmdName, &cfg)
return task.RunBackup(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

// NewBackupCommand return a full backup subcommand.
Expand Down
2 changes: 2 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/gluetidb"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
)
Expand All @@ -21,6 +22,7 @@ var (
initOnce = sync.Once{}
defaultContext context.Context
hasLogFile uint64
tidbGlue = gluetidb.Glue{}
)

const (
Expand Down
2 changes: 1 addition & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}
return task.RunRestore(GetDefaultContext(), cmdName, &cfg)
return task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

// NewRestoreCommand returns a restore subcommand
Expand Down
6 changes: 3 additions & 3 deletions pkg/backup/safe_point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/util/testleak"

"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/mock"
)

var _ = Suite(&testSaftPointSuite{})

type testSaftPointSuite struct {
mock *utils.MockCluster
mock *mock.Cluster
}

func (s *testSaftPointSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"

"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/mock"
)

var _ = Suite(&testBackupSchemaSuite{})

type testBackupSchemaSuite struct {
mock *utils.MockCluster
mock *mock.Cluster
}

func (s *testBackupSchemaSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -77,7 +77,7 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
<-updateCh
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 1)
// MockCluster returns a dummy checksum (all fields are 1).
// Cluster returns a dummy checksum (all fields are 1).
c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0]))
Expand All @@ -97,7 +97,7 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
<-updateCh
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 2)
// MockCluster returns a dummy checksum (all fields are 1).
// Cluster returns a dummy checksum (all fields are 1).
c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0]))
Expand Down
7 changes: 4 additions & 3 deletions pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"

"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/utils"
)

Expand All @@ -22,12 +23,12 @@ func TestT(t *testing.T) {
var _ = Suite(&testChecksumSuite{})

type testChecksumSuite struct {
mock *utils.MockCluster
mock *mock.Cluster
}

func (s *testChecksumSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -61,7 +62,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
c.Assert(len(exe1.reqs), Equals, 1)
resp, err := exe1.Execute(context.TODO(), s.mock.Storage.GetClient(), func() {})
c.Assert(err, IsNil)
// MockCluster returns a dummy checksum (all fields are 1).
// Cluster returns a dummy checksum (all fields are 1).
c.Assert(resp.Checksum, Equals, uint64(1), Commentf("%v", resp))
c.Assert(resp.TotalKvs, Equals, uint64(1), Commentf("%v", resp))
c.Assert(resp.TotalBytes, Equals, uint64(1), Commentf("%v", resp))
Expand Down
7 changes: 4 additions & 3 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/glue"
)

const (
Expand Down Expand Up @@ -87,7 +88,7 @@ func pdRequest(
}

// NewMgr creates a new Mgr.
func NewMgr(ctx context.Context, pdAddrs string, storage tikv.Storage) (*Mgr, error) {
func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Storage) (*Mgr, error) {
addrs := strings.Split(pdAddrs, ",")

failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
Expand Down Expand Up @@ -130,7 +131,7 @@ func NewMgr(ctx context.Context, pdAddrs string, storage tikv.Storage) (*Mgr, er
return nil, errors.Errorf("tikv cluster not health %+v", stores)
}

dom, err := session.BootstrapSession(storage)
dom, err := g.BootstrapSession(storage)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/glue/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package glue

import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
)

// Glue is an abstraction of TiDB function calls used in BR.
type Glue interface {
BootstrapSession(store kv.Storage) (*domain.Domain, error)
CreateSession(store kv.Storage) (Session, error)
}

// Session is an abstraction of the session.Session interface.
type Session interface {
Execute(ctx context.Context, sql string) error
ShowCreateDatabase(schema *model.DBInfo) (string, error)
ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error)
Close()
}
65 changes: 65 additions & 0 deletions pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package gluetidb

import (
"bytes"
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"

"github.com/pingcap/br/pkg/glue"
)

// Glue is an implementation of glue.Glue using a new TiDB session.
type Glue struct{}

type tidbSession struct {
se session.Session
}

// BootstrapSession implements glue.Glue
func (Glue) BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return session.BootstrapSession(store)
}

// CreateSession implements glue.Glue
func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
se, err := session.CreateSession(store)
if err != nil {
return nil, err
}
return &tidbSession{se: se}, nil
}

// Execute implements glue.Session
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
return err
}

// ShowCreateDatabase implements glue.Session
func (gs *tidbSession) ShowCreateDatabase(schema *model.DBInfo) (string, error) {
var buf bytes.Buffer
if err := executor.ConstructResultOfShowCreateDatabase(gs.se, schema, true, &buf); err != nil {
return "", err
}
return buf.String(), nil
}

// ShowCreateTable implements glue.Session
func (gs *tidbSession) ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error) {
var buf bytes.Buffer
if err := executor.ConstructResultOfShowCreateTable(gs.se, table, allocator, &buf); err != nil {
return "", err
}
return buf.String(), nil
}

// Close implements glue.Session
func (gs *tidbSession) Close() {
gs.se.Close()
}
16 changes: 8 additions & 8 deletions pkg/utils/mock_cluster.go → pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package mock

import (
"database/sql"
Expand Down Expand Up @@ -28,8 +28,8 @@ import (

var pprofOnce sync.Once

// MockCluster is mock tidb cluster, includes tikv and pd.
type MockCluster struct {
// Cluster is mock tidb cluster, includes tikv and pd.
type Cluster struct {
*server.Server
*mocktikv.Cluster
mocktikv.MVCCStore
Expand All @@ -40,8 +40,8 @@ type MockCluster struct {
PDClient pd.Client
}

// NewMockCluster create a new mock cluster.
func NewMockCluster() (*MockCluster, error) {
// NewCluster create a new mock cluster.
func NewCluster() (*Cluster, error) {
pprofOnce.Do(func() {
go func() {
// Make sure pprof is registered.
Expand Down Expand Up @@ -72,7 +72,7 @@ func NewMockCluster() (*MockCluster, error) {
if err != nil {
return nil, err
}
return &MockCluster{
return &Cluster{
Cluster: cluster,
MVCCStore: mvccStore,
Storage: storage,
Expand All @@ -82,7 +82,7 @@ func NewMockCluster() (*MockCluster, error) {
}

// Start runs a mock cluster
func (mock *MockCluster) Start() error {
func (mock *Cluster) Start() error {
statusURL, err := url.Parse(tempurl.Alloc())
if err != nil {
return err
Expand Down Expand Up @@ -124,7 +124,7 @@ func (mock *MockCluster) Start() error {
}

// Stop stops a mock cluster
func (mock *MockCluster) Stop() {
func (mock *Cluster) Stop() {
if mock.Domain != nil {
mock.Domain.Close()
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/mock/mock_cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package mock

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testleak"
)

var _ = Suite(&testClusterSuite{})

type testClusterSuite struct {
mock *Cluster
}

func (s *testClusterSuite) SetUpSuite(c *C) {
var err error
s.mock, err = NewCluster()
c.Assert(err, IsNil)
}

func (s *testClusterSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func (s *testClusterSuite) TestSmoke(c *C) {
c.Assert(s.mock.Start(), IsNil)
s.mock.Stop()
}
5 changes: 3 additions & 2 deletions pkg/restore/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testleak"

"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/utils"
)

var _ = Suite(&testBackofferSuite{})

type testBackofferSuite struct {
mock *utils.MockCluster
mock *mock.Cluster
}

func (s *testBackofferSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
)
Expand Down Expand Up @@ -53,11 +54,12 @@ type Client struct {
// NewRestoreClient returns a new RestoreClient
func NewRestoreClient(
ctx context.Context,
g glue.Glue,
pdClient pd.Client,
store kv.Storage,
) (*Client, error) {
ctx, cancel := context.WithCancel(ctx)
db, err := NewDB(store)
db, err := NewDB(g, store)
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down
Loading