diff --git a/Makefile b/Makefile index 98f874945f..1a02b30cdf 100644 --- a/Makefile +++ b/Makefile @@ -192,8 +192,8 @@ coverage_fix_cover_mode: sed -i "s/mode: count/mode: atomic/g" $(TEST_DIR)/cov.*.dmctl.*.out coverage: coverage_fix_cover_mode retool_setup - retool do gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > "$(TEST_DIR)/all_cov.out" - retool do gocovmerge "$(TEST_DIR)"/cov.unit_test*.out | grep -vE ".*.pb.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > $(TEST_DIR)/unit_test.out + retool do gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > "$(TEST_DIR)/all_cov.out" + retool do gocovmerge "$(TEST_DIR)"/cov.unit_test*.out | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > $(TEST_DIR)/unit_test.out ifeq ("$(JenkinsCI)", "1") @bash <(curl -s https://codecov.io/bash) -f $(TEST_DIR)/unit_test.out -t $(CODECOV_TOKEN) @retool do goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN) diff --git a/dm/master/shardddl/optimist.go b/dm/master/shardddl/optimist.go index e31de1f518..b63da694ea 100644 --- a/dm/master/shardddl/optimist.go +++ b/dm/master/shardddl/optimist.go @@ -25,6 +25,7 @@ import ( "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/master/metrics" "github.com/pingcap/dm/dm/pb" @@ -549,8 +550,40 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk func (o *Optimist) removeLock(lock *optimism.Lock) (bool, error) { failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) { t := val.(int) - log.L().Info("wait new ddl info putted into etcd", zap.String("failpoint", "SleepWhenRemoveLock")) - time.Sleep(time.Duration(t) * time.Second) + log.L().Info("wait new ddl info putted into etcd", + zap.String("failpoint", "SleepWhenRemoveLock"), + zap.Int("max wait second", t)) + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + timer := time.NewTimer(time.Duration(t) * time.Second) + defer timer.Stop() + OUTER: + for { + select { + case <-timer.C: + log.L().Info("failed to wait new DDL info", zap.Int("wait second", t)) + break OUTER + case <-ticker.C: + // manually check etcd + cmps := make([]clientv3.Cmp, 0) + for source, schemaTables := range lock.Ready() { + for schema, tables := range schemaTables { + for table := range tables { + info := optimism.NewInfo(lock.Task, source, schema, table, lock.DownSchema, lock.DownTable, nil, nil, nil) + info.Version = lock.GetVersion(source, schema, table) + key := common.ShardDDLOptimismInfoKeyAdapter.Encode(info.Task, info.Source, info.UpSchema, info.UpTable) + cmps = append(cmps, clientv3.Compare(clientv3.Version(key), "<", info.Version+1)) + } + } + } + resp, _, err := etcdutil.DoOpsInOneCmpsTxnWithRetry(o.cli, cmps, nil, nil) + if err == nil && !resp.Succeeded { + log.L().Info("found new DDL info") + break OUTER + } + } + } }) deleted, err := o.deleteInfosOps(lock) if err != nil { diff --git a/dm/master/shardddl/pessimist_test.go b/dm/master/shardddl/pessimist_test.go index 64eadf53f7..2498a9536a 100644 --- a/dm/master/shardddl/pessimist_test.go +++ b/dm/master/shardddl/pessimist_test.go @@ -201,7 +201,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) { done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op12c, i12) c.Assert(err, IsNil) c.Assert(done, IsTrue) - c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool { _, ok := p.Locks()[ID1] return !ok }), IsTrue) diff --git a/pkg/shardddl/optimism/info_test.go b/pkg/shardddl/optimism/info_test.go index 6479994fa4..efc54679a5 100644 --- a/pkg/shardddl/optimism/info_test.go +++ b/pkg/shardddl/optimism/info_test.go @@ -179,7 +179,11 @@ func (t *testForEtcd) TestInfoEtcd(c *C) { resp, err := etcdTestCli.Txn(context.Background()).Then(deleteOp).Commit() c.Assert(err, IsNil) c.Assert(resp.Succeeded, IsTrue) - <-wch + select { + case err2 := <-ech: + c.Fatal(err2) + case <-wch: + } // put again // version reset to 1 diff --git a/relay/relay_test.go b/relay/relay_test.go index 8ba84484fd..56b961dd64 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -566,8 +566,11 @@ func (t *testRelaySuite) TestProcess(c *C) { // kill the binlog dump connection ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) defer cancel2() - connID, err := getBinlogDumpConnID(ctx2, r.db) - c.Assert(err, IsNil) + var connID uint32 + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + connID, err = getBinlogDumpConnID(ctx2, r.db) + return err == nil + }), IsTrue) _, err = r.db.ExecContext(ctx2, fmt.Sprintf(`KILL %d`, connID)) c.Assert(err, IsNil) diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index 270529f527..6f2ddb7981 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -204,7 +204,7 @@ function check_log_contain_with_retry() { continue fi got=`grep "$text" $log1 | wc -l` - if [[ ! $got = 0 ]]; then + if [[ $got -ne 0 ]]; then rc=1 break fi @@ -215,7 +215,7 @@ function check_log_contain_with_retry() { continue fi got=`grep "$text" $log2 | wc -l` - if [[ ! $got = 0 ]]; then + if [[ $got -ne 0 ]]; then rc=1 break fi @@ -223,7 +223,7 @@ function check_log_contain_with_retry() { echo "check log contain failed $k-th time, retry later" sleep 2 done - if [[ $rc = 0 ]]; then + if [[ $rc -eq 0 ]]; then echo "log dosen't contain $text" exit 1 fi diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index 66f39d535b..abfc3fe566 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -72,8 +72,8 @@ function test_multi_task_running() { sleep 5 # wait for flush checkpoint echo "use sync_diff_inspector to check increment data" - check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 || print_debug_status - check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml 10 || print_debug_status + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 50 || print_debug_status + check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml 50 || print_debug_status echo "[$(date)] <<<<<< finish test_multi_task_running >>>>>>" } @@ -421,14 +421,15 @@ function test_pause_task() { task_name=(test test2) for name in ${task_name[@]}; do echo "pause tasks $name" + + # because some SQL may running (often remove checkpoint record), pause will cause that SQL failed + # thus `result` is not true run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "pause-task $name"\ - "\"result\": true" 3 + "pause-task $name" # pause twice, just used to test pause by the way run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "pause-task $name"\ - "\"result\": true" 3 + "pause-task $name" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $name"\ diff --git a/tests/shardddl3/run.sh b/tests/shardddl3/run.sh index 5d7b185ee2..5821dc99de 100644 --- a/tests/shardddl3/run.sh +++ b/tests/shardddl3/run.sh @@ -610,6 +610,23 @@ function DM_102_CASE() { run_sql_source1 "insert into ${shardddl1}.${tb1} values (1,1);" run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int default -1;" + sleep 1 + # wait DM receive source2's DDL + found=false + for ((k=0; k<10; k++)); do + content=$($PWD/bin/dmctl.test DEVEL --master-addr=127.0.0.1:$MASTER_PORT query-status test) + master2=$(echo $content | sed 's/"masterBinlog":/"masterBinlog":\n/g' | awk -F')' 'FNR==3{print $1}') + syncer2=$(echo $content | sed 's/"syncerBinlog":/"syncerBinlog":\n/g' | awk -F')' 'FNR==3{print $1}') + if [ "$master2" != "$syncer2" ]; then + found=true + break + fi + done + if [[ $found == false ]]; then + echo "didn't receive mismatched DDL" + exit 2 + fi + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "show-ddl-locks" \ "\"ID\": \"test-\`shardddl\`.\`tb\`\"" 1 @@ -683,7 +700,7 @@ function DM_RemoveLock_CASE() { function DM_RemoveLock() { ps aux | grep dm-master |awk '{print $2}'|xargs kill || true check_port_offline $MASTER_PORT1 20 - export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(5)" + export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(10)" run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \