Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ lint: tools
tidy: prepare
@echo "go mod tidy"
GO111MODULE=on go mod tidy
git diff --quiet go.mod go.sum
# tidy isn't a read-only task for go.mod, run FINISH_MOD always,
# so our go.mod1 won't stick in old state
git diff --quiet go.mod go.sum || ("$(FINISH_MOD)" && exit 1)
$(FINISH_MOD)

failpoint-enable: tools
Expand Down
14 changes: 6 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ func main() {
sig := <-sc
fmt.Printf("\nGot signal [%v] to exit.\n", sig)
log.Warn("received signal to exit", zap.Stringer("signal", sig))
switch sig {
case syscall.SIGTERM:
cancel()
os.Exit(0)
default:
cancel()
os.Exit(1)
}
cancel()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about attempting a graceful shutdown the first time BR receives a signal, and calling os.Exit() only when it receives a second signal? That way we can avoid the hard-coded 30-second timeout.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, PTAL~

fmt.Fprintln(os.Stderr, "gracefully shuting down, press ^C again to force exit")
<-sc
// Even user use SIGTERM to exit, there isn't any checkpoint for resuming,
// hence returning fail exit code.
os.Exit(1)
}()

rootCmd := &cobra.Command{
Expand Down
19 changes: 17 additions & 2 deletions pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ package gluetikv

import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/prometheus/common/log"
pd "github.com/tikv/pd/client"

"github.com/pingcap/br/pkg/glue"
Expand Down Expand Up @@ -48,7 +50,7 @@ func (Glue) OwnsStorage() bool {

// StartProgress implements glue.Glue.
func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)}
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0}
}

// Record implements glue.Glue.
Expand All @@ -57,15 +59,28 @@ func (Glue) Record(name string, val uint64) {
}

type progress struct {
ch chan<- struct{}
ch chan<- struct{}
closed int32
}

// Inc implements glue.Progress.
func (p progress) Inc() {
if atomic.LoadInt32(&p.closed) != 0 {
log.Warn("proposing a closed progress")
return
}
// there might be buggy if the thread is yielded here.
// however, there should not be gosched, at most time.
// so send here probably is safe, even not totally safe.
// since adding an extra lock should be costly, we just be optimistic.
// (Maybe a spin lock here would be better?)
p.ch <- struct{}{}
Comment on lines +62 to 77
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to just make this entire progress structure an atomic integer instead of a channel, like https://github.com/pingcap/tidb/blob/c7651f02be36b5faf95a204d20f8859eb9c2080a/executor/brie.go#L49-L73? To be honest, I don't see any reason why we use a channel in the first place, since we're using a 1-second timer to update the progress bar in any case.

Copy link
Copy Markdown
Collaborator Author

@YuJuncen YuJuncen Oct 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, refactoring the progress struct here would involve more changes than this PR has made so far. How about leaving a TODO here and doing the refactor in another PR?

(The struct that drives the progress bar is ProgressPrinter, which is designed to use a channel as its interface for reporting progress. Either adapting it or changing the interface is complex enough to warrant opening another PR.)

Copy link
Copy Markdown
Collaborator Author

@YuJuncen YuJuncen Oct 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(But I could give it a try in this PR anyway, if desired.)

}

// Close implements glue.Progress.
//
// It first stores 1 into p.closed and then closes the underlying channel p.ch.
//
// NOTE(review): the receiver is a value (p progress), not a pointer, so the
// atomic store below writes to this call's own copy of p.closed. Inc also
// uses a value receiver, so confirm whether the flag set here can ever be
// observed by Inc, or whether closed needs to live behind a pointer.
func (p progress) Close() {
// Set the closed flag before closing the channel; the intent is that nobody
// sees a state where the channel is already closed while p.closed is still 0.
atomic.StoreInt32(&p.closed, 1)
close(p.ch)
}
11 changes: 7 additions & 4 deletions pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,12 @@ func NewPdController(
}

return &PdController{
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
schedulerPauseCh: make(chan struct{}),
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
// We should make a buffered channel here otherwise when context canceled,
// gracefully shutdown will stick at resuming schedulers.
schedulerPauseCh: make(chan struct{}, 1),
}, nil
}

