Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ coverage_fix_cover_mode:
sed -i "s/mode: count/mode: atomic/g" $(TEST_DIR)/cov.*.dmctl.*.out

coverage: coverage_fix_cover_mode retool_setup
retool do gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > "$(TEST_DIR)/all_cov.out"
retool do gocovmerge "$(TEST_DIR)"/cov.unit_test*.out | grep -vE ".*.pb.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > $(TEST_DIR)/unit_test.out
retool do gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > "$(TEST_DIR)/all_cov.out"
retool do gocovmerge "$(TEST_DIR)"/cov.unit_test*.out | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > $(TEST_DIR)/unit_test.out
ifeq ("$(JenkinsCI)", "1")
@bash <(curl -s https://codecov.io/bash) -f $(TEST_DIR)/unit_test.out -t $(CODECOV_TOKEN)
@retool do goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
Expand Down
37 changes: 35 additions & 2 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/master/metrics"
"github.com/pingcap/dm/dm/pb"
Expand Down Expand Up @@ -549,8 +550,40 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
func (o *Optimist) removeLock(lock *optimism.Lock) (bool, error) {
failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd", zap.String("failpoint", "SleepWhenRemoveLock"))
time.Sleep(time.Duration(t) * time.Second)
log.L().Info("wait new ddl info putted into etcd",
zap.String("failpoint", "SleepWhenRemoveLock"),
zap.Int("max wait second", t))

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
timer := time.NewTimer(time.Duration(t) * time.Second)
defer timer.Stop()
OUTER:
for {
select {
case <-timer.C:
log.L().Info("failed to wait new DDL info", zap.Int("wait second", t))
break OUTER
case <-ticker.C:
// manually check etcd
cmps := make([]clientv3.Cmp, 0)
for source, schemaTables := range lock.Ready() {
for schema, tables := range schemaTables {
for table := range tables {
info := optimism.NewInfo(lock.Task, source, schema, table, lock.DownSchema, lock.DownTable, nil, nil, nil)
info.Version = lock.GetVersion(source, schema, table)
key := common.ShardDDLOptimismInfoKeyAdapter.Encode(info.Task, info.Source, info.UpSchema, info.UpTable)
cmps = append(cmps, clientv3.Compare(clientv3.Version(key), "<", info.Version+1))
}
}
}
resp, _, err := etcdutil.DoOpsInOneCmpsTxnWithRetry(o.cli, cmps, nil, nil)
if err == nil && !resp.Succeeded {
log.L().Info("found new DDL info")
break OUTER
}
}
}
})
deleted, err := o.deleteInfosOps(lock)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dm/master/shardddl/pessimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) {
done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op12c, i12)
c.Assert(err, IsNil)
c.Assert(done, IsTrue)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool {
_, ok := p.Locks()[ID1]
return !ok
}), IsTrue)
Expand Down
6 changes: 5 additions & 1 deletion pkg/shardddl/optimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
resp, err := etcdTestCli.Txn(context.Background()).Then(deleteOp).Commit()
c.Assert(err, IsNil)
c.Assert(resp.Succeeded, IsTrue)
<-wch
select {
case err2 := <-ech:
c.Fatal(err2)
case <-wch:
}

// put again
// version reset to 1
Expand Down
7 changes: 5 additions & 2 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,11 @@ func (t *testRelaySuite) TestProcess(c *C) {
// kill the binlog dump connection
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel2()
connID, err := getBinlogDumpConnID(ctx2, r.db)
c.Assert(err, IsNil)
var connID uint32
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
connID, err = getBinlogDumpConnID(ctx2, r.db)
return err == nil
}), IsTrue)
_, err = r.db.ExecContext(ctx2, fmt.Sprintf(`KILL %d`, connID))
c.Assert(err, IsNil)

Expand Down
6 changes: 3 additions & 3 deletions tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ function check_log_contain_with_retry() {
continue
fi
got=`grep "$text" $log1 | wc -l`
if [[ ! $got = 0 ]]; then
if [[ $got -ne 0 ]]; then
rc=1
break
fi
Expand All @@ -215,15 +215,15 @@ function check_log_contain_with_retry() {
continue
fi
got=`grep "$text" $log2 | wc -l`
if [[ ! $got = 0 ]]; then
if [[ $got -ne 0 ]]; then
rc=1
break
fi
fi
echo "check log contain failed $k-th time, retry later"
sleep 2
done
if [[ $rc = 0 ]]; then
if [[ $rc -eq 0 ]]; then
echo "log dosen't contain $text"
exit 1
fi
Expand Down
13 changes: 7 additions & 6 deletions tests/ha_cases/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ function test_multi_task_running() {

sleep 5 # wait for flush checkpoint
echo "use sync_diff_inspector to check increment data"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 10 || print_debug_status
check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml 10 || print_debug_status
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 50 || print_debug_status
check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml 50 || print_debug_status
echo "[$(date)] <<<<<< finish test_multi_task_running >>>>>>"
}

Expand Down Expand Up @@ -421,14 +421,15 @@ function test_pause_task() {
task_name=(test test2)
for name in ${task_name[@]}; do
echo "pause tasks $name"

# because some SQL may running (often remove checkpoint record), pause will cause that SQL failed
# thus `result` is not true
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task $name"\
"\"result\": true" 3
"pause-task $name"

# pause twice, just used to test pause by the way
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task $name"\
"\"result\": true" 3
"pause-task $name"

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $name"\
Expand Down
19 changes: 18 additions & 1 deletion tests/shardddl3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,23 @@ function DM_102_CASE() {
run_sql_source1 "insert into ${shardddl1}.${tb1} values (1,1);"
run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int default -1;"

sleep 1
# wait DM receive source2's DDL
found=false
for ((k=0; k<10; k++)); do
content=$($PWD/bin/dmctl.test DEVEL --master-addr=127.0.0.1:$MASTER_PORT query-status test)
master2=$(echo $content | sed 's/"masterBinlog":/"masterBinlog":\n/g' | awk -F')' 'FNR==3{print $1}')
syncer2=$(echo $content | sed 's/"syncerBinlog":/"syncerBinlog":\n/g' | awk -F')' 'FNR==3{print $1}')
if [ "$master2" != "$syncer2" ]; then
found=true
Comment thread
csuzhangxc marked this conversation as resolved.
break
fi
done
if [[ $found == false ]]; then
echo "didn't receive mismatched DDL"
exit 2
fi

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"show-ddl-locks" \
"\"ID\": \"test-\`shardddl\`.\`tb\`\"" 1
Expand Down Expand Up @@ -683,7 +700,7 @@ function DM_RemoveLock_CASE() {
function DM_RemoveLock() {
ps aux | grep dm-master |awk '{print $2}'|xargs kill || true
check_port_offline $MASTER_PORT1 20
export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(5)"
export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(10)"
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down