diff --git a/Makefile b/Makefile
index f66aa8af6..ec48c6781 100644
--- a/Makefile
+++ b/Makefile
@@ -123,7 +123,9 @@ lint: tools
 tidy: prepare
 	@echo "go mod tidy"
 	GO111MODULE=on go mod tidy
-	git diff --quiet go.mod go.sum
+	# tidy isn't a read-only task for go.mod, run FINISH_MOD always,
+	# so our go.mod1 won't stick in old state
+	git diff --quiet go.mod go.sum || ("$(FINISH_MOD)" && exit 1)
 	$(FINISH_MOD)
 
 failpoint-enable: tools
diff --git a/main.go b/main.go
index 21e27c105..7c6221ace 100644
--- a/main.go
+++ b/main.go
@@ -30,14 +30,12 @@ func main() {
 		sig := <-sc
 		fmt.Printf("\nGot signal [%v] to exit.\n", sig)
 		log.Warn("received signal to exit", zap.Stringer("signal", sig))
-		switch sig {
-		case syscall.SIGTERM:
-			cancel()
-			os.Exit(0)
-		default:
-			cancel()
-			os.Exit(1)
-		}
+		cancel()
+		fmt.Fprintln(os.Stderr, "gracefully shuting down, press ^C again to force exit")
+		<-sc
+		// Even if the user exits with SIGTERM, there isn't any checkpoint for resuming,
+		// hence we return a failure exit code.
+		os.Exit(1)
 	}()
 
 	rootCmd := &cobra.Command{
diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go
index f28b2732e..79f2e6393 100644
--- a/pkg/gluetikv/glue.go
+++ b/pkg/gluetikv/glue.go
@@ -4,11 +4,13 @@ package gluetikv
 
 import (
 	"context"
+	"sync/atomic"
 
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv"
+	"github.com/prometheus/common/log"
 	pd "github.com/tikv/pd/client"
 
 	"github.com/pingcap/br/pkg/glue"
@@ -48,7 +50,7 @@ func (Glue) OwnsStorage() bool {
 
 // StartProgress implements glue.Glue.
 func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
-	return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)}
+	return &progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0}
 }
 
 // Record implements glue.Glue.
@@ -57,15 +59,28 @@ func (Glue) Record(name string, val uint64) {
 }
 
 type progress struct {
-	ch chan<- struct{}
+	ch     chan<- struct{}
+	closed int32
 }
 
 // Inc implements glue.Progress.
-func (p progress) Inc() {
+func (p *progress) Inc() {
+	if atomic.LoadInt32(&p.closed) != 0 {
+		log.Warn("proposing a closed progress")
+		return
+	}
+	// The check above and the send below are not one atomic step: if Close
+	// runs in between, the send still hits a closed channel.
+	// The pointer receiver makes Inc and Close share one closed flag.
+	// since adding an extra lock should be costly, we just be optimistic.
+	// (Maybe a spin lock here would be better?)
 	p.ch <- struct{}{}
 }
 
 // Close implements glue.Progress.
-func (p progress) Close() {
+func (p *progress) Close() {
+	// Set closed before closing the channel, so nobody can observe
+	// a closed channel while p.closed still reads as open.
+	atomic.StoreInt32(&p.closed, 1)
 	close(p.ch)
 }
diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go
index 2b921cd16..ddd852be0 100644
--- a/pkg/pdutil/pd.go
+++ b/pkg/pdutil/pd.go
@@ -175,10 +175,12 @@ func NewPdController(
 	}
 
 	return &PdController{
-		addrs:            processedAddrs,
-		cli:              cli,
-		pdClient:         pdClient,
-		schedulerPauseCh: make(chan struct{}),
+		addrs:    processedAddrs,
+		cli:      cli,
+		pdClient: pdClient,
+		// We should make a buffered channel here otherwise when context canceled,
+		// gracefully shutdown will stick at resuming schedulers.
+		schedulerPauseCh: make(chan struct{}, 1),
 	}, nil
 }
 
@@ -416,6 +418,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
 	if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
 		return errors.Annotate(err, "fail to add PD schedulers")
 	}
