Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,18 @@ type Client struct {
workerPool *utils.WorkerPool
tlsConf *tls.Config

databases map[string]*utils.Database
ddlJobs []*model.Job
backupMeta *backup.BackupMeta
databases map[string]*utils.Database
ddlJobs []*model.Job
backupMeta *backup.BackupMeta
// TODO Remove this field or replace it with a []*DB,
// since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution.
// And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition.
// Which is dirty: why we need DBs from different sources?
// By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`,
// along with them in some private functions.
// Before you do it, you can firstly read discussions at
// https://github.com/pingcap/br/pull/377#discussion_r446594501,
// this probably isn't as easy and it seems like (however, not hard, too :D)
db *DB
rateLimit uint64
isOnline bool
Expand Down Expand Up @@ -337,7 +346,7 @@ func (rc *Client) CreateTables(
for i, t := range tables {
tbMapping[t.Info.Name.String()] = i
}
dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh)
dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh)
for et := range dataCh {
rules := et.RewriteRule
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
Expand All @@ -359,11 +368,24 @@ func (rc *Client) CreateTables(
return rewriteRules, newTables, nil
}

func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint64) (CreatedTable, error) {
func (rc *Client) createTable(
ctx context.Context,
db *DB,
dom *domain.Domain,
table *utils.Table,
newTS uint64,
) (CreatedTable, error) {
if db == nil {
db = rc.db
}

if rc.IsSkipCreateSQL() {
log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name))
} else {
err := rc.db.CreateTable(rc.ctx, table)
// don't use rc.ctx here...
// remove the ctx field of Client would be a great work,
// we just take a small step here :<
err := db.CreateTable(ctx, table)
if err != nil {
return CreatedTable{}, err
}
Expand All @@ -382,22 +404,25 @@ func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint
}

// GoCreateTables create tables, and generate their information.
// this function will use workers as the same number of sessionPool,
// leave sessionPool nil to send DDLs sequential.
func (rc *Client) GoCreateTables(
ctx context.Context,
dom *domain.Domain,
tables []*utils.Table,
newTS uint64,
dbPool []*DB,
errCh chan<- error,
) <-chan CreatedTable {
// Could we have a smaller size of tables?
outCh := make(chan CreatedTable, len(tables))
createOneTable := func(t *utils.Table) error {
createOneTable := func(db *DB, t *utils.Table) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
rt, err := rc.createTable(dom, t, newTS)
rt, err := rc.createTable(ctx, db, dom, t, newTS)
if err != nil {
log.Error("create table failed",
zap.Error(err),
Expand All @@ -412,16 +437,46 @@ func (rc *Client) GoCreateTables(
outCh <- rt
return nil
}
startWork := func(t *utils.Table, done func()) {
defer done()
if err := createOneTable(nil, t); err != nil {
errCh <- err
return
}
}
if len(dbPool) > 0 {
workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers")
startWork = func(t *utils.Table, done func()) {
workers.ApplyWithID(func(id uint64) {
defer done()
selectedDB := int(id) % len(dbPool)
if err := createOneTable(dbPool[selectedDB], t); err != nil {
errCh <- err
return
}
})
}
}

go func() {
// TODO replace it with an errgroup
wg := new(sync.WaitGroup)
defer close(outCh)
defer log.Info("all tables created")
defer func() {
if len(dbPool) > 0 {
for _, db := range dbPool {
db.Close()
}
}
}()

for _, table := range tables {
if err := createOneTable(table); err != nil {
errCh <- err
return
}
tbl := table
wg.Add(1)
startWork(tbl, wg.Done)
}
wg.Wait()
}()
return outCh
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ func GetSSTMetaFromFile(
}
}

// MakeDBPool makes a session pool with specficated size by sessionFactory.
func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) {
dbPool := make([]*DB, 0, size)
for i := uint(0); i < size; i++ {
db, e := dbFactory()
if e != nil {
return dbPool, e
}
dbPool = append(dbPool, db)
}
return dbPool, nil
}

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backup.File) int {
result := 0
Expand Down
17 changes: 16 additions & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (

defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 256
defaultDDLConcurrency = 16
)

var (
Expand Down Expand Up @@ -188,7 +189,21 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf

// We make bigger errCh so we won't block on multi-part failed.
errCh := make(chan error, 32)
tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh)
// Maybe allow user modify the DDL concurrency isn't necessary,
// because executing DDL is really I/O bound (or, algorithm bound?),
// and we cost most of time at waiting DDL jobs be enqueued.
// So these jobs won't be faster or slower when machine become faster or slower,
// hence make it a fixed value would be fine.
dbPool, err := restore.MakeDBPool(defaultDDLConcurrency, func() (*restore.DB, error) {
return restore.NewDB(g, mgr.GetTiKV())
})
if err != nil {
log.Warn("create session pool failed, we will send DDLs only by created sessions",
zap.Error(err),
zap.Int("sessionCount", len(dbPool)),
)
}
tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, dbPool, errCh)
if len(files) == 0 {
log.Info("no files, empty databases and tables are restored")
summary.SetSuccessStatus(true)
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Worker struct {
}

type taskFunc func()
type identifiedTaskFunc func(id uint64)
type identifiedTaskFunc func(uint64)

// NewWorkerPool returns a WorkPool.
func NewWorkerPool(limit uint, name string) *WorkerPool {
Expand Down