diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 8986ea3905..adca27e2b0 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -92,3 +92,27 @@ jobs: # name: upstream-switch-logs # path: | # ./logs.tar.gz + + failover_e2e_test: + runs-on: ubuntu-latest + name: Failover E2E Test + steps: + - name: Check out code + uses: actions/checkout@v2 + + - name: Setup Go environment + uses: actions/setup-go@v3 + with: + go-version: '1.23' + + - name: Integration Build + run: | + tests/scripts/download-integration-test-binaries.sh master true + go build -o ./tools/bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl + make integration_test_build + ls -l bin/ && ls -l tools/bin/ + + - name: Test failover + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index df76423f2d..74ec607839 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -14,9 +14,11 @@ package dispatcher import ( + "math/rand" "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/sink" "github.com/pingcap/ticdc/downstreamadapter/syncpoint" @@ -266,6 +268,12 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba block = false // Dispatcher is ready, handle the events for _, dispatcherEvent := range dispatcherEvents { + failpoint.Inject("HandleEventsSlowly", func() { + lag := time.Duration(rand.Intn(5000)) * time.Millisecond + log.Warn("handle events slowly", zap.Duration("lag", lag)) + time.Sleep(lag) + }) + event := dispatcherEvent.Event // Pre-check, make sure the event is not stale if event.GetCommitTs() < atomic.LoadUint64(&d.resolvedTs) { diff --git a/tests/integration_tests/fail_over/conf/diff_config.toml b/tests/integration_tests/fail_over/conf/diff_config.toml new file mode 100644 index 0000000000..1c4a24caf9 --- /dev/null +++ b/tests/integration_tests/fail_over/conf/diff_config.toml @@ -0,0 +1,28 @@ +# diff Configuration. +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/fail_over/sync_diff/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["fail_over_test.*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/fail_over/data/prepare.sql b/tests/integration_tests/fail_over/data/prepare.sql new file mode 100644 index 0000000000..53fad4bfbb --- /dev/null +++ b/tests/integration_tests/fail_over/data/prepare.sql @@ -0,0 +1,44 @@ +drop database if exists `fail_over_test`; +create database `fail_over_test`; + +use `fail_over_test`; + +create table t1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); + +create table t2 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); + +INSERT INTO t1 (val, col0) VALUES (1, 1); +INSERT INTO t1 (val, col0) VALUES (2, 2); +INSERT INTO t1 (val, col0) VALUES (3, 3); +INSERT INTO t1 (val, col0) VALUES (4, 4); +INSERT INTO t1 (val, col0) VALUES (5, 5); +INSERT INTO t1 (val, col0) VALUES (6, 6); +INSERT INTO t1 (val, col0) VALUES (7, 7); +INSERT INTO t1 (val, col0) VALUES (8, 8); +INSERT INTO t1 (val, col0) VALUES (9, 9); +INSERT INTO t1 (val, col0) VALUES (10, 10); + +INSERT INTO t2 (val, col0) VALUES (1, 1); +INSERT INTO t2 (val, col0) VALUES (2, 2); +INSERT INTO t2 (val, col0) VALUES (3, 3); +INSERT INTO t2 (val, col0) VALUES (4, 4); +INSERT INTO t2 (val, col0) VALUES (5, 5); +INSERT INTO t2 (val, col0) VALUES (6, 6); +INSERT INTO t2 (val, col0) VALUES (7, 7); +INSERT INTO t2 (val, col0) VALUES (8, 8); +INSERT INTO t2 (val, col0) VALUES (9, 9); +INSERT INTO t2 (val, col0) VALUES (10, 10); + +CREATE TABLE fail_over_test.finish_mark ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); diff --git a/tests/integration_tests/fail_over/run.sh b/tests/integration_tests/fail_over/run.sh new file mode 100644 index 0000000000..342b0ac7a2 --- /dev/null +++ b/tests/integration_tests/fail_over/run.sh @@ -0,0 +1,116 @@ +#!/bin/bash +# This case is used to test the fail-over feature of TiCDC. +# The test just test the basic normal fail-over process +# (kill cdc node and start it again | two node, kill one and restart this one ) + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +## One TiCDC, and kill/restart the cdc node +function failOverOnlyOneNode() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + cdc_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + + TOPIC_NAME="ticdc-failover-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; + pulsar) + run_pulsar_cluster $WORK_DIR normal + SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" + ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; + esac + + export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/HandleEventsSlowly=return(true)' + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + sleep 1 + + kill_cdc_pid $cdc_pid + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + check_table_exists fail_over_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +## Two TiCDC, failover one node and then failover the other node +function failOverWhenTwoNode() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + ## server 1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300" + cdc_pid_1=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + echo $cdc_pid_1 + ## server 2 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301" + + TOPIC_NAME="ticdc-failover-test-two-node-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; + pulsar) + run_pulsar_cluster $WORK_DIR normal + SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" + ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; + esac + + export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/HandleEventsSlowly=return(true)' + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + sleep 1 + + kill_cdc_pid $cdc_pid_1 + cdc_pid_2=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + ## restart server 1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300" + kill_cdc_pid $cdc_pid_2 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1-1" --addr "127.0.0.1:8301" + + check_table_exists fail_over_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +failOverOnlyOneNode $* +failOverWhenTwoNode $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"