diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 11ed2edfa0..37b5bea2d8 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -128,17 +128,31 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=savepoint - # The 14th case in this group - name: Test server config compatibility if: ${{ success() }} run: | export TICDC_NEWARCH=true && make integration_test CASE=server_config_compatibility - # The 15th case in this group - name: Test split region if: ${{ success() }} run: | export TICDC_NEWARCH=true && make integration_test CASE=split_region + + # The 16th case in this group + - name: Test changefeed resume with checkpoint ts + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=changefeed_resume_with_checkpoint_ts + + - name: Test capture suicide while balance table + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=capture_suicide_while_balance_table + + - name: Test kv client stream reconnect + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=kv_client_stream_reconnect - name: Upload test logs if: always() diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 9664bbd7b2..4fc2bdb6ae 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" @@ -173,6 +174,17 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred return s.receiveAndDispatchChangeEvents(conn) }) g.Go(func() error { return s.processRegionSendTask(gctx, conn) }) + + failpoint.Inject("InjectForceReconnect", func() { + timer := time.After(10 * time.Second) + g.Go(func() error { + <-timer + err := errors.New("inject force reconnect") + log.Info("inject force reconnect", zap.Error(err)) + return err + }) + }) + _ = g.Wait() return isCanceled() } @@ -427,3 +439,15 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo { } return regions } + +func (s *regionRequestWorker) getAllRegionStates() regionFeedStates { + s.requestedRegions.RLock() + defer s.requestedRegions.RUnlock() + states := make(regionFeedStates) + for _, statesMap := range s.requestedRegions.subscriptions { + for regionID, state := range statesMap { + states[regionID] = state + } + } + return states +} diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 9cd61ba4fd..cfbe12958e 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -30,10 +30,10 @@ import ( cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/spanz" - "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" kvclientv2 "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index d9d6523678..029676448c 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -402,7 +402,7 @@ func (c *Controller) addNewSpans(schemaID int64, tableSpans []*heartbeatpb.Table } func (c *Controller) loadTables(startTs uint64) ([]commonEvent.Table, error) { - // todo: do we need to set timezone here? + // Use a empty timezone because table filter does not need it. f, err := filter.NewFilter(c.cfConfig.Filter, "", c.cfConfig.CaseSensitive, c.cfConfig.ForceReplicate) if err != nil { return nil, errors.Cause(err) diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index 6b7cdb82a4..1870cd8fd8 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -29,6 +29,7 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/parser/mysql" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "go.uber.org/zap" @@ -170,6 +171,8 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error { failpoint.Return(err) }) + failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(w.ctx, time.Hour) }) + failpoint.Inject("MySQLDuplicateEntryError", func() { log.Warn("inject MySQLDuplicateEntryError") err := cerror.WrapError(cerror.ErrMySQLDuplicateEntry, &dmysql.MySQLError{ diff --git a/pkg/util/atomic.go b/pkg/util/atomic.go new file mode 100644 index 0000000000..a0047a83fc --- /dev/null +++ b/pkg/util/atomic.go @@ -0,0 +1,58 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +type numbers interface { + int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | uintptr | float32 | float64 +} + +type genericAtomic[T numbers] interface { + Load() T + Store(T) + CompareAndSwap(old, new T) bool +} + +// CompareAndIncrease updates the target if the new value is larger than or equal to the old value. +// It returns false if the new value is smaller than the old value. +func CompareAndIncrease[T numbers](target genericAtomic[T], new T) bool { + for { + old := target.Load() + if new < old { + return false + } + if new == old || target.CompareAndSwap(old, new) { + return true + } + } +} + +// CompareAndMonotonicIncrease updates the target if the new value is larger than the old value. +// It returns false if the new value is smaller than or equal to the old value. +func CompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) bool { + for { + old := target.Load() + if new <= old { + return false + } + if target.CompareAndSwap(old, new) { + return true + } + } +} + +// MustCompareAndMonotonicIncrease updates the target if the new value is larger than the old value. It do nothing +// if the new value is smaller than or equal to the old value. +func MustCompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) { + _ = CompareAndMonotonicIncrease(target, new) +} diff --git a/pkg/util/atomic_test.go b/pkg/util/atomic_test.go new file mode 100644 index 0000000000..900720ebb5 --- /dev/null +++ b/pkg/util/atomic_test.go @@ -0,0 +1,106 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "math/rand" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMustCompareAndIncrease(t *testing.T) { + t.Parallel() + + var target atomic.Int64 + target.Store(10) + + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + + doIncrease := func() { + for { + select { + case <-ctx.Done(): + return + default: + delta := rand.Int63n(100) + v := target.Load() + delta + MustCompareAndMonotonicIncrease(&target, v) + require.GreaterOrEqual(t, target.Load(), v) + } + } + } + + // Test target increase. + wg.Add(2) + go func() { + defer wg.Done() + doIncrease() + }() + go func() { + defer wg.Done() + doIncrease() + }() + + wg.Add(1) + // Test target never decrease. + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + v := target.Load() - 1 + MustCompareAndMonotonicIncrease(&target, v) + require.Greater(t, target.Load(), v) + } + } + }() + + cancel() + wg.Wait() +} + +func TestCompareAndIncrease(t *testing.T) { + t.Parallel() + + var target atomic.Int64 + target.Store(10) + require.True(t, CompareAndIncrease(&target, 10)) + require.Equal(t, int64(10), target.Load()) + + require.True(t, CompareAndIncrease(&target, 20)) + require.Equal(t, int64(20), target.Load()) + require.False(t, CompareAndIncrease(&target, 19)) + require.Equal(t, int64(20), target.Load()) +} + +func TestCompareAndMonotonicIncrease(t *testing.T) { + t.Parallel() + + var target atomic.Int64 + target.Store(10) + require.False(t, CompareAndMonotonicIncrease(&target, 10)) + require.Equal(t, int64(10), target.Load()) + + require.True(t, CompareAndMonotonicIncrease(&target, 11)) + require.Equal(t, int64(11), target.Load()) + require.False(t, CompareAndMonotonicIncrease(&target, 10)) + require.Equal(t, int64(11), target.Load()) +} diff --git a/pkg/util/net_util.go b/pkg/util/util.go similarity index 78% rename from pkg/util/net_util.go rename to pkg/util/util.go index f3fe74535b..0c1949914d 100644 --- a/pkg/util/net_util.go +++ b/pkg/util/util.go @@ -14,8 +14,10 @@ package util import ( + "context" "net" "strconv" + "time" "github.com/pingcap/errors" ) @@ -33,3 +35,17 @@ func ParseHostAndPortFromAddress(address string) (string, uint, error) { } return host, uint(portNumeric), nil } + +// Hang will block the goroutine for a given duration, or return when `ctx` is done. +func Hang(ctx context.Context, dur time.Duration) error { + timer := time.NewTimer(dur) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() + case <-timer.C: + return nil + } +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go new file mode 100644 index 0000000000..ade102e4aa --- /dev/null +++ b/pkg/util/util_test.go @@ -0,0 +1,109 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestParseHostAndPortFromAddress(t *testing.T) { + tests := []struct { + name string + address string + expectedHost string + expectedPort uint + expectedError bool + }{ + { + name: "valid address", + address: "127.0.0.1:2379", + expectedHost: "127.0.0.1", + expectedPort: 2379, + }, + { + name: "valid address with IPv6", + address: "[::1]:2379", + expectedHost: "::1", + expectedPort: 2379, + }, + { + name: "invalid address format", + address: "127.0.0.1", + expectedError: true, + }, + { + name: "invalid port number", + address: "127.0.0.1:0", + expectedError: true, + }, + { + name: "invalid port format", + address: "127.0.0.1:abc", + expectedError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + host, port, err := ParseHostAndPortFromAddress(tt.address) + if tt.expectedError { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tt.expectedHost, host) + require.Equal(t, tt.expectedPort, port) + }) + } +} + +func TestHang(t *testing.T) { + t.Run("normal completion", func(t *testing.T) { + ctx := context.Background() + duration := 100 * time.Millisecond + start := time.Now() + err := Hang(ctx, duration) + elapsed := time.Since(start) + require.NoError(t, err) + require.GreaterOrEqual(t, elapsed, duration) + }) + + t.Run("context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + duration := 1 * time.Second + done := make(chan struct{}) + + go func() { + err := Hang(ctx, duration) + require.Error(t, err) + require.Equal(t, context.Canceled, err) + close(done) + }() + + // Cancel context after a short delay + time.Sleep(100 * time.Millisecond) + cancel() + + select { + case <-done: + // Test passed + case <-time.After(500 * time.Millisecond): + t.Fatal("Hang did not return after context cancellation") + } + }) +} diff --git a/tests/integration_tests/capture_suicide_while_balance_table/run.sh b/tests/integration_tests/capture_suicide_while_balance_table/run.sh index ce776bdbcc..22add6f626 100644 --- a/tests/integration_tests/capture_suicide_while_balance_table/run.sh +++ b/tests/integration_tests/capture_suicide_while_balance_table/run.sh @@ -1,6 +1,6 @@ #!/bin/bash -set -eu +set -eux CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source $CUR/../_utils/test_prepare @@ -9,7 +9,7 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 # This test mainly verifies CDC can handle the following scenario -# 1. Two captures, capture-1 is the owner, each capture replicates more than one table. +# 1. Two captures, capture-1 is the coordinator, each capture replicates more than one table. # 2. capture-2 replicates some DMLs but has some delay, such as large amount of # incremental scan data, sink block, etc, we name this slow table as table-slow. # 3. Before capture-2 the checkpoint ts of table-slow reaches global resolved ts, @@ -36,7 +36,7 @@ function run() { pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --pd $pd_addr --logsuffix 1 --addr "127.0.0.1:8300" - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=1*return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/sink/mysql/MySQLSinkHangLongTime=1*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --pd $pd_addr --logsuffix 2 --addr "127.0.0.1:8301" SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" @@ -56,11 +56,11 @@ function run() { target_capture=$capture1_id # find a table that capture2 is replicating - one_table_id=$(curl curl -X GET http://127.0.0.1:8301/api/v2/changefeeds/${changefeed_id}/tables | grep $capture2_id | jq -r '.status.tables|keys[0]') - if [[ $one_table_id == "null" ]]; then + one_table_id=$(curl -X GET "http://127.0.0.1:8301/api/v2/changefeeds/${changefeed_id}/tables" | jq -r --arg cid "$capture2_id" '.items[] | select(.node_id==$cid) | .table_ids[0]') + if [[ $one_table_id == "null" || $one_table_id == "0" ]]; then # if not found, find a table that capture1 is replicating target_capture=$capture2_id - one_table_id=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/${changefeed_id}/tables | grep $capture1_id | jq -r '.status.tables|keys[0]') + one_table_id=$(curl -X GET "http://127.0.0.1:8300/api/v2/changefeeds/${changefeed_id}/tables" | jq -r --arg cid "$capture1_id" '.items[] | select(.node_id==$cid) | .table_ids[0]') fi table_query=$(mysql -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -uroot -e "select table_name from information_schema.tables where tidb_table_id = ${one_table_id}\G") table_name=$(echo $table_query | tail -n 1 | awk '{print $(NF)}') @@ -68,8 +68,7 @@ function run() { # sleep some time to wait global resolved ts forwarded sleep 2 - curl -X POST http://127.0.0.1:8300/api/v2/changefeeds/${changefeed_id}/move_table?tableID=${one_table_id} & - targetNodeID=${target_capture} + curl -X POST "http://127.0.0.1:8300/api/v2/changefeeds/${changefeed_id}/move_table?tableID=${one_table_id}&targetNodeID=${target_capture}" # sleep some time to wait table balance job is written to etcd sleep 2 @@ -78,6 +77,12 @@ function run() { lease_hex=$(printf '%x\n' $lease) ETCDCTL_API=3 etcdctl lease revoke $lease_hex + # sleep some time to wait capture2 suicides + sleep 10 + + # start capture2 again + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix 2 --addr "127.0.0.1:8301" + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml export GO_FAILPOINTS='' cleanup_process $CDC_BINARY diff --git a/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh b/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh index 18eb24ccda..af3fed6db9 100644 --- a/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh +++ b/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh @@ -78,11 +78,11 @@ function resume_changefeed_in_stopped_state() { } function resume_changefeed_in_failed_state() { - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/InjectChangefeedFastFailError=1*return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=1*return(true)' pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --pd $pd_addr - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "failed" "ErrStartTsBeforeGC" "" + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "failed" "ErrSnapshotLostByGC" "" cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr --overwrite-checkpoint-ts=now --no-confirm=true ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" "" diff --git a/tests/integration_tests/kv_client_stream_reconnect/run.sh b/tests/integration_tests/kv_client_stream_reconnect/run.sh index 72c0b41534..083b26dd9a 100644 --- a/tests/integration_tests/kv_client_stream_reconnect/run.sh +++ b/tests/integration_tests/kv_client_stream_reconnect/run.sh @@ -28,8 +28,8 @@ function run() { *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; esac - # this will be triggered every 5s in kv client - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/kv/kvClientForceReconnect=return(true)' + # this will be triggered every 5s in logpuller + export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/logpuller/InjectForceReconnect=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr if [ "$SINK_TYPE" == "pulsar" ]; then cat <>$WORK_DIR/pulsar_test.toml @@ -55,7 +55,7 @@ EOF run_sql "create table kv_client_stream_reconnect.t$i (id int primary key auto_increment, a int default 10);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} done - for i in $(seq 30); do + for i in $(seq 60); do tbl="t$((1 + $RANDOM % $TABLE_COUNT))" run_sql "insert into kv_client_stream_reconnect.$tbl values (),(),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} sleep 1 diff --git a/tests/integration_tests/owner_remove_table_error/conf/diff_config.toml b/tests/integration_tests/owner_remove_table_error/conf/diff_config.toml deleted file mode 100644 index 750741652f..0000000000 --- a/tests/integration_tests/owner_remove_table_error/conf/diff_config.toml +++ /dev/null @@ -1,29 +0,0 @@ -# diff Configuration. - -check-thread-count = 4 - -export-fix-sql = true - -check-struct-only = false - -[task] - output-dir = "/tmp/tidb_cdc_test/owner_remove_table_error/sync_diff/output" - - source-instances = ["mysql1"] - - target-instance = "tidb0" - - target-check-tables = ["owner_remove_table_error.t?*"] - -[data-sources] -[data-sources.mysql1] - host = "127.0.0.1" - port = 4000 - user = "root" - password = "" - -[data-sources.tidb0] - host = "127.0.0.1" - port = 3306 - user = "root" - password = "" diff --git a/tests/integration_tests/owner_remove_table_error/run.sh b/tests/integration_tests/owner_remove_table_error/run.sh deleted file mode 100644 index 22d5a21c8f..0000000000 --- a/tests/integration_tests/owner_remove_table_error/run.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 - -CDC_COUNT=3 -DB_COUNT=4 -MAX_RETRIES=20 - -function run() { - # kafka is not supported yet. - if [ "$SINK_TYPE" != "mysql" ]; then - return - fi - - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - start_tidb_cluster --workdir $WORK_DIR - cd $WORK_DIR - - pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" - SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" - - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/OwnerRemoveTableError=1*return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr - changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - - run_sql "CREATE DATABASE owner_remove_table_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "CREATE table owner_remove_table_error.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "INSERT INTO owner_remove_table_error.t1 VALUES (),(),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "DROP table owner_remove_table_error.t1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "CREATE table owner_remove_table_error.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "INSERT INTO owner_remove_table_error.t2 VALUES (),(),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "CREATE table owner_remove_table_error.finished_mark(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - - check_table_exists "owner_remove_table_error.finished_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -run $* -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"