From c809a6fb1e8318bb606df787e0eb002cc001ee0d Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 9 Jul 2020 14:35:40 +0800 Subject: [PATCH 1/5] *: replace some of wait group into errgroup --- pkg/backup/schema.go | 23 +++--- pkg/restore/client.go | 182 ++++++++++++++++++------------------------ pkg/restore/import.go | 41 +++++----- pkg/utils/worker.go | 9 +++ 4 files changed, 115 insertions(+), 140 deletions(-) diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index 1f8ac8c12..ef603d81c 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -6,7 +6,6 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" "github.com/pingcap/errors" @@ -16,6 +15,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/pingcap/br/pkg/checksum" "github.com/pingcap/br/pkg/glue" @@ -35,7 +35,6 @@ type Schemas struct { schemas map[string]backup.Schema backupSchemaCh chan backup.Schema errCh chan error - wg *sync.WaitGroup } func newBackupSchemas() *Schemas { @@ -43,7 +42,6 @@ func newBackupSchemas() *Schemas { schemas: make(map[string]backup.Schema), backupSchemaCh: make(chan backup.Schema), errCh: make(chan error), - wg: new(sync.WaitGroup), } } @@ -65,28 +63,24 @@ func (pending *Schemas) Start( updateCh glue.Progress, ) { workerPool := utils.NewWorkerPool(concurrency, "Schemas") + errg, ectx := errgroup.WithContext(ctx) go func() { startAll := time.Now() for n, s := range pending.schemas { log.Info("table checksum start", zap.String("table", n)) name := n schema := s - pending.wg.Add(1) - workerPool.Apply(func() { - defer pending.wg.Done() - + workerPool.ApplyOnErrorGroup(errg, func() error { start := time.Now() table := model.TableInfo{} err := json.Unmarshal(schema.Table, &table) if err != nil { - pending.errCh <- err - return + return err } checksumResp, err := calculateChecksum( - ctx, &table, store.GetClient(), backupTS) + ectx, &table, store.GetClient(), backupTS) if err != nil { - pending.errCh <- err - return + return err } schema.Crc64Xor = checksumResp.Checksum schema.TotalKvs = checksumResp.TotalKvs @@ -100,9 +94,12 @@ func (pending *Schemas) Start( pending.backupSchemaCh <- schema updateCh.Inc() + return nil }) } - pending.wg.Wait() + if err := errg.Wait(); err != nil { + pending.errCh <- err + } close(pending.backupSchemaCh) log.Info("backup checksum", zap.Duration("take", time.Since(startAll))) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index d5e5d073d..9b4f63516 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -11,7 +11,6 @@ import ( "fmt" "sort" "strconv" - "sync" "time" "github.com/gogo/protobuf/proto" @@ -30,6 +29,7 @@ import ( "github.com/pingcap/tidb/util/codec" "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" @@ -171,7 +171,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup. metaClient := NewSplitClient(rc.pdClient, rc.tlsConf) importClient := NewImportClient(metaClient, rc.tlsConf) - rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, backupMeta.IsRawKv, rc.rateLimit) + rc.fileImporter = NewFileImporter(metaClient, importClient, backend, backupMeta.IsRawKv, rc.rateLimit) return nil } @@ -371,10 +371,6 @@ func (rc *Client) createTable( table *utils.Table, newTS uint64, ) (CreatedTable, error) { - if db == nil { - db = rc.db - } - if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) } else { @@ -411,14 +407,15 @@ func (rc *Client) GoCreateTables( errCh chan<- error, ) <-chan CreatedTable { // Could we have a smaller size of tables? + log.Info("start create tables") outCh := make(chan CreatedTable, len(tables)) - createOneTable := func(db *DB, t *utils.Table) error { + createOneTable := func(c context.Context, db *DB, t *utils.Table) error { select { - case <-ctx.Done(): - return ctx.Err() + case <-c.Done(): + return c.Err() default: } - rt, err := rc.createTable(ctx, db, dom, t, newTS) + rt, err := rc.createTable(c, db, dom, t, newTS) if err != nil { log.Error("create table failed", zap.Error(err), @@ -433,50 +430,48 @@ func (rc *Client) GoCreateTables( outCh <- rt return nil } - startWork := func(t *utils.Table, done func()) { - defer done() - if err := createOneTable(nil, t); err != nil { - errCh <- err - return - } - } - if len(dbPool) > 0 { - workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers") - startWork = func(t *utils.Table, done func()) { - workers.ApplyWithID(func(id uint64) { - defer done() - selectedDB := int(id) % len(dbPool) - if err := createOneTable(dbPool[selectedDB], t); err != nil { - errCh <- err - return - } - }) - } - } - go func() { - // TODO replace it with an errgroup - wg := new(sync.WaitGroup) defer close(outCh) - defer log.Info("all tables created") - defer func() { - if len(dbPool) > 0 { - for _, db := range dbPool { - db.Close() - } - } - }() - - for _, table := range tables { - tbl := table - wg.Add(1) - startWork(tbl, wg.Done) + defer log.Debug("all tables are created") + var err error + if len(dbPool) > 0 { + err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool) + } else { + err = rc.createTablesWithSoleDB(ctx, createOneTable, tables) + } + if err != nil { + errCh <- err } - wg.Wait() }() return outCh } +func (rc *Client) createTablesWithSoleDB(ctx context.Context, + createOneTable func(ctx context.Context, db *DB, t *utils.Table) error, + tables []*utils.Table) error { + for _, t := range tables { + if err := createOneTable(ctx, rc.db, t); err != nil { + return err + } + } + return nil +} + +func (rc *Client) createTablesWithDBPool(ctx context.Context, + createOneTable func(ctx context.Context, db *DB, t *utils.Table) error, + tables []*utils.Table, dbPool []*DB) error { + eg, ectx := errgroup.WithContext(ctx) + workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers") + for _, t := range tables { + table := t + workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error { + db := dbPool[id] + return createOneTable(ectx, db, table) + }) + } + return eg.Wait() +} + // makeTiFlashOfTableRecord make a 'record' repsenting TiFlash of a table that has been removed. // We doesn't record table ID here because when restore TiFlash replicas, // we use `ALTER TABLE db.tbl SET TIFLASH_REPLICA = xxx` DDL, instead of use some internal TiDB API. @@ -605,7 +600,7 @@ func (rc *Client) setSpeedLimit() error { return err } for _, store := range stores { - err = rc.fileImporter.setDownloadSpeedLimit(store.GetId()) + err = rc.fileImporter.setDownloadSpeedLimit(rc.ctx, store.GetId()) if err != nil { return err } @@ -635,42 +630,28 @@ func (rc *Client) RestoreFiles( log.Debug("start to restore files", zap.Int("files", len(files)), ) - errCh := make(chan error, len(files)) - wg := new(sync.WaitGroup) - defer close(errCh) + eg, ectx := errgroup.WithContext(rc.ctx) err = rc.setSpeedLimit() if err != nil { return err } for _, file := range files { - wg.Add(1) fileReplica := file - rc.workerPool.Apply( - func() { - defer wg.Done() - select { - case <-rc.ctx.Done(): - errCh <- rc.ctx.Err() - case errCh <- rc.fileImporter.Import(fileReplica, rejectStoreMap, rewriteRules): - updateCh.Inc() - } + rc.workerPool.ApplyOnErrorGroup(eg, + func() error { + defer updateCh.Inc() + return rc.fileImporter.Import(ectx, fileReplica, rejectStoreMap, rewriteRules) }) } - for i := range files { - err := <-errCh - if err != nil { - summary.CollectFailureUnit(fmt.Sprintf("file:%d", i), err) - rc.cancel() - wg.Wait() - log.Error( - "restore files failed", - zap.Error(err), - ) - return err - } + if err := eg.Wait(); err != nil { + summary.CollectFailureUnit("file", err) + log.Error( + "restore files failed", + zap.Error(err), + ) + return err } - wg.Wait() return nil } @@ -685,7 +666,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil zap.Duration("take", elapsed)) }() errCh := make(chan error, len(files)) - wg := new(sync.WaitGroup) + eg, ectx := errgroup.WithContext(rc.ctx) defer close(errCh) err := rc.fileImporter.SetRawRange(startKey, endKey) @@ -695,32 +676,21 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil emptyRules := &RewriteRules{} for _, file := range files { - wg.Add(1) fileReplica := file - rc.workerPool.Apply( - func() { - defer wg.Done() - select { - case <-rc.ctx.Done(): - errCh <- rc.ctx.Err() - case errCh <- rc.fileImporter.Import(fileReplica, nil, emptyRules): - updateCh.Inc() - } + rc.workerPool.ApplyOnErrorGroup(eg, + func() error { + defer updateCh.Inc() + return rc.fileImporter.Import(ectx, fileReplica, nil, emptyRules) }) } - for range files { - err := <-errCh - if err != nil { - rc.cancel() - wg.Wait() - log.Error( - "restore raw range failed", - zap.String("startKey", hex.EncodeToString(startKey)), - zap.String("endKey", hex.EncodeToString(endKey)), - zap.Error(err), - ) - return err - } + if err := eg.Wait(); err != nil { + log.Error( + "restore raw range failed", + zap.String("startKey", hex.EncodeToString(startKey)), + zap.String("endKey", hex.EncodeToString(endKey)), + zap.Error(err), + ) + return err } log.Info( "finish to restore raw range", @@ -799,10 +769,12 @@ func (rc *Client) GoValidateChecksum( workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum") go func() { start := time.Now() - wg := new(sync.WaitGroup) + wg, ectx := errgroup.WithContext(ctx) defer func() { log.Info("all checksum ended") - wg.Wait() + if err := wg.Wait(); err != nil { + errCh <- err + } elapsed := time.Since(start) summary.CollectDuration("restore checksum", elapsed) outCh <- struct{}{} @@ -810,20 +782,20 @@ func (rc *Client) GoValidateChecksum( }() for { select { + // if we use ectx here, maybe canceled will mask real error. case <-ctx.Done(): errCh <- ctx.Err() case tbl, ok := <-tableStream: if !ok { return } - wg.Add(1) - workers.Apply(func() { - err := rc.execChecksum(ctx, tbl, kvClient) + workers.ApplyOnErrorGroup(wg, func() error { + err := rc.execChecksum(ectx, tbl, kvClient) if err != nil { - errCh <- err + return err } updateCh.Inc() - wg.Done() + return nil }) } } diff --git a/pkg/restore/import.go b/pkg/restore/import.go index e1c7e9a5b..52e9d5c81 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -142,26 +142,19 @@ type FileImporter struct { isRawKvMode bool rawStartKey []byte rawEndKey []byte - - ctx context.Context - cancel context.CancelFunc } // NewFileImporter returns a new file importClient. func NewFileImporter( - ctx context.Context, metaClient SplitClient, importClient ImporterClient, backend *backup.StorageBackend, isRawKvMode bool, rateLimit uint64, ) FileImporter { - ctx, cancel := context.WithCancel(ctx) return FileImporter{ metaClient: metaClient, backend: backend, - ctx: ctx, - cancel: cancel, importClient: importClient, isRawKvMode: isRawKvMode, rateLimit: rateLimit, @@ -181,6 +174,7 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error { // Import tries to import a file. // All rules must contain encoded keys. func (importer *FileImporter) Import( + ctx context.Context, file *backup.File, rejectStoreMap map[uint64]bool, rewriteRules *RewriteRules, @@ -209,12 +203,12 @@ func (importer *FileImporter) Import( needReject := len(rejectStoreMap) > 0 - err = utils.WithRetry(importer.ctx, func() error { - ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) + err = utils.WithRetry(ctx, func() error { + cctx, cancel := context.WithTimeout(ctx, importScanRegionTime) defer cancel() // Scan regions covered by the file range regionInfos, errScanRegion := PaginateScanRegion( - ctx, importer.metaClient, startKey, endKey, scanRegionPaginationLimit) + cctx, importer.metaClient, startKey, endKey, scanRegionPaginationLimit) if errScanRegion != nil { return errors.Trace(errScanRegion) } @@ -224,7 +218,7 @@ func (importer *FileImporter) Import( startTime := time.Now() log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap)) for _, region := range regionInfos { - if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) { + if !waitForRemoveRejectStores(cctx, importer.metaClient, region, rejectStoreMap) { log.Error("waiting for removing rejected stores failed", utils.ZapRegion(region.Region)) return errors.New("waiting for removing rejected stores failed") @@ -242,12 +236,12 @@ func (importer *FileImporter) Import( info := regionInfo // Try to download file. var downloadMeta *import_sstpb.SSTMeta - errDownload := utils.WithRetry(importer.ctx, func() error { + errDownload := utils.WithRetry(cctx, func() error { var e error if importer.isRawKvMode { - downloadMeta, e = importer.downloadRawKVSST(info, file) + downloadMeta, e = importer.downloadRawKVSST(cctx, info, file) } else { - downloadMeta, e = importer.downloadSST(info, file, rewriteRules) + downloadMeta, e = importer.downloadSST(cctx, info, file, rewriteRules) } return e }, newDownloadSSTBackoffer()) @@ -274,7 +268,7 @@ func (importer *FileImporter) Import( return errDownload } - ingestResp, errIngest := importer.ingestSST(downloadMeta, info) + ingestResp, errIngest := importer.ingestSST(cctx, downloadMeta, info) ingestRetry: for errIngest == nil { errPb := ingestResp.GetError() @@ -294,7 +288,7 @@ func (importer *FileImporter) Import( } else { // Slow path, get region from PD newInfo, errIngest = importer.metaClient.GetRegion( - importer.ctx, info.Region.GetStartKey()) + cctx, info.Region.GetStartKey()) if errIngest != nil { break ingestRetry } @@ -307,7 +301,7 @@ func (importer *FileImporter) Import( errIngest = errors.AddStack(ErrEpochNotMatch) break ingestRetry } - ingestResp, errIngest = importer.ingestSST(downloadMeta, newInfo) + ingestResp, errIngest = importer.ingestSST(cctx, downloadMeta, newInfo) case errPb.EpochNotMatch != nil: // TODO handle epoch not match error // 1. retry download if needed @@ -340,15 +334,16 @@ func (importer *FileImporter) Import( return err } -func (importer *FileImporter) setDownloadSpeedLimit(storeID uint64) error { +func (importer *FileImporter) setDownloadSpeedLimit(ctx context.Context, storeID uint64) error { req := &import_sstpb.SetDownloadSpeedLimitRequest{ SpeedLimit: importer.rateLimit, } - _, err := importer.importClient.SetDownloadSpeedLimit(importer.ctx, storeID, req) + _, err := importer.importClient.SetDownloadSpeedLimit(ctx, storeID, req) return err } func (importer *FileImporter) downloadSST( + ctx context.Context, regionInfo *RegionInfo, file *backup.File, rewriteRules *RewriteRules, @@ -385,7 +380,7 @@ func (importer *FileImporter) downloadSST( ) var resp *import_sstpb.DownloadResponse for _, peer := range regionInfo.Region.GetPeers() { - resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req) + resp, err = importer.importClient.DownloadSST(ctx, peer.GetStoreId(), req) if err != nil { return nil, errors.Trace(err) } @@ -402,6 +397,7 @@ func (importer *FileImporter) downloadSST( } func (importer *FileImporter) downloadRawKVSST( + ctx context.Context, regionInfo *RegionInfo, file *backup.File, ) (*import_sstpb.SSTMeta, error) { @@ -439,7 +435,7 @@ func (importer *FileImporter) downloadRawKVSST( ) var resp *import_sstpb.DownloadResponse for _, peer := range regionInfo.Region.GetPeers() { - resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req) + resp, err = importer.importClient.DownloadSST(ctx, peer.GetStoreId(), req) if err != nil { return nil, errors.Trace(err) } @@ -456,6 +452,7 @@ func (importer *FileImporter) downloadRawKVSST( } func (importer *FileImporter) ingestSST( + ctx context.Context, sstMeta *import_sstpb.SSTMeta, regionInfo *RegionInfo, ) (*import_sstpb.IngestResponse, error) { @@ -473,7 +470,7 @@ func (importer *FileImporter) ingestSST( Sst: sstMeta, } log.Debug("ingest SST", utils.ZapSSTMeta(sstMeta), zap.Reflect("leader", leader)) - resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req) + resp, err := importer.importClient.IngestSST(ctx, leader.GetStoreId(), req) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/utils/worker.go b/pkg/utils/worker.go index 67e9d0dcb..bf269a532 100644 --- a/pkg/utils/worker.go +++ b/pkg/utils/worker.go @@ -63,6 +63,15 @@ func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error) { }) } +// ApplyWithIDInErrorGroup executes a task in an errorgroup and provides it with the worker ID. +func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error) { + worker := pool.apply() + eg.Go(func() error { + defer pool.recycle(worker) + return fn(worker.ID) + }) +} + func (pool *WorkerPool) apply() *Worker { var worker *Worker select { From 7d39a5b1a566cd072911be9d7ea09d3bef62b879 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 9 Jul 2020 14:37:36 +0800 Subject: [PATCH 2/5] tests: remove dbs after test finished --- tests/br_backup_empty/run.sh | 1 + tests/br_small_batch_size/run.sh | 2 ++ tests/br_split_region_fail/run.sh | 2 ++ tests/br_tiflash/run.sh | 2 ++ tests/br_views_and_sequences/run.sh | 2 ++ tests/br_z_gc_safepoint/run.sh | 2 +- 6 files changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/br_backup_empty/run.sh b/tests/br_backup_empty/run.sh index fda894616..b95844fb5 100644 --- a/tests/br_backup_empty/run.sh +++ b/tests/br_backup_empty/run.sh @@ -50,4 +50,5 @@ run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/empty_table" --ratelimit # insert one row to make sure table is restored. run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");" +run_sql "DROP DATABASE $DB" echo "TEST: [$TEST_NAME] successed!" diff --git a/tests/br_small_batch_size/run.sh b/tests/br_small_batch_size/run.sh index aea469fac..04e4cd892 100755 --- a/tests/br_small_batch_size/run.sh +++ b/tests/br_small_batch_size/run.sh @@ -74,3 +74,5 @@ for i in $record_counts; do check_size "t$i" $i done check_size $TABLE 10000 + +run_sql "DROP DATABASE $DB" diff --git a/tests/br_split_region_fail/run.sh b/tests/br_split_region_fail/run.sh index 98739b11f..0ba88ea1b 100644 --- a/tests/br_split_region_fail/run.sh +++ b/tests/br_split_region_fail/run.sh @@ -79,6 +79,8 @@ if $fail; then exit 1 fi +run_sql "DROP DATABASE $DB" + echo "TEST $TEST_NAME passed." diff --git a/tests/br_tiflash/run.sh b/tests/br_tiflash/run.sh index 724357c38..25c3a7cb0 100644 --- a/tests/br_tiflash/run.sh +++ b/tests/br_tiflash/run.sh @@ -73,4 +73,6 @@ if [ $AFTER_BR_COUNT -ne $RECORD_COUNT ]; then exit 1 fi +run_database "DROP DATABASE $DB" + echo "TEST $TEST_NAME passed!" \ No newline at end of file diff --git a/tests/br_views_and_sequences/run.sh b/tests/br_views_and_sequences/run.sh index 345d3aa15..b067bab06 100755 --- a/tests/br_views_and_sequences/run.sh +++ b/tests/br_views_and_sequences/run.sh @@ -45,3 +45,5 @@ views_count=$(run_sql "select count(*) c, sum(m) s from $DB.view_3;" | tail -2 | run_sql "insert into $DB.table_2 (c) values (33);" seq_val=$(run_sql "select a >= 8 and b >= 4 as g from $DB.table_2 where c = 33;" | tail -1) [ "$seq_val" = 'g: 1' ] + +run_sql "drop schema $DB" diff --git a/tests/br_z_gc_safepoint/run.sh b/tests/br_z_gc_safepoint/run.sh index 608798b0f..f65b425a8 100755 --- a/tests/br_z_gc_safepoint/run.sh +++ b/tests/br_z_gc_safepoint/run.sh @@ -65,4 +65,4 @@ if [ "$backup_gc_fail" -ne "1" ];then exit 1 fi -run_sql "DROP TABLE $DB.$TABLE;" +run_sql "DROP DATABASE $DB;" From 9e667fcf204e9e0222008a4546c470a02b18c397 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 9 Jul 2020 15:15:18 +0800 Subject: [PATCH 3/5] restore: fix a bug that may cause dbPool index OoB --- pkg/restore/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 9b4f63516..37c8751c1 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -465,7 +465,7 @@ func (rc *Client) createTablesWithDBPool(ctx context.Context, for _, t := range tables { table := t workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error { - db := dbPool[id] + db := dbPool[id%uint64(len(dbPool))] return createOneTable(ectx, db, table) }) } From 7b78eeea79d001b1d1db2a6581b1a866887b2de0 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 9 Jul 2020 15:26:33 +0800 Subject: [PATCH 4/5] tests: fix some wrong commands --- tests/br_split_region_fail/run.sh | 4 +++- tests/br_tiflash/run.sh | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/br_split_region_fail/run.sh b/tests/br_split_region_fail/run.sh index 0ba88ea1b..afbc2f33e 100644 --- a/tests/br_split_region_fail/run.sh +++ b/tests/br_split_region_fail/run.sh @@ -79,7 +79,9 @@ if $fail; then exit 1 fi -run_sql "DROP DATABASE $DB" +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB${i}" +done echo "TEST $TEST_NAME passed." diff --git a/tests/br_tiflash/run.sh b/tests/br_tiflash/run.sh index 25c3a7cb0..9847e4736 100644 --- a/tests/br_tiflash/run.sh +++ b/tests/br_tiflash/run.sh @@ -73,6 +73,6 @@ if [ $AFTER_BR_COUNT -ne $RECORD_COUNT ]; then exit 1 fi -run_database "DROP DATABASE $DB" +run_sql "DROP DATABASE $DB" echo "TEST $TEST_NAME passed!" \ No newline at end of file From 3fc3d5b8beb6c6f594ef447573aff5e7a9ce50c0 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 21 Jul 2020 20:09:35 +0800 Subject: [PATCH 5/5] restore: don't use context with timeout to restore! Signed-off-by: Hillium --- pkg/restore/import.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 52e9d5c81..352c543a9 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -204,11 +204,11 @@ func (importer *FileImporter) Import( needReject := len(rejectStoreMap) > 0 err = utils.WithRetry(ctx, func() error { - cctx, cancel := context.WithTimeout(ctx, importScanRegionTime) + tctx, cancel := context.WithTimeout(ctx, importScanRegionTime) defer cancel() // Scan regions covered by the file range regionInfos, errScanRegion := PaginateScanRegion( - cctx, importer.metaClient, startKey, endKey, scanRegionPaginationLimit) + tctx, importer.metaClient, startKey, endKey, scanRegionPaginationLimit) if errScanRegion != nil { return errors.Trace(errScanRegion) } @@ -218,7 +218,7 @@ func (importer *FileImporter) Import( startTime := time.Now() log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap)) for _, region := range regionInfos { - if !waitForRemoveRejectStores(cctx, importer.metaClient, region, rejectStoreMap) { + if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) { log.Error("waiting for removing rejected stores failed", utils.ZapRegion(region.Region)) return errors.New("waiting for removing rejected stores failed") @@ -236,12 +236,12 @@ func (importer *FileImporter) Import( info := regionInfo // Try to download file. var downloadMeta *import_sstpb.SSTMeta - errDownload := utils.WithRetry(cctx, func() error { + errDownload := utils.WithRetry(ctx, func() error { var e error if importer.isRawKvMode { - downloadMeta, e = importer.downloadRawKVSST(cctx, info, file) + downloadMeta, e = importer.downloadRawKVSST(ctx, info, file) } else { - downloadMeta, e = importer.downloadSST(cctx, info, file, rewriteRules) + downloadMeta, e = importer.downloadSST(ctx, info, file, rewriteRules) } return e }, newDownloadSSTBackoffer()) @@ -268,7 +268,7 @@ func (importer *FileImporter) Import( return errDownload } - ingestResp, errIngest := importer.ingestSST(cctx, downloadMeta, info) + ingestResp, errIngest := importer.ingestSST(ctx, downloadMeta, info) ingestRetry: for errIngest == nil { errPb := ingestResp.GetError() @@ -288,7 +288,7 @@ func (importer *FileImporter) Import( } else { // Slow path, get region from PD newInfo, errIngest = importer.metaClient.GetRegion( - cctx, info.Region.GetStartKey()) + ctx, info.Region.GetStartKey()) if errIngest != nil { break ingestRetry } @@ -301,7 +301,7 @@ func (importer *FileImporter) Import( errIngest = errors.AddStack(ErrEpochNotMatch) break ingestRetry } - ingestResp, errIngest = importer.ingestSST(cctx, downloadMeta, newInfo) + ingestResp, errIngest = importer.ingestSST(ctx, downloadMeta, newInfo) case errPb.EpochNotMatch != nil: // TODO handle epoch not match error // 1. retry download if needed