Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,19 @@ func newBackupMetaCommand() *cobra.Command {
newTable := new(model.TableInfo)
tableID, _ := tableIDAllocator.Alloc()
newTable.ID = int64(tableID)
newTable.Name = table.Schema.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices))
for i, indexInfo := range table.Schema.Indices {
newTable.Name = table.Info.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices))
for i, indexInfo := range table.Info.Indices {
indexID, _ := indexIDAllocator.Alloc()
newTable.Indices[i] = &model.IndexInfo{
ID: int64(indexID),
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Schema, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Schema.ID] = int64(tableID)
tableIDMap[table.Info.ID] = int64(tableID)
}
// Validate rewrite rules
for _, file := range files {
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/kvproto v0.0.0-20200214082216-7ccc45d0063f
github.com/pingcap/kvproto v0.0.0-20200221031907-7adaad393963
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20200213042211-e357ed5f237b
github.com/pingcap/pd v1.1.0-beta.0.20200213133706-fbbe75e180e6
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,12 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0h
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20200213074014-83e827908584 h1:DhQfXNn9m36b2/4zUfPHDDR6CwS2VONbfPC4s+LMVj0=
github.com/pingcap/kvproto v0.0.0-20200213074014-83e827908584/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200214082216-7ccc45d0063f h1:wi4TNMBfsgiMsOlTSHBq4JKFViabIA1W0d+owiLtp70=
github.com/pingcap/kvproto v0.0.0-20200214082216-7ccc45d0063f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200221031907-7adaad393963 h1:QqIzAqo6C3rrekgIjIduCWUw7lEDnzHD/JKdMffwaR0=
github.com/pingcap/kvproto v0.0.0-20200221031907-7adaad393963/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20200213042211-e357ed5f237b h1:oKql7mOA71N7NxMn3MHtYcxntXrFxNPDMDalF/dW3iM=
Expand Down Expand Up @@ -585,6 +584,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2 h1:EtTFh6h4SAKemS+CURDMTDIANuduG5zKEXShyy18bGA=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042 h1:BKiPVwWbEdmAh+5CBwk13CYeVJQRDJpDnKgDyMOGz9M=
Expand Down Expand Up @@ -620,7 +620,6 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
Expand Down
55 changes: 53 additions & 2 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -119,15 +120,20 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context) error {
func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error {
ddlJobsData, err := json.Marshal(ddlJobs)
if err != nil {
return errors.Trace(err)
}
bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("backup meta",
zap.Reflect("meta", bc.backupMeta))
backendURL := storage.FormatBackendURL(bc.backend)
log.Info("save backup meta", zap.Stringer("path", &backendURL))
log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs)))
return bc.storage.Write(ctx, utils.MetaFile, backupMetaData)
}

Expand Down Expand Up @@ -241,6 +247,51 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, nil
}

// GetBackupDDLJobs returns the ddl jobs are done in (lastBackupTS, backupTS]
func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) {
snapMeta, err := dom.GetSnapshotMeta(backupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSnapMeta, err := dom.GetSnapshotMeta(lastBackupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
if err != nil {
return nil, errors.Trace(err)
}
allJobs := make([]*model.Job, 0)
defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs)))
allJobs = append(allJobs, defaultJobs...)
addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs)))
allJobs = append(allJobs, addIndexJobs...)
historyJobs, err := snapMeta.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

completedJobs := make([]*model.Job, 0)
for _, job := range allJobs {
if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
completedJobs = append(completedJobs, job)
}
}
log.Debug("get completed jobs", zap.Int("jobs", len(completedJobs)))
return completedJobs, nil
}

// BackupRanges make a backup of the given key ranges.
func (bc *Client) BackupRanges(
ctx context.Context,
Expand Down
8 changes: 4 additions & 4 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func buildChecksumRequest(
reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func buildChecksumRequest(
for _, partDef := range partDefs {
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
for _, oldPartDef := range oldTable.Info.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func buildRequest(
}
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
for _, oldIndex := range oldTable.Info.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
break
Expand All @@ -117,7 +117,7 @@ func buildRequest(
if oldIndexInfo == nil {
log.Panic("index not found",
zap.Reflect("table", tableInfo),
zap.Reflect("oldTable", oldTable.Schema),
zap.Reflect("oldTable", oldTable.Info),
zap.Stringer("index", indexInfo.Name))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
// Test rewrite rules
tk.MustExec("alter table t1 add index i2(a);")
tableInfo1 = s.getTableInfo(c, "test", "t1")
oldTable := utils.Table{Schema: tableInfo1}
oldTable := utils.Table{Info: tableInfo1}
exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64).
SetOldTable(&oldTable).Build()
c.Assert(err, IsNil)
Expand Down
41 changes: 38 additions & 3 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package restore

import (
"context"
"encoding/json"
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -40,6 +42,7 @@ type Client struct {
tableWorkerPool *utils.WorkerPool

databases map[string]*utils.Database
ddlJobs []*model.Job
backupMeta *backup.BackupMeta
db *DB
rateLimit uint64
Expand Down Expand Up @@ -97,8 +100,15 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
if err != nil {
return errors.Trace(err)
}
var ddlJobs []*model.Job
err = json.Unmarshal(backupMeta.GetDdls(), &ddlJobs)
if err != nil {
return errors.Trace(err)
}
rc.databases = databases
rc.ddlJobs = ddlJobs
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
Expand Down Expand Up @@ -151,6 +161,11 @@ func (rc *Client) GetDatabase(name string) *utils.Database {
return rc.databases[name]
}

// GetDDLJobs returns ddl jobs
func (rc *Client) GetDDLJobs() []*model.Job {
return rc.ddlJobs
}

// GetTableSchema returns the schema of a table from TiDB.
func (rc *Client) GetTableSchema(
dom *domain.Domain,
Expand Down Expand Up @@ -189,18 +204,38 @@ func (rc *Client) CreateTables(
if err != nil {
return nil, nil, err
}
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Schema.Name)
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name)
if err != nil {
return nil, nil, err
}
rules := GetRewriteRules(newTableInfo, table.Schema, newTS)
rules := GetRewriteRules(newTableInfo, table.Info, newTS)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
newTables = append(newTables, newTableInfo)
}
return rewriteRules, newTables, nil
}

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
sort.Slice(ddlJobs, func(i, j int) bool {
return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion
})

for _, job := range ddlJobs {
err := rc.db.ExecDDL(rc.ctx, job)
if err != nil {
return errors.Trace(err)
}
log.Info("execute ddl query",
zap.String("db", job.BinlogInfo.DBInfo.Name.String()),
zap.String("query", job.Query),
zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion))
}
return nil
}

func (rc *Client) setSpeedLimit() error {
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone())
Expand Down Expand Up @@ -386,7 +421,7 @@ func (rc *Client) ValidateChecksum(
checksumResp.TotalBytes != table.TotalBytes {
log.Error("failed in validate checksum",
zap.String("database", table.Db.Name.L),
zap.String("table", table.Schema.Name.L),
zap.String("table", table.Info.Name.L),
zap.Uint64("origin tidb crc64", table.Crc64Xor),
zap.Uint64("calculated crc64", checksumResp.Checksum),
zap.Uint64("origin tidb total kvs", table.TotalKvs),
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
for i := len(tables) - 1; i >= 0; i-- {
tables[i] = &utils.Table{
Db: dbSchema,
Schema: &model.TableInfo{
Info: &model.TableInfo{
ID: int64(i),
Name: model.NewCIStr("test" + strconv.Itoa(i)),
Columns: []*model.ColumnInfo{{
Expand Down
Loading