Expand Down Expand Up @@ -416,6 +418,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
return errors.Annotate(err, "fail to add PD schedulers")
}
log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg))
mergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := clusterCfg.scheduleCfg[cfgKey]
Expand Down
8 changes: 2 additions & 6 deletions pkg/restore/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ func (b *Batcher) Len() int {
func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) {
defer func() {
if ctx.Err() != nil {
timeout := 5 * time.Second
log.Info("restore canceled, cleaning in a context with timeout",
zap.Stringer("timeout", timeout))
limitedCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
b.manager.Close(limitedCtx)
log.Info("restore canceled, cleaning in background context")
b.manager.Close(context.Background())
} else {
b.manager.Close(ctx)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
log.Debug("removing some PD schedulers")
restore, e := mgr.RemoveSchedulers(ctx)
defer func() {
if ctx.Err() != nil {
log.Warn("context canceled, doing clean work with background context")
ctx = context.Background()
}
if restoreE := restore(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if cfg.RemoveSchedulers {
restore, e := mgr.RemoveSchedulers(ctx)
defer func() {
if ctx.Err() != nil {
log.Warn("context canceled, try shutdown")
ctx = context.Background()
}
if restoreE := restore(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr)
func restorePostWork(
ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc,
) {
if ctx.Err() != nil {
log.Warn("context canceled, try shutdown")
ctx = context.Background()
}
if client.IsOnline() {
return
}
Expand Down
6 changes: 6 additions & 0 deletions run-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#! /bin/sh
#
# Installs the client tools the integration test needs (a MySQL client and jq)
# on an apt-based system, then runs the br_other integration test from /brie
# (presumably where the repository is mounted; verify against the caller).
#
# The exit status is that of the first failing step, or of `make` on success.

# Abort on the first failing command or unset variable, so that a failed
# install or a missing /brie directory cannot lead to `make` running in the
# wrong directory or without its prerequisites.
set -eu

# apt-get is used instead of apt because apt's command-line interface is
# intended for interactive use and is not guaranteed to stay stable in scripts.
apt-get update
apt-get install --yes default-mysql-client jq

cd /brie
TEST_NAME=br_other make integration_test
32 changes: 27 additions & 5 deletions tests/br_other/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ set -eu
DB="$TEST_NAME"

run_sql "CREATE DATABASE $DB;"
trap "run_sql \"DROP DATABASE $DB;\"" EXIT


run_sql "CREATE TABLE $DB.usertable1 ( \
YCSB_KEY varchar(64) NOT NULL, \
Expand Down Expand Up @@ -64,7 +66,9 @@ fi
echo "backup start to test lock file"
PPROF_PORT=6080
GO_FAILPOINTS="github.com/pingcap/br/pkg/utils/determined-pprof-port=return($PPROF_PORT)" \
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null &
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 > $TEST_DIR/br-other-stdout.log &
trap "cat $TEST_DIR/br-other-stdout.log" EXIT

# record last backup pid
_pid=$!

Expand Down Expand Up @@ -99,26 +103,45 @@ if ps -p $_pid > /dev/null
then
echo "$_pid is running"
# kill last backup progress (Don't send SIGKILL, or we might stuck PD in no scheduler state.)
kill $_pid
pkill -P $_pid
echo "$_pid is killed @ $(date)"
else
echo "TEST: [$TEST_NAME] test backup lock file failed! the last backup finished"
exit 1
fi


# make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it.
# give enough time to BR so it can gracefully stop.
sleep 5
sleep 30
if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": true'
then
echo "TEST: [$TEST_NAME] failed because scheduler has been removed"
exit 1
fi

default_pd_values='{
"max-merge-region-keys": 200000,
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3
}'

for key in $(echo $default_pd_values | jq 'keys[]'); do
if ! curl -s http://$PD_ADDR/pd/api/v1/config/schedule | jq ".[$key]" | grep -q $(echo $default_pd_values | jq ".[$key]"); then
curl -s http://$PD_ADDR/pd/api/v1/config/schedule
echo "[$TEST_NAME] failed due to PD config isn't reset after restore"
exit 1
fi
done

pd_settings=5

# check is there still exists scheduler in pause.
pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
if [ "$pause_schedulers" -ne "3" ];then
# There shouldn't be any paused schedulers, since BR shut down gracefully.
if [ "$pause_schedulers" -ne "0" ];then
echo "TEST: [$TEST_NAME] failed because paused scheduler has changed"
exit 1
fi
Expand All @@ -145,7 +168,6 @@ if [ "$pd_settings" -ne "5" ];then
exit 1
fi

run_sql "DROP DATABASE $DB;"

# Test version
run_br --version
Expand Down