From 4e82ed0482e070ba6cea17448fc4ae3698d8fe23 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 4 Aug 2020 17:20:34 +0800 Subject: [PATCH 1/4] restore, task: remove in-struct context Signed-off-by: Neil Shen --- pkg/restore/client.go | 58 ++++++++++++++++------------------- pkg/restore/client_test.go | 5 ++- pkg/restore/pipeline_items.go | 2 +- pkg/task/restore.go | 14 ++++----- pkg/task/restore_raw.go | 4 +-- 5 files changed, 39 insertions(+), 44 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index ee540720e..6e72259fe 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -49,7 +49,6 @@ const defaultChecksumConcurrency = 64 // Client sends requests to restore files. type Client struct { - ctx context.Context cancel context.CancelFunc pdClient pd.Client @@ -93,22 +92,17 @@ type Client struct { // NewRestoreClient returns a new RestoreClient. func NewRestoreClient( - ctx context.Context, g glue.Glue, pdClient pd.Client, store kv.Storage, tlsConf *tls.Config, ) (*Client, error) { - ctx, cancel := context.WithCancel(ctx) db, err := NewDB(g, store) if err != nil { - cancel() return nil, errors.Trace(err) } return &Client{ - ctx: ctx, - cancel: cancel, pdClient: pdClient, toolClient: NewSplitClient(pdClient, tlsConf), db: db, @@ -154,7 +148,6 @@ func (rc *Client) Close() { if rc.db != nil { rc.db.Close() } - rc.cancel() log.Info("Restore client closed") } @@ -267,11 +260,11 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) { } // ResetTS resets the timestamp of PD to a bigger value. -func (rc *Client) ResetTS(pdAddrs []string) error { +func (rc *Client) ResetTS(ctx context.Context, pdAddrs []string) error { restoreTS := rc.backupMeta.GetEndVersion() log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS)) i := 0 - return utils.WithRetry(rc.ctx, func() error { + return utils.WithRetry(ctx, func() error { idx := i % len(pdAddrs) i++ return utils.ResetTS(pdAddrs[idx], restoreTS, rc.tlsConf) @@ -279,10 +272,10 @@ func (rc *Client) ResetTS(pdAddrs []string) error { } // GetPlacementRules return the current placement rules. -func (rc *Client) GetPlacementRules(pdAddrs []string) ([]placement.Rule, error) { +func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]placement.Rule, error) { var placementRules []placement.Rule i := 0 - errRetry := utils.WithRetry(rc.ctx, func() error { + errRetry := utils.WithRetry(ctx, func() error { var err error idx := i % len(pdAddrs) i++ @@ -326,12 +319,12 @@ func (rc *Client) GetTableSchema( } // CreateDatabase creates a database. -func (rc *Client) CreateDatabase(db *model.DBInfo) error { +func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error { if rc.IsSkipCreateSQL() { log.Info("skip create database", zap.Stringer("database", db.Name)) return nil } - return rc.db.CreateDatabase(rc.ctx, db) + return rc.db.CreateDatabase(ctx, db) } // CreateTables creates multiple tables, and returns their rewrite rules. @@ -507,14 +500,14 @@ func makeTiFlashOfTableRecord(table *utils.Table, replica int) (*backup.Schema, // returns the removed count of TiFlash nodes. // TODO: save the removed TiFlash information into disk. // TODO: remove this after tiflash supports restore. -func (rc *Client) RemoveTiFlashOfTable(table CreatedTable, rule []placement.Rule) (int, error) { +func (rc *Client) RemoveTiFlashOfTable(ctx context.Context, table CreatedTable, rule []placement.Rule) (int, error) { if rule := utils.SearchPlacementRule(table.Table.ID, rule, placement.Learner); rule != nil { if rule.Count > 0 { log.Info("remove TiFlash of table", zap.Int64("table ID", table.Table.ID), zap.Int("count", rule.Count)) err := multierr.Combine( - rc.db.AlterTiflashReplica(rc.ctx, table.OldTable, 0), + rc.db.AlterTiflashReplica(ctx, table.OldTable, 0), rc.removeTiFlashOf(table.OldTable, rule.Count), - rc.flushTiFlashRecord(), + rc.flushTiFlashRecord(ctx), ) if err != nil { return 0, errors.Trace(err) @@ -535,7 +528,7 @@ func (rc *Client) removeTiFlashOf(table *utils.Table, replica int) error { return nil } -func (rc *Client) flushTiFlashRecord() error { +func (rc *Client) flushTiFlashRecord(ctx context.Context) error { // Today nothing to do :D if !rc.tiFlashRecordUpdated { return nil @@ -551,7 +544,7 @@ func (rc *Client) flushTiFlashRecord() error { } backendURL := storage.FormatBackendURL(rc.backend) log.Info("update backup meta", zap.Stringer("path", &backendURL)) - err = rc.storage.Write(rc.ctx, utils.SavedMetaFile, backupMetaData) + err = rc.storage.Write(ctx, utils.SavedMetaFile, backupMetaData) if err != nil { return errors.Trace(err) } @@ -560,9 +553,9 @@ func (rc *Client) flushTiFlashRecord() error { // RecoverTiFlashOfTable recovers TiFlash replica of some table. // TODO: remove this after tiflash supports restore. -func (rc *Client) RecoverTiFlashOfTable(table *utils.Table) error { +func (rc *Client) RecoverTiFlashOfTable(ctx context.Context, table *utils.Table) error { if table.TiFlashReplicas > 0 { - err := rc.db.AlterTiflashReplica(rc.ctx, table, table.TiFlashReplicas) + err := rc.db.AlterTiflashReplica(ctx, table, table.TiFlashReplicas) if err != nil { return errors.Trace(err) } @@ -572,9 +565,9 @@ func (rc *Client) RecoverTiFlashOfTable(table *utils.Table) error { // RecoverTiFlashReplica recovers all the tiflash replicas of a table // TODO: remove this after tiflash supports restore. -func (rc *Client) RecoverTiFlashReplica(tables []*utils.Table) error { +func (rc *Client) RecoverTiFlashReplica(ctx context.Context, tables []*utils.Table) error { for _, table := range tables { - if err := rc.RecoverTiFlashOfTable(table); err != nil { + if err := rc.RecoverTiFlashOfTable(ctx, table); err != nil { return err } } @@ -582,14 +575,14 @@ func (rc *Client) RecoverTiFlashReplica(tables []*utils.Table) error { } // ExecDDLs executes the queries of the ddl jobs. -func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { +func (rc *Client) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error { // Sort the ddl jobs by schema version in ascending order. sort.Slice(ddlJobs, func(i, j int) bool { return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion }) for _, job := range ddlJobs { - err := rc.db.ExecDDL(rc.ctx, job) + err := rc.db.ExecDDL(ctx, job) if err != nil { return errors.Trace(err) } @@ -601,14 +594,14 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { return nil } -func (rc *Client) setSpeedLimit() error { +func (rc *Client) setSpeedLimit(ctx context.Context) error { if !rc.hasSpeedLimited && rc.rateLimit != 0 { - stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient, conn.SkipTiFlash) + stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.SkipTiFlash) if err != nil { return err } for _, store := range stores { - err = rc.fileImporter.setDownloadSpeedLimit(rc.ctx, store.GetId()) + err = rc.fileImporter.setDownloadSpeedLimit(ctx, store.GetId()) if err != nil { return err } @@ -620,6 +613,7 @@ func (rc *Client) setSpeedLimit() error { // RestoreFiles tries to restore the files. func (rc *Client) RestoreFiles( + ctx context.Context, files []*backup.File, rewriteRules *RewriteRules, rejectStoreMap map[uint64]bool, @@ -638,8 +632,8 @@ func (rc *Client) RestoreFiles( log.Debug("start to restore files", zap.Int("files", len(files)), ) - eg, ectx := errgroup.WithContext(rc.ctx) - err = rc.setSpeedLimit() + eg, ectx := errgroup.WithContext(ctx) + err = rc.setSpeedLimit(ctx) if err != nil { return err } @@ -664,7 +658,9 @@ func (rc *Client) RestoreFiles( } // RestoreRaw tries to restore raw keys in the specified range. -func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress) error { +func (rc *Client) RestoreRaw( + ctx context.Context, startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress, +) error { start := time.Now() defer func() { elapsed := time.Since(start) @@ -674,7 +670,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil zap.Duration("take", elapsed)) }() errCh := make(chan error, len(files)) - eg, ectx := errgroup.WithContext(rc.ctx) + eg, ectx := errgroup.WithContext(ctx) defer close(errCh) err := rc.fileImporter.SetRawRange(startKey, endKey) diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index d61a74b46..1bac5843d 100644 --- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -3,7 +3,6 @@ package restore_test import ( - "context" "math" "strconv" @@ -40,7 +39,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { c.Assert(s.mock.Start(), IsNil) defer s.mock.Stop() - client, err := restore.NewRestoreClient(context.Background(), gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil) + client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil) c.Assert(err, IsNil) info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxInt64) @@ -98,7 +97,7 @@ func (s *testRestoreClientSuite) TestIsOnline(c *C) { c.Assert(s.mock.Start(), IsNil) defer s.mock.Stop() - client, err := restore.NewRestoreClient(context.Background(), gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil) + client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil) c.Assert(err, IsNil) c.Assert(client.IsOnline(), IsFalse) diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index d99f5bd8e..1f835c40e 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -181,7 +181,7 @@ func (b *tikvSender) RestoreBatch(ctx context.Context, ranges []rtree.Range, rew files = append(files, fs.Files...) } - if err := b.client.RestoreFiles(files, rewriteRules, b.rejectStoreMap, b.updateCh); err != nil { + if err := b.client.RestoreFiles(ctx, files, rewriteRules, b.rejectStoreMap, b.updateCh); err != nil { return err } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index a8402cde2..001ab7599 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -103,7 +103,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } defer mgr.Close() - client, err := restore.NewRestoreClient(ctx, g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) + client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) if err != nil { return err } @@ -161,7 +161,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf enableTiDBConfig() // execute DDL first - err = client.ExecDDLs(ddlJobs) + err = client.ExecDDLs(ctx, ddlJobs) if err != nil { return errors.Trace(err) } @@ -175,7 +175,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } for _, db := range dbs { - err = client.CreateDatabase(db.Info) + err = client.CreateDatabase(ctx, db.Info) if err != nil { return err } @@ -224,7 +224,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // Do not reset timestamp if we are doing incremental restore, because // we are not allowed to decrease timestamp. if !client.IsIncremental() { - if err = client.ResetTS(cfg.PD); err != nil { + if err = client.ResetTS(ctx, cfg.PD); err != nil { log.Error("reset pd TS failed", zap.Error(err)) return err } @@ -449,7 +449,7 @@ func restoreTableStream( log.Info("doing postwork", zap.Int("table count", len(oldTables)), ) - if err := client.RecoverTiFlashReplica(oldTables); err != nil { + if err := client.RecoverTiFlashReplica(ctx, oldTables); err != nil { log.Error("failed on recover TiFlash replicas", zap.Error(err)) errCh <- err } @@ -465,14 +465,14 @@ func restoreTableStream( return } if removeTiFlashReplica { - rules, err := client.GetPlacementRules(pdAddr) + rules, err := client.GetPlacementRules(ctx, pdAddr) if err != nil { errCh <- err return } log.Debug("get rules", zap.Any("rules", rules), zap.Strings("pd", pdAddr)) log.Debug("try to remove tiflash of table", zap.Stringer("table name", t.Table.Name)) - tiFlashRep, err := client.RemoveTiFlashOfTable(t.CreatedTable, rules) + tiFlashRep, err := client.RemoveTiFlashOfTable(ctx, t.CreatedTable, rules) if err != nil { log.Error("failed on remove TiFlash replicas", zap.Error(err)) errCh <- err diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 4bd867086..1ca22eb45 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -57,7 +57,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } defer mgr.Close() - client, err := restore.NewRestoreClient(ctx, g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) + client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) if err != nil { return err } @@ -117,7 +117,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } defer restorePostWork(ctx, client, restoreSchedulers) - err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh) + err = client.RestoreRaw(ctx, cfg.StartKey, cfg.EndKey, files, updateCh) if err != nil { return errors.Trace(err) } From 8b1aeeaf1009ad09cf664f8857f3bbcaf42800c7 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 4 Aug 2020 17:43:27 +0800 Subject: [PATCH 2/4] backup: remove in-struct context Signed-off-by: Neil Shen --- pkg/backup/client.go | 8 +------- pkg/backup/push.go | 8 +++----- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 718451e22..920c77457 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -400,8 +400,6 @@ func (bc *Client) BackupRanges( updateCh glue.Progress, ) ([]*kvproto.File, error) { errCh := make(chan error) - ctx, cancel := context.WithCancel(ctx) - defer cancel() // we collect all files in a single goroutine to avoid thread safety issues. filesCh := make(chan []*kvproto.File, concurrency) @@ -481,8 +479,6 @@ func (bc *Client) BackupRange( zap.Stringer("EndKey", utils.WrapKey(endKey)), zap.Uint64("RateLimit", req.RateLimit), zap.Uint32("Concurrency", req.Concurrency)) - ctx, cancel := context.WithCancel(ctx) - defer cancel() var allStores []*metapb.Store allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash) @@ -498,7 +494,7 @@ func (bc *Client) BackupRange( push := newPushDown(ctx, bc.mgr, len(allStores)) var results rtree.RangeTree - results, err = push.pushBackup(req, allStores, updateCh) + results, err = push.pushBackup(ctx, req, allStores, updateCh) if err != nil { return nil, err } @@ -799,8 +795,6 @@ func SendBackup( zap.Stringer("EndKey", utils.WrapKey(req.EndKey)), zap.Uint64("storeID", storeID), ) - ctx, cancel := context.WithCancel(ctx) - defer cancel() bcli, err := client.Backup(ctx, &req) if err != nil { log.Error("fail to backup", zap.Uint64("StoreID", storeID)) diff --git a/pkg/backup/push.go b/pkg/backup/push.go index b94464c62..6290c03f2 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -18,7 +18,6 @@ import ( // pushDown warps a backup task. type pushDown struct { - ctx context.Context mgr ClientMgr respCh chan *backup.BackupResponse errCh chan error @@ -26,9 +25,7 @@ type pushDown struct { // newPushDown creates a push down backup. func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown { - log.Info("new backup client") return &pushDown{ - ctx: ctx, mgr: mgr, respCh: make(chan *backup.BackupResponse, cap), errCh: make(chan error, cap), @@ -37,6 +34,7 @@ func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown { // FullBackup make a full backup of a tikv cluster. func (push *pushDown) pushBackup( + ctx context.Context, req backup.BackupRequest, stores []*metapb.Store, updateCh glue.Progress, @@ -50,7 +48,7 @@ func (push *pushDown) pushBackup( log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState())) continue } - client, err := push.mgr.GetBackupClient(push.ctx, storeID) + client, err := push.mgr.GetBackupClient(ctx, storeID) if err != nil { log.Error("fail to connect store", zap.Uint64("StoreID", storeID)) return res, errors.Trace(err) @@ -59,7 +57,7 @@ func (push *pushDown) pushBackup( go func() { defer wg.Done() err := SendBackup( - push.ctx, storeID, client, req, + ctx, storeID, client, req, func(resp *backup.BackupResponse) error { // Forward all responses (including error). push.respCh <- resp From 605f3c5cbddb03b3405ba5b7e98d6de02f75411f Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 4 Aug 2020 17:44:02 +0800 Subject: [PATCH 3/4] remove dead code Signed-off-by: Neil Shen --- pkg/backup/client.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 920c77457..c5c65132d 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -439,10 +439,6 @@ func (bc *Client) BackupRanges( close(errCh) }() - // Check GC safepoint every 5s. - t := time.NewTicker(time.Second * 5) - defer t.Stop() - for err := range errCh { if err != nil { return nil, err From 54c786b4c568fdc16a9a24b81ec23396c8d3b010 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 10 Aug 2020 20:11:49 +0800 Subject: [PATCH 4/4] address warnings Signed-off-by: Neil Shen --- pkg/backup/client.go | 2 +- pkg/backup/push.go | 2 +- pkg/restore/client.go | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index c5c65132d..a39914455 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -487,7 +487,7 @@ func (bc *Client) BackupRange( req.EndKey = endKey req.StorageBackend = bc.backend - push := newPushDown(ctx, bc.mgr, len(allStores)) + push := newPushDown(bc.mgr, len(allStores)) var results rtree.RangeTree results, err = push.pushBackup(ctx, req, allStores, updateCh) diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 6290c03f2..fd1903ab9 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -24,7 +24,7 @@ type pushDown struct { } // newPushDown creates a push down backup. -func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown { +func newPushDown(mgr ClientMgr, cap int) *pushDown { return &pushDown{ mgr: mgr, respCh: make(chan *backup.BackupResponse, cap), diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 6e72259fe..d3121556a 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -49,8 +49,6 @@ const defaultChecksumConcurrency = 64 // Client sends requests to restore files. type Client struct { - cancel context.CancelFunc - pdClient pd.Client toolClient SplitClient fileImporter FileImporter