Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -309,7 +310,7 @@ func (bc *Client) BackupRanges(
ctx context.Context,
ranges []rtree.Range,
req kvproto.BackupRequest,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -374,7 +375,7 @@ func (bc *Client) BackupRange(
ctx context.Context,
startKey, endKey []byte,
req kvproto.BackupRequest,
updateCh chan<- struct{},
updateCh glue.Progress,
) (err error) {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -486,7 +487,7 @@ func (bc *Client) fineGrainedBackup(
rateLimit uint64,
concurrency uint32,
rangeTree rtree.RangeTree,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
for {
Expand Down Expand Up @@ -561,7 +562,7 @@ func (bc *Client) fineGrainedBackup(
rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)

// Update progress
updateCh <- struct{}{}
updateCh.Inc()
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
)

Expand All @@ -38,7 +39,7 @@ func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown {
func (push *pushDown) pushBackup(
req backup.BackupRequest,
stores []*metapb.Store,
updateCh chan<- struct{},
updateCh glue.Progress,
) (rtree.RangeTree, error) {
// Push down backup tasks to all tikv instances.
res := rtree.NewRangeTree()
Expand Down Expand Up @@ -90,7 +91,7 @@ func (push *pushDown) pushBackup(
resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())

// Update progress
updateCh <- struct{}{}
updateCh.Inc()
} else {
errPb := resp.GetError()
switch v := errPb.Detail.(type) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
)
Expand Down Expand Up @@ -67,7 +68,7 @@ func (pending *Schemas) Start(
store kv.Storage,
backupTS uint64,
concurrency uint,
updateCh chan<- struct{},
updateCh glue.Progress,
) {
workerPool := utils.NewWorkerPool(concurrency, "Schemas")
go func() {
Expand All @@ -82,7 +83,7 @@ func (pending *Schemas) Start(

if pending.skipChecksum {
pending.backupSchemaCh <- schema
updateCh <- struct{}{}
updateCh.Inc()
return
}

Expand Down Expand Up @@ -110,7 +111,7 @@ func (pending *Schemas) Start(
zap.Duration("take", time.Since(start)))
pending.backupSchemaCh <- schema

updateCh <- struct{}{}
updateCh.Inc()
})
}
pending.wg.Wait()
Expand Down
27 changes: 23 additions & 4 deletions pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package backup
import (
"context"
"math"
"sync/atomic"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"
Expand All @@ -30,6 +31,24 @@ func (s *testBackupSchemaSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

type simpleProgress struct {
counter int64
}

func (sp *simpleProgress) Inc() {
atomic.AddInt64(&sp.counter, 1)
}

func (sp *simpleProgress) Close() {}

func (sp *simpleProgress) reset() {
atomic.StoreInt64(&sp.counter, 0)
}

func (sp *simpleProgress) get() int64 {
return atomic.LoadInt64(&sp.counter)
}

func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()
Expand Down Expand Up @@ -73,10 +92,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
s.mock.Domain, s.mock.Storage, testFilter, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, 1)
updateCh := make(chan struct{}, 2)
updateCh := new(simpleProgress)
backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, updateCh)
schemas, err := backupSchemas.finishTableChecksum()
<-updateCh
c.Assert(updateCh.get(), Equals, int64(1))
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 1)
// Cluster returns a dummy checksum (all fields are 1).
Expand All @@ -93,10 +112,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
s.mock.Domain, s.mock.Storage, noFilter, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, 2)
updateCh.reset()
backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 2, updateCh)
schemas, err = backupSchemas.finishTableChecksum()
<-updateCh
<-updateCh
c.Assert(updateCh.get(), Equals, int64(2))
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 2)
// Cluster returns a dummy checksum (all fields are 1).
Expand Down
12 changes: 12 additions & 0 deletions pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Glue interface {
// OwnsStorage returns whether the storage returned by Open() is owned
// If this method returns false, the connection manager will never close the storage.
OwnsStorage() bool

StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) Progress
}

// Session is an abstraction of the session.Session interface.
Expand All @@ -30,3 +32,13 @@ type Session interface {
ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error)
Close()
}

// Progress is an interface recording the current execution progress.
type Progress interface {
// Inc increases the progress. This method must be goroutine-safe, and can
// be called from any goroutine.
Inc()
// Close marks the progress as 100% complete and that Inc() can no longer be
// called.
Close()
}
5 changes: 5 additions & 0 deletions pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (Glue) OwnsStorage() bool {
return true
}

// StartProgress implements glue.Glue
func (g Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return g.tikvGlue.StartProgress(ctx, cmdName, total, redirectLog)
}

// Execute implements glue.Session
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
Expand Down
22 changes: 22 additions & 0 deletions pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
package gluetikv

import (
"context"

pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/utils"
)

// Glue is an implementation of glue.Glue that accesses only TiKV without TiDB.
Expand Down Expand Up @@ -41,3 +44,22 @@ func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
func (Glue) OwnsStorage() bool {
return true
}

// StartProgress implements glue.Glue
func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)}
}

type progress struct {
ch chan<- struct{}
}

// Inc implements glue.Progress
func (p progress) Inc() {
p.ch <- struct{}{}
}

// Close implements glue.Progress
func (p progress) Close() {
close(p.ch)
}
12 changes: 6 additions & 6 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (rc *Client) setSpeedLimit() error {
func (rc *Client) RestoreFiles(
files []*backup.File,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
updateCh glue.Progress,
) (err error) {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (rc *Client) RestoreFiles(
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules):
updateCh <- struct{}{}
updateCh.Inc()
}
})
}
Expand All @@ -499,7 +499,7 @@ func (rc *Client) RestoreFiles(
}

// RestoreRaw tries to restore raw keys in the specified range.
func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh chan<- struct{}) error {
func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress) error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
Expand Down Expand Up @@ -529,7 +529,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, emptyRules):
updateCh <- struct{}{}
updateCh.Inc()
}
})
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func (rc *Client) ValidateChecksum(
kvClient kv.Client,
tables []*utils.Table,
newTables []*model.TableInfo,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -674,7 +674,7 @@ func (rc *Client) ValidateChecksum(
return
}

updateCh <- struct{}{}
updateCh.Inc()
})
}
wg.Wait()
Expand Down
8 changes: 6 additions & 2 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/summary"
)
Expand Down Expand Up @@ -309,6 +310,9 @@ func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
}

func truncateTS(key []byte) []byte {
if len(key) == 0 {
Comment thread
3pointer marked this conversation as resolved.
return nil
}
return key[:len(key)-8]
}

Expand All @@ -320,7 +324,7 @@ func SplitRanges(
client *Client,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
start := time.Now()
defer func() {
Expand All @@ -339,7 +343,7 @@ func SplitRanges(

return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) {
for range keys {
updateCh <- struct{}{}
updateCh.Inc()
}
})
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
)

const (
Expand Down Expand Up @@ -160,7 +159,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig

// Backup
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
updateCh := g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
Expand All @@ -175,14 +174,14 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return err
}
// Backup has finished
close(updateCh)
updateCh.Close()

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.SetSkipChecksum(!cfg.Checksum)
backupSchemas.Start(
Expand All @@ -209,7 +208,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
log.Info("Skip fast checksum in incremental backup")
}
// Checksum has finished
close(updateCh)
updateCh.Close()

err = client.SaveBackupMeta(ctx, ddlJobs)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf

// Backup
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
updateCh := g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
Expand All @@ -134,7 +134,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
return err
}
// Backup has finished
close(updateCh)
updateCh.Close()

// Checksum
err = client.SaveBackupMeta(ctx, nil)
Expand Down
Loading