+	log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg))
 	mergeCfg := make(map[string]interface{})
 	for _, cfgKey := range pdRegionMergeCfg {
 		value := clusterCfg.scheduleCfg[cfgKey]
diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go
index 4a7431ce4..71f28ca87 100644
--- a/pkg/restore/batcher.go
+++ b/pkg/restore/batcher.go
@@ -67,12 +67,8 @@ func (b *Batcher) Len() int {
 func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) {
 	defer func() {
 		if ctx.Err() != nil {
-			timeout := 5 * time.Second
-			log.Info("restore canceled, cleaning in a context with timeout",
-				zap.Stringer("timeout", timeout))
-			limitedCtx, cancel := context.WithTimeout(context.Background(), timeout)
-			defer cancel()
-			b.manager.Close(limitedCtx)
+			log.Info("restore canceled, cleaning in background context")
+			b.manager.Close(context.Background())
 		} else {
 			b.manager.Close(ctx)
 		}
diff --git a/pkg/task/backup.go b/pkg/task/backup.go
index 3841fed9f..b5d370ad7 100644
--- a/pkg/task/backup.go
+++ b/pkg/task/backup.go
@@ -221,6 +221,10 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 		log.Debug("removing some PD schedulers")
 		restore, e := mgr.RemoveSchedulers(ctx)
 		defer func() {
+			if ctx.Err() != nil {
+				log.Warn("context canceled, doing clean work with background context")
+				ctx = context.Background()
+			}
 			if restoreE := restore(ctx); restoreE != nil {
 				log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
 			}
diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go
index 0047fadca..1fd806a7d 100644
--- a/pkg/task/backup_raw.go
+++ b/pkg/task/backup_raw.go
@@ -147,6 +147,10 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
 	if cfg.RemoveSchedulers {
 		restore, e := mgr.RemoveSchedulers(ctx)
 		defer func() {
+			if ctx.Err() != nil {
+				log.Warn("context canceled, try shutdown")
+				ctx = context.Background()
+			}
 			if restoreE := restore(ctx); restoreE != nil {
 				log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
 			}
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index a4fa9135c..aad38d82f 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -367,6 +367,10 @@ func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr)
 func restorePostWork(
 	ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc,
 ) {
+	if ctx.Err() != nil {
+		log.Warn("context canceled, try shutdown")
+		ctx = context.Background()
+	}
 	if client.IsOnline() {
 		return
 	}
diff --git a/run-test.sh b/run-test.sh
new file mode 100755
index 000000000..cc085d8ea
--- /dev/null
+++ b/run-test.sh
@@ -0,0 +1,6 @@
+#! /bin/sh
+
+apt update && apt install default-mysql-client jq --yes
+
+cd /brie
+TEST_NAME=br_other make integration_test
\ No newline at end of file
diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh
index e37088884..143130f69 100644
--- a/tests/br_other/run.sh
+++ b/tests/br_other/run.sh
@@ -17,6 +17,8 @@ set -eu
 DB="$TEST_NAME"
 
 run_sql "CREATE DATABASE $DB;"
+trap "run_sql \"DROP DATABASE $DB;\"" EXIT
+
 
 run_sql "CREATE TABLE $DB.usertable1 ( \
   YCSB_KEY varchar(64) NOT NULL, \
@@ -64,7 +66,10 @@ fi
 echo "backup start to test lock file"
 PPROF_PORT=6080
 GO_FAILPOINTS="github.com/pingcap/br/pkg/utils/determined-pprof-port=return($PPROF_PORT)" \
-run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null &
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 > $TEST_DIR/br-other-stdout.log &
+# A later EXIT trap replaces the earlier one, so this trap must also drop the database.
+trap "cat $TEST_DIR/br-other-stdout.log; run_sql \"DROP DATABASE $DB;\"" EXIT
+
 # record last backup pid
 _pid=$!
 
@@ -99,26 +104,45 @@ if ps -p $_pid > /dev/null
 then
    echo "$_pid is running"
    # kill last backup progress (Don't send SIGKILL, or we might stuck PD in no scheduler state.)
-   kill $_pid
+   pkill -P $_pid
+   echo "$_pid is killed @ $(date)"
 else
    echo "TEST: [$TEST_NAME] test backup lock file failed! the last backup finished"
    exit 1
 fi
 
+
 # make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it.
 # give enough time to BR so it can gracefully stop.
-sleep 5
+sleep 30
 if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": true'
 then
   echo "TEST: [$TEST_NAME] failed because scheduler has been removed"
   exit 1
 fi
 
+default_pd_values='{
+  "max-merge-region-keys": 200000,
+  "max-merge-region-size": 20,
+  "leader-schedule-limit": 4,
+  "region-schedule-limit": 2048,
+  "max-snapshot-count": 3
+}'
+
+for key in $(echo $default_pd_values | jq 'keys[]'); do
+  if ! curl -s http://$PD_ADDR/pd/api/v1/config/schedule | jq ".[$key]" | grep -qx $(echo $default_pd_values | jq ".[$key]"); then
+    curl -s http://$PD_ADDR/pd/api/v1/config/schedule
+    echo "[$TEST_NAME] failed due to PD config isn't reset after restore"
+    exit 1
+  fi
+done
+
 pd_settings=5
 
 # check is there still exists scheduler in pause.
 pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
-if [ "$pause_schedulers" -ne "3" ];then
+# There shouldn't be any paused schedulers since BR shut down gracefully.
+if [ "$pause_schedulers" -ne "0" ];then
   echo "TEST: [$TEST_NAME] failed because paused scheduler has changed"
   exit 1
 fi
@@ -145,7 +169,6 @@ if [ "$pd_settings" -ne "5" ];then
   exit 1
 fi
 
-run_sql "DROP DATABASE $DB;"
 
 # Test version
 run_br --version