From 62471dc738170be428a23aa0e216384ff67a24ed Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 16 Oct 2020 17:27:38 +0800 Subject: [PATCH 1/8] *: when cleaning up, use isolated context --- main.go | 9 +-------- pkg/task/backup.go | 6 ++++++ pkg/task/backup_raw.go | 7 +++++++ pkg/task/restore.go | 6 ++++++ tests/br_other/run.sh | 17 +++++++++++++++++ 5 files changed, 37 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index 21e27c105..5b66c604f 100644 --- a/main.go +++ b/main.go @@ -30,14 +30,7 @@ func main() { sig := <-sc fmt.Printf("\nGot signal [%v] to exit.\n", sig) log.Warn("received signal to exit", zap.Stringer("signal", sig)) - switch sig { - case syscall.SIGTERM: - cancel() - os.Exit(0) - default: - cancel() - os.Exit(1) - } + cancel() }() rootCmd := &cobra.Command{ diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 3841fed9f..aebb4fa77 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -221,6 +221,12 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig log.Debug("removing some PD schedulers") restore, e := mgr.RemoveSchedulers(ctx) defer func() { + if ctx.Err() != nil { + var cancel context.CancelFunc + log.Warn("context canceled, doing clean work with another context with timeout") + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + } if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 0047fadca..d1fdfef23 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -5,6 +5,7 @@ package task import ( "bytes" "context" + "time" "github.com/pingcap/errors" kvproto "github.com/pingcap/kvproto/pkg/backup" @@ -147,6 +148,12 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf if cfg.RemoveSchedulers { restore, e := mgr.RemoveSchedulers(ctx) defer func() { + if ctx.Err() != nil { + var cancel context.CancelFunc + log.Warn("context canceled, doing clean work with another context with timeout") + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + } if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index a4fa9135c..8918f331c 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -367,6 +367,12 @@ func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) func restorePostWork( ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc, ) { + if ctx.Err() != nil { + var cancel context.CancelFunc + log.Warn("context canceled, doing clean work with another context with timeout") + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + } if client.IsOnline() { return } diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index e37088884..9d70f3696 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -105,6 +105,7 @@ else exit 1 fi + # make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it. # give enough time to BR so it can gracefully stop. sleep 5 @@ -114,6 +115,22 @@ then exit 1 fi +default_pd_values='{ + "max-merge-region-keys": 200000, + "max-merge-region-size": 20, + "leader-schedule-limit": 4, + "region-schedule-limit": 2048, + "max-snapshot-count": 3 +}' + +for key in $(echo $default_pd_values | jq 'keys[]'); do + if ! curl -s http://$PD_ADDR/pd/api/v1/config/schedule | jq ".[$key]" | grep -q $(echo $default_pd_values | jq ".[$key]"); + curl -s http://$PD_ADDR/pd/api/v1/config/schedule + echo "[$TEST_NAME] failed due to PD config isn't reset after restore" + exit 1 + fi +done + pd_settings=5 # check is there still exists scheduler in pause. From 9c714ef50b63b3ae042be6fd316b6614764a4382 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 16 Oct 2020 17:55:00 +0800 Subject: [PATCH 2/8] glue: add a guard for Inc --- pkg/gluetikv/glue.go | 18 ++++++++++++++++-- tests/br_other/run.sh | 2 +- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index f28b2732e..f96883dc9 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -9,7 +9,9 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv" + "github.com/prometheus/common/log" pd "github.com/tikv/pd/client" + "github.com/uber-go/atomic" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/summary" @@ -48,7 +50,7 @@ func (Glue) OwnsStorage() bool { // StartProgress implements glue.Glue. func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { - return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)} + return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: new(atomic.Bool)} } // Record implements glue.Glue. @@ -57,15 +59,27 @@ func (Glue) Record(name string, val uint64) { } type progress struct { - ch chan<- struct{} + ch chan<- struct{} + closed *atomic.Bool } // Inc implements glue.Progress. func (p progress) Inc() { + if p.closed.Load() { + log.Warn("proposing a closed progress") + return + } + // there might be buggy if the thread is yielded here. + // however, there should not be gosched, at most time. + // so send here probably is safe, but not totally safe. + // but adding an extra lock should be costly, so just be optimistic. p.ch <- struct{}{} } // Close implements glue.Progress. func (p progress) Close() { + // set closed to true firstly, + // so we won't see a state that the channel is closed and the p.closed is false. + p.closed.Store(true) close(p.ch) } diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index 9d70f3696..6ffe15851 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -124,7 +124,7 @@ default_pd_values='{ }' for key in $(echo $default_pd_values | jq 'keys[]'); do - if ! curl -s http://$PD_ADDR/pd/api/v1/config/schedule | jq ".[$key]" | grep -q $(echo $default_pd_values | jq ".[$key]"); + if ! curl -s http://$PD_ADDR/pd/api/v1/config/schedule | jq ".[$key]" | grep -q $(echo $default_pd_values | jq ".[$key]"); then curl -s http://$PD_ADDR/pd/api/v1/config/schedule echo "[$TEST_NAME] failed due to PD config isn't reset after restore" exit 1 From cc4497b1bd0bcc13472a2c06c3229233877403df Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 16 Oct 2020 18:16:39 +0800 Subject: [PATCH 3/8] tests: add some debug log --- tests/br_other/run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index 6ffe15851..e41f17580 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -64,7 +64,9 @@ fi echo "backup start to test lock file" PPROF_PORT=6080 GO_FAILPOINTS="github.com/pingcap/br/pkg/utils/determined-pprof-port=return($PPROF_PORT)" \ -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null & +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 > $TEST_DIR/br-other-stdout.log & +trap "cat $TEST_DIR/br-other-stdout.log" EXIT + # record last backup pid _pid=$! From 51b7898f9abadc2aaa94f23f2e416ba3bf6b0e63 Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 19 Oct 2020 12:03:35 +0800 Subject: [PATCH 4/8] tests: fix tests --- Makefile | 4 +++- pkg/gluetikv/glue.go | 14 ++++++++------ pkg/pdutil/pd.go | 11 +++++++---- tests/br_other/run.sh | 11 +++++++---- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index f66aa8af6..a785e4ec4 100644 --- a/Makefile +++ b/Makefile @@ -121,10 +121,12 @@ lint: tools CGO_ENABLED=0 tools/bin/revive -formatter friendly -config revive.toml $$($(PACKAGES)) tidy: prepare + # tidy isn't a read-only task for go.mod, run FINISH_MOD always, + # so our go.mod1 won't stick in old state + trap "$(FINISH_MOD)" EXIT @echo "go mod tidy" GO111MODULE=on go mod tidy git diff --quiet go.mod go.sum - $(FINISH_MOD) failpoint-enable: tools tools/bin/failpoint-ctl enable diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index f96883dc9..5dc30f3b9 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -5,13 +5,14 @@ package gluetikv import ( "context" + "sync/atomic" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv" "github.com/prometheus/common/log" pd "github.com/tikv/pd/client" - "github.com/uber-go/atomic" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/summary" @@ -60,19 +61,20 @@ func (Glue) Record(name string, val uint64) { type progress struct { ch chan<- struct{} - closed *atomic.Bool + closed int32 } // Inc implements glue.Progress. func (p progress) Inc() { - if p.closed.Load() { + if atomic.LoadInt32(&p.closed) != 0 { log.Warn("proposing a closed progress") return } // there might be buggy if the thread is yielded here. // however, there should not be gosched, at most time. - // so send here probably is safe, but not totally safe. - // but adding an extra lock should be costly, so just be optimistic. + // so send here probably is safe, even not totally safe. + // since adding an extra lock should be costly, we just be optimistic. + // (Maybe a spin lock here would be better?) p.ch <- struct{}{} } @@ -80,6 +82,6 @@ func (p progress) Inc() { func (p progress) Close() { // set closed to true firstly, // so we won't see a state that the channel is closed and the p.closed is false. - p.closed.Store(true) + atomic.StoreInt32(&p.closed, 1) close(p.ch) } diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 2b921cd16..ddd852be0 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -175,10 +175,12 @@ func NewPdController( } return &PdController{ - addrs: processedAddrs, - cli: cli, - pdClient: pdClient, - schedulerPauseCh: make(chan struct{}), + addrs: processedAddrs, + cli: cli, + pdClient: pdClient, + // We should make a buffered channel here otherwise when context canceled, + // gracefully shutdown will stick at resuming schedulers. + schedulerPauseCh: make(chan struct{}, 1), }, nil } @@ -416,6 +418,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil { return errors.Annotate(err, "fail to add PD schedulers") } + log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg)) mergeCfg := make(map[string]interface{}) for _, cfgKey := range pdRegionMergeCfg { value := clusterCfg.scheduleCfg[cfgKey] diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index e41f17580..143130f69 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -17,6 +17,8 @@ set -eu DB="$TEST_NAME" run_sql "CREATE DATABASE $DB;" +trap "run_sql \"DROP DATABASE $DB;\"" EXIT + run_sql "CREATE TABLE $DB.usertable1 ( \ YCSB_KEY varchar(64) NOT NULL, \ @@ -101,7 +103,8 @@ if ps -p $_pid > /dev/null then echo "$_pid is running" # kill last backup progress (Don't send SIGKILL, or we might stuck PD in no scheduler state.) - kill $_pid + pkill -P $_pid + echo "$_pid is killed @ $(date)" else echo "TEST: [$TEST_NAME] test backup lock file failed! the last backup finished" exit 1 @@ -110,7 +113,7 @@ fi # make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it. # give enough time to BR so it can gracefully stop. -sleep 5 +sleep 30 if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": true' then echo "TEST: [$TEST_NAME] failed because scheduler has been removed" @@ -137,7 +140,8 @@ pd_settings=5 # check is there still exists scheduler in pause. pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l) -if [ "$pause_schedulers" -ne "3" ];then +# There shouldn't be any paused schedulers since BR gracfully shutdown. +if [ "$pause_schedulers" -ne "0" ];then echo "TEST: [$TEST_NAME] failed because paused scheduler has changed" exit 1 fi @@ -164,7 +168,6 @@ if [ "$pd_settings" -ne "5" ];then exit 1 fi -run_sql "DROP DATABASE $DB;" # Test version run_br --version From d83ed800ebbdc5970b2d428dc38f35dee32559d2 Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 19 Oct 2020 13:36:05 +0800 Subject: [PATCH 5/8] *: use background context directly --- main.go | 5 +++++ pkg/gluetikv/glue.go | 2 +- pkg/restore/batcher.go | 8 ++------ pkg/task/backup.go | 6 ++---- pkg/task/backup_raw.go | 7 ++----- pkg/task/restore.go | 6 ++---- 6 files changed, 14 insertions(+), 20 deletions(-) diff --git a/main.go b/main.go index 5b66c604f..f543c9646 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,11 @@ func main() { fmt.Printf("\nGot signal [%v] to exit.\n", sig) log.Warn("received signal to exit", zap.Stringer("signal", sig)) cancel() + fmt.Println("gracefully shuting down, press ^C again to force exit") + <-sc + // Even user use SIGTERM to exit, there isn't any checkpoint for resuming, + // hence returning fail exit code. + os.Exit(1) }() rootCmd := &cobra.Command{ diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index 5dc30f3b9..6519d103a 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -51,7 +51,7 @@ func (Glue) OwnsStorage() bool { // StartProgress implements glue.Glue. func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { - return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: new(atomic.Bool)} + return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0} } // Record implements glue.Glue. diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index 4a7431ce4..71f28ca87 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -67,12 +67,8 @@ func (b *Batcher) Len() int { func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) { defer func() { if ctx.Err() != nil { - timeout := 5 * time.Second - log.Info("restore canceled, cleaning in a context with timeout", - zap.Stringer("timeout", timeout)) - limitedCtx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - b.manager.Close(limitedCtx) + log.Info("restore canceled, cleaning in background context") + b.manager.Close(context.Background()) } else { b.manager.Close(ctx) } diff --git a/pkg/task/backup.go b/pkg/task/backup.go index aebb4fa77..b5d370ad7 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -222,10 +222,8 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig restore, e := mgr.RemoveSchedulers(ctx) defer func() { if ctx.Err() != nil { - var cancel context.CancelFunc - log.Warn("context canceled, doing clean work with another context with timeout") - ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + log.Warn("context canceled, doing clean work with background context") + ctx = context.Background() } if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index d1fdfef23..509aa9851 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -5,7 +5,6 @@ package task import ( "bytes" "context" - "time" "github.com/pingcap/errors" kvproto "github.com/pingcap/kvproto/pkg/backup" @@ -149,10 +148,8 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf restore, e := mgr.RemoveSchedulers(ctx) defer func() { if ctx.Err() != nil { - var cancel context.CancelFunc - log.Warn("context canceled, doing clean work with another context with timeout") - ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + log.Warn("context canceled, doing clean work with background context") + ctx = context.Background() } if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 8918f331c..265161581 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -368,10 +368,8 @@ func restorePostWork( ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc, ) { if ctx.Err() != nil { - var cancel context.CancelFunc - log.Warn("context canceled, doing clean work with another context with timeout") - ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + log.Warn("context canceled, doing clean work with background context") + ctx = context.Background() } if client.IsOnline() { return From b12aab7131a3fa8ab8d1e5f3caf25d91f4ddfc08 Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 19 Oct 2020 16:22:59 +0800 Subject: [PATCH 6/8] makefile: don't use trap --- Makefile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index a785e4ec4..ec48c6781 100644 --- a/Makefile +++ b/Makefile @@ -121,12 +121,12 @@ lint: tools CGO_ENABLED=0 tools/bin/revive -formatter friendly -config revive.toml $$($(PACKAGES)) tidy: prepare - # tidy isn't a read-only task for go.mod, run FINISH_MOD always, - # so our go.mod1 won't stick in old state - trap "$(FINISH_MOD)" EXIT @echo "go mod tidy" GO111MODULE=on go mod tidy - git diff --quiet go.mod go.sum + # tidy isn't a read-only task for go.mod, run FINISH_MOD always, + # so our go.mod1 won't stick in old state + git diff --quiet go.mod go.sum || ("$(FINISH_MOD)" && exit 1) + $(FINISH_MOD) failpoint-enable: tools tools/bin/failpoint-ctl enable From 023064c4d707444034cdd90d8862532c282ef1da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Tue, 20 Oct 2020 10:22:50 +0800 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: kennytm Co-authored-by: Neil Shen --- main.go | 2 +- pkg/gluetikv/glue.go | 1 - pkg/task/backup_raw.go | 2 +- pkg/task/restore.go | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index f543c9646..7c6221ace 100644 --- a/main.go +++ b/main.go @@ -31,7 +31,7 @@ func main() { fmt.Printf("\nGot signal [%v] to exit.\n", sig) log.Warn("received signal to exit", zap.Stringer("signal", sig)) cancel() - fmt.Println("gracefully shuting down, press ^C again to force exit") + fmt.Fprintln(os.Stderr, "gracefully shuting down, press ^C again to force exit") <-sc // Even user use SIGTERM to exit, there isn't any checkpoint for resuming, // hence returning fail exit code. diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index 6519d103a..79f2e6393 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -4,7 +4,6 @@ package gluetikv import ( "context" - "sync/atomic" "github.com/pingcap/tidb/config" diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 509aa9851..1fd806a7d 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -148,7 +148,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf restore, e := mgr.RemoveSchedulers(ctx) defer func() { if ctx.Err() != nil { - log.Warn("context canceled, doing clean work with background context") + log.Warn("context canceled, try shutdown") ctx = context.Background() } if restoreE := restore(ctx); restoreE != nil { diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 265161581..aad38d82f 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -368,7 +368,7 @@ func restorePostWork( ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc, ) { if ctx.Err() != nil { - log.Warn("context canceled, doing clean work with background context") + log.Warn("context canceled, try shutdown") ctx = context.Background() } if client.IsOnline() { From fd4c69bcc444562452049d456e4dbfd5e751e8b2 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 20 Oct 2020 11:54:37 +0800 Subject: [PATCH 8/8] glue: add a TODO --- run-test.sh | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100755 run-test.sh diff --git a/run-test.sh b/run-test.sh new file mode 100755 index 000000000..cc085d8ea --- /dev/null +++ b/run-test.sh @@ -0,0 +1,6 @@ +#! /bin/sh + +apt update && apt install default-mysql-client jq --yes + +cd /brie +TEST_NAME=br_other make integration_test \ No newline at end of file