diff --git a/go.mod b/go.mod index cb25966fe..9bbce185d 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( go.uber.org/multierr v1.5.0 go.uber.org/zap v1.15.0 golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e google.golang.org/api v0.15.1 google.golang.org/grpc v1.26.0 ) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 127df708a..44e982b21 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/ranger" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" @@ -65,9 +66,8 @@ type Client struct { mgr ClientMgr clusterID uint64 - backupMeta kvproto.BackupMeta - storage storage.ExternalStorage - backend *kvproto.StorageBackend + storage storage.ExternalStorage + backend *kvproto.StorageBackend gcTTL int64 } @@ -165,22 +165,35 @@ func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBacken return nil } -// SaveBackupMeta saves the current backup meta at the given path. -func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error { - ddlJobsData, err := json.Marshal(ddlJobs) +// BuildBackupMeta constructs the backup meta file from its components. 
+func BuildBackupMeta( + req *kvproto.BackupRequest, + files []*kvproto.File, + rawRanges []*kvproto.RawRange, + ddlJobs []*model.Job, +) (backupMeta kvproto.BackupMeta, err error) { + backupMeta.StartVersion = req.StartVersion + backupMeta.EndVersion = req.EndVersion + backupMeta.IsRawKv = req.IsRawKv + backupMeta.RawRanges = rawRanges + backupMeta.Files = files + backupMeta.Ddls, err = json.Marshal(ddlJobs) if err != nil { - return errors.Trace(err) + err = errors.Trace(err) + return } + return +} - bc.backupMeta.Ddls = ddlJobsData - backupMetaData, err := proto.Marshal(&bc.backupMeta) +// SaveBackupMeta saves the current backup meta at the given path. +func (bc *Client) SaveBackupMeta(ctx context.Context, backupMeta *kvproto.BackupMeta) error { + backupMetaData, err := proto.Marshal(backupMeta) if err != nil { return errors.Trace(err) } - log.Debug("backup meta", - zap.Reflect("meta", bc.backupMeta)) + log.Debug("backup meta", zap.Reflect("meta", backupMeta)) backendURL := storage.FormatBackendURL(bc.backend) - log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs))) + log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("size", len(backupMetaData))) return bc.storage.Write(ctx, utils.MetaFile, backupMetaData) } @@ -380,25 +393,44 @@ func (bc *Client) BackupRanges( ctx context.Context, ranges []rtree.Range, req kvproto.BackupRequest, + concurrency uint, updateCh glue.Progress, -) error { - start := time.Now() - defer func() { - elapsed := time.Since(start) - log.Info("Backup Ranges", zap.Duration("take", elapsed)) - }() - +) ([]*kvproto.File, error) { errCh := make(chan error) ctx, cancel := context.WithCancel(ctx) defer cancel() + + // we collect all files in a single goroutine to avoid thread safety issues. 
+ filesCh := make(chan []*kvproto.File, concurrency) + allFiles := make([]*kvproto.File, 0, len(ranges)) go func() { + init := time.Now() + start, cur := init, init + for files := range filesCh { + cur, start = start, time.Now() + allFiles = append(allFiles, files...) + summary.CollectSuccessUnit("backup ranges", 1, cur.Sub(start)) + } + log.Info("Backup Ranges", zap.Duration("take", cur.Sub(init))) + }() + + go func() { + defer close(filesCh) + workerPool := utils.NewWorkerPool(concurrency, "Ranges") + eg, ectx := errgroup.WithContext(ctx) for _, r := range ranges { - err := bc.BackupRange( - ctx, r.StartKey, r.EndKey, req, updateCh) - if err != nil { - errCh <- err - return - } + sk, ek := r.StartKey, r.EndKey + workerPool.ApplyOnErrorGroup(eg, func() error { + files, err := bc.BackupRange(ectx, sk, ek, req, updateCh) + if err == nil { + filesCh <- files + } + return err + }) + } + if err := eg.Wait(); err != nil { + errCh <- err + return } close(errCh) }() @@ -421,16 +453,16 @@ func (bc *Client) BackupRanges( err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS) if err != nil { log.Error("update GC safePoint with TTL failed", zap.Error(err)) - return err + return nil, err } err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS) if err != nil { log.Error("check GC safePoint failed", zap.Error(err)) - return err + return nil, err } if finished { // Return error (if there is any) before finishing backup. - return err + return allFiles, err } select { case err, ok := <-errCh: @@ -440,7 +472,7 @@ func (bc *Client) BackupRanges( finished = true } if err != nil { - return err + return nil, err } case <-t.C: } @@ -448,12 +480,13 @@ func (bc *Client) BackupRanges( } // BackupRange make a backup of the given key range. +// Returns an array of files backed up. 
func (bc *Client) BackupRange( ctx context.Context, startKey, endKey []byte, req kvproto.BackupRequest, updateCh glue.Progress, -) (err error) { +) (files []*kvproto.File, err error) { start := time.Now() defer func() { elapsed := time.Since(start) @@ -461,8 +494,6 @@ func (bc *Client) BackupRange( key := "range start:" + hex.EncodeToString(startKey) + " end:" + hex.EncodeToString(endKey) if err != nil { summary.CollectFailureUnit(key, err) - } else { - summary.CollectSuccessUnit(key, 1, elapsed) } }() log.Info("backup started", @@ -476,7 +507,7 @@ func (bc *Client) BackupRange( var allStores []*metapb.Store allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } req.ClusterId = bc.clusterID @@ -489,7 +520,7 @@ func (bc *Client) BackupRange( var results rtree.RangeTree results, err = push.pushBackup(req, allStores, updateCh) if err != nil { - return err + return nil, err } log.Info("finish backup push down", zap.Int("Ok", results.Len())) @@ -499,15 +530,10 @@ func (bc *Client) BackupRange( ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh) if err != nil { - return err + return nil, err } - bc.backupMeta.StartVersion = req.StartVersion - bc.backupMeta.EndVersion = req.EndVersion - bc.backupMeta.IsRawKv = req.IsRawKv if req.IsRawKv { - bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges, - &kvproto.RawRange{StartKey: startKey, EndKey: endKey, Cf: req.Cf}) log.Info("backup raw ranges", zap.Stringer("startKey", utils.WrapKey(startKey)), zap.Stringer("endKey", utils.WrapKey(endKey)), @@ -520,14 +546,15 @@ func (bc *Client) BackupRange( results.Ascend(func(i btree.Item) bool { r := i.(*rtree.Range) - bc.backupMeta.Files = append(bc.backupMeta.Files, r.Files...) + files = append(files, r.Files...) return true }) // Check if there are duplicated files. 
checkDupFiles(&results) + collectFileInfo(files) - return nil + return files, nil } func (bc *Client) findRegionLeader( @@ -819,12 +846,12 @@ func SendBackup( } // ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV. -func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) { - if len(local) != len(bc.backupMeta.Schemas) { +func ChecksumMatches(backupMeta *kvproto.BackupMeta, local []Checksum) (bool, error) { + if len(local) != len(backupMeta.Schemas) { return false, nil } - for i, schema := range bc.backupMeta.Schemas { + for i, schema := range backupMeta.Schemas { localChecksum := local[i] dbInfo := &model.DBInfo{} err := json.Unmarshal(schema.Db, dbInfo) @@ -860,14 +887,9 @@ func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) { return true, nil } -// ArchiveSize returns the total size of the archive (before encryption). -func (bc *Client) ArchiveSize() uint64 { - return utils.ArchiveSize(&bc.backupMeta) -} - -// CollectFileInfo collects ungrouped file summary information, like kv count and size. -func (bc *Client) CollectFileInfo() { - for _, file := range bc.backupMeta.Files { +// collectFileInfo collects ungrouped file summary information, like kv count and size. +func collectFileInfo(files []*kvproto.File) { + for _, file := range files { summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs) summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes) } @@ -875,20 +897,20 @@ func (bc *Client) CollectFileInfo() { // CollectChecksums check data integrity by xor all(sst_checksum) per table // it returns the checksum of all local files. 
-func (bc *Client) CollectChecksums() ([]Checksum, error) { +func CollectChecksums(backupMeta *kvproto.BackupMeta) ([]Checksum, error) { start := time.Now() defer func() { elapsed := time.Since(start) summary.CollectDuration("backup fast checksum", elapsed) }() - dbs, err := utils.LoadBackupTables(&bc.backupMeta) + dbs, err := utils.LoadBackupTables(backupMeta) if err != nil { return nil, err } - checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas)) - for _, schema := range bc.backupMeta.Schemas { + checksums := make([]Checksum, 0, len(backupMeta.Schemas)) + for _, schema := range backupMeta.Schemas { dbInfo := &model.DBInfo{} err = json.Unmarshal(schema.Db, dbInfo) if err != nil { @@ -924,37 +946,16 @@ func (bc *Client) CollectChecksums() ([]Checksum, error) { return checksums, nil } -// CompleteMeta wait response of admin checksum from TiDB to complete backup meta. -func (bc *Client) CompleteMeta(backupSchemas *Schemas) error { - schemas, err := backupSchemas.FinishTableChecksum() - if err != nil { - return err - } - bc.backupMeta.Schemas = schemas - return nil -} - -// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum. -// use this when user skip the checksum generating. -func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) { - schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas)) - for _, v := range backupSchemas.schemas { - schema := v - schemas = append(schemas, &schema) - } - bc.backupMeta.Schemas = schemas -} - -// FilterSchema filter schema that doesn't have backup files +// FilterSchema filters out, in place, the schemas that don't have backup files // this is useful during incremental backup, no files in backup means no files to restore // so we can skip some DDL in restore to speed up restoration. 
-func (bc *Client) FilterSchema() error { - dbs, err := utils.LoadBackupTables(&bc.backupMeta) +func FilterSchema(backupMeta *kvproto.BackupMeta) error { + dbs, err := utils.LoadBackupTables(backupMeta) if err != nil { return err } - schemas := make([]*kvproto.Schema, 0, len(bc.backupMeta.Schemas)) - for _, schema := range bc.backupMeta.Schemas { + schemas := make([]*kvproto.Schema, 0, len(backupMeta.Schemas)) + for _, schema := range backupMeta.Schemas { dbInfo := &model.DBInfo{} err := json.Unmarshal(schema.Db, dbInfo) if err != nil { @@ -970,6 +971,6 @@ func (bc *Client) FilterSchema() error { schemas = append(schemas, schema) } } - bc.backupMeta.Schemas = schemas + backupMeta.Schemas = schemas return nil } diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index edcd2c293..1f8ac8c12 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -126,6 +126,17 @@ func (pending *Schemas) FinishTableChecksum() ([]*backup.Schema, error) { } } +// CopyMeta copies schema metadata directly from the pending backupSchemas, without calculating checksums. +// Use this when the user skips checksum generation. +func (pending *Schemas) CopyMeta() []*backup.Schema { + schemas := make([]*backup.Schema, 0, len(pending.schemas)) + for _, v := range pending.schemas { + schema := v + schemas = append(schemas, &schema) + } + return schemas +} + // Len returns the number of schemas. 
func (pending *Schemas) Len() int { return len(pending.schemas) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 4d7f6543a..6ac8c74b4 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -139,6 +139,13 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig isIncrementalBackup := cfg.LastBackupTS > 0 + req := kvproto.BackupRequest{ + StartVersion: cfg.LastBackupTS, + EndVersion: backupTS, + RateLimit: cfg.RateLimit, + Concurrency: defaultBackupConcurrency, + } + ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema( mgr.GetDomain(), mgr.GetTiKV(), cfg.TableFilter, backupTS) if err != nil { @@ -146,7 +153,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } // nothing to backup if ranges == nil { - return client.SaveBackupMeta(ctx, nil) + backupMeta, err2 := backup.BuildBackupMeta(&req, nil, nil, nil) + if err2 != nil { + return err2 + } + return client.SaveBackupMeta(ctx, &backupMeta) } ddlJobs := make([]*model.Job, 0) @@ -184,20 +195,18 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - req := kvproto.BackupRequest{ - StartVersion: cfg.LastBackupTS, - EndVersion: backupTS, - RateLimit: cfg.RateLimit, - Concurrency: cfg.Concurrency, - } - err = client.BackupRanges( - ctx, ranges, req, updateCh) + files, err := client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), updateCh) if err != nil { return err } // Backup has finished updateCh.Close() + backupMeta, err := backup.BuildBackupMeta(&req, files, nil, ddlJobs) + if err != nil { + return err + } + // Checksum from server, and then fulfill the backup metadata. 
if cfg.Checksum && !isIncrementalBackup { backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len()) @@ -205,26 +214,24 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress) backupSchemas.Start( ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh) - err = client.CompleteMeta(backupSchemas) + backupMeta.Schemas, err = backupSchemas.FinishTableChecksum() if err != nil { return err } // Checksum has finished updateCh.Close() // collect file information. - err = checkChecksums(client) + err = checkChecksums(&backupMeta) if err != nil { return err } } else { // Just... copy schemas from origin. - client.CopyMetaFrom(backupSchemas) - // Anyway, let's collect file info for summary. - client.CollectFileInfo() + backupMeta.Schemas = backupSchemas.CopyMeta() if isIncrementalBackup { // Since we don't support checksum for incremental data, fast checksum should be skipped. log.Info("Skip fast checksum in incremental backup") - err = client.FilterSchema() + err = backup.FilterSchema(&backupMeta) if err != nil { return err } @@ -234,12 +241,12 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } } - err = client.SaveBackupMeta(ctx, ddlJobs) + err = client.SaveBackupMeta(ctx, &backupMeta) if err != nil { return err } - g.Record("Size", client.ArchiveSize()) + g.Record("Size", utils.ArchiveSize(&backupMeta)) // Set task summary to success status. summary.SetSuccessStatus(true) @@ -248,13 +255,13 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig // checkChecksums checks the checksum of the client, once failed, // returning a error with message: "mismatched checksum". 
-func checkChecksums(client *backup.Client) error { - checksums, err := client.CollectChecksums() +func checkChecksums(backupMeta *kvproto.BackupMeta) error { + checksums, err := backup.CollectChecksums(backupMeta) if err != nil { return err } var matches bool - matches, err = client.ChecksumMatches(checksums) + matches, err = backup.ChecksumMatches(backupMeta, checksums) if err != nil { return err } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index aacd61627..dfc8d5f67 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -128,8 +128,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf IsRawKv: true, Cf: cfg.CF, } - - err = client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh) + files, err := client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh) if err != nil { return err } @@ -137,12 +136,17 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf updateCh.Close() // Checksum - err = client.SaveBackupMeta(ctx, nil) + rawRanges := []*kvproto.RawRange{{StartKey: backupRange.StartKey, EndKey: backupRange.EndKey, Cf: cfg.CF}} + backupMeta, err := backup.BuildBackupMeta(&req, files, rawRanges, nil) + if err != nil { + return err + } + err = client.SaveBackupMeta(ctx, &backupMeta) if err != nil { return err } - g.Record("Size", client.ArchiveSize()) + g.Record("Size", utils.ArchiveSize(&backupMeta)) // Set task summary to success status. summary.SetSuccessStatus(true) diff --git a/pkg/utils/worker.go b/pkg/utils/worker.go index a04f59886..67e9d0dcb 100644 --- a/pkg/utils/worker.go +++ b/pkg/utils/worker.go @@ -5,6 +5,7 @@ package utils import ( "github.com/pingcap/log" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) // WorkerPool contains a pool of workers. @@ -37,11 +38,32 @@ func NewWorkerPool(limit uint, name string) *WorkerPool { // Apply executes a task. 
func (pool *WorkerPool) Apply(fn taskFunc) { - pool.ApplyWithID(func(_ uint64) { fn() }) + worker := pool.apply() + go func() { + defer pool.recycle(worker) + fn() + }() } // ApplyWithID execute a task and provides it with the worker ID. func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc) { + worker := pool.apply() + go func() { + defer pool.recycle(worker) + fn(worker.ID) + }() +} + +// ApplyOnErrorGroup executes a task in an errorgroup. +func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error) { + worker := pool.apply() + eg.Go(func() error { + defer pool.recycle(worker) + return fn() + }) +} + +func (pool *WorkerPool) apply() *Worker { var worker *Worker select { case worker = <-pool.workers: @@ -49,10 +71,7 @@ func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc) { log.Debug("wait for workers", zap.String("pool", pool.name)) worker = <-pool.workers } - go func() { - fn(worker.ID) - pool.recycle(worker) - }() + return worker } func (pool *WorkerPool) recycle(worker *Worker) {