From 0458bc15674d80d36381dd23d2d2c40c1452ef72 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 3 Dec 2024 19:39:16 +0800 Subject: [PATCH 1/5] fix metrics --- .../dispatchermanager/event_dispatcher_manager.go | 2 +- metrics/grafana/ticdc_new_arch.json | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 9bfc8d50db..ec7cd6f505 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -256,7 +256,7 @@ func (e *EventDispatcherManager) close(remove bool) { e.cancel() e.wg.Wait() - e.tableEventDispatcherCount.Sub(float64(e.dispatcherMap.Len())) + metrics.TableEventDispatcherGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) metrics.CreateDispatcherDuration.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) metrics.EventDispatcherManagerCheckpointTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) metrics.EventDispatcherManagerResolvedTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index ecfef4ccbd..ed290d46e9 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -1801,12 +1801,12 @@ "targets": [ { "exemplar": true, - "expr": "sum(tigate_dispatchermanager_table_event_dispatcher_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}) by (instance)", + "expr": "sum(tigate_dispatchermanager_table_event_dispatcher_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}) by (instance, changefeed)", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "{{instance}}", + "legendFormat": "{{instance}}-{{changefeed}}", "refId": "A", "step": 10 } From 2cbb1773e7b65374a9a30109e9b8518dc31f4e91 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 24 Dec 2024 11:20:11 +0800 Subject: [PATCH 2/5] update --- .github/workflows/integration_test_mysql.yaml | 24 ++++ downstreamadapter/dispatcher/dispatcher.go | 8 ++ .../fail_over/conf/diff_config.toml | 28 +++++ .../fail_over/data/prepare.sql | 44 +++++++ tests/integration_tests/fail_over/run.sh | 116 ++++++++++++++++++ 5 files changed, 220 insertions(+) create mode 100644 tests/integration_tests/fail_over/conf/diff_config.toml create mode 100644 tests/integration_tests/fail_over/data/prepare.sql create mode 100644 tests/integration_tests/fail_over/run.sh diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 8986ea3905..4d9426b23c 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -92,3 +92,27 @@ jobs: # name: upstream-switch-logs # path: | # ./logs.tar.gz + +failover_e2e_test: + runs-on: ubuntu-latest + name: E2E Test + steps: + - name: Check out code + uses: actions/checkout@v2 + + - name: Setup Go environment + uses: actions/setup-go@v3 + with: + go-version: '1.23' + + - name: Integration Build + run: | + tests/scripts/download-integration-test-binaries.sh master true + go build -o ./tools/bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl + make integration_test_build + ls -l bin/ && ls -l tools/bin/ + + - name: Test failover + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=failover diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index df76423f2d..74ec607839 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -14,9 +14,11 @@ package dispatcher import ( + "math/rand" "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/sink" "github.com/pingcap/ticdc/downstreamadapter/syncpoint" @@ -266,6 +268,12 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba block = false // Dispatcher is ready, handle the events for _, dispatcherEvent := range dispatcherEvents { + failpoint.Inject("HandleEventsSlowly", func() { + lag := time.Duration(rand.Intn(5000)) * time.Millisecond + log.Warn("handle events slowly", zap.Duration("lag", lag)) + time.Sleep(lag) + }) + event := dispatcherEvent.Event // Pre-check, make sure the event is not stale if event.GetCommitTs() < atomic.LoadUint64(&d.resolvedTs) { diff --git a/tests/integration_tests/fail_over/conf/diff_config.toml b/tests/integration_tests/fail_over/conf/diff_config.toml new file mode 100644 index 0000000000..1c4a24caf9 --- /dev/null +++ b/tests/integration_tests/fail_over/conf/diff_config.toml @@ -0,0 +1,28 @@ +# diff Configuration. +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/fail_over/sync_diff/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["fail_over_test.*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/fail_over/data/prepare.sql b/tests/integration_tests/fail_over/data/prepare.sql new file mode 100644 index 0000000000..53fad4bfbb --- /dev/null +++ b/tests/integration_tests/fail_over/data/prepare.sql @@ -0,0 +1,44 @@ +drop database if exists `fail_over_test`; +create database `fail_over_test`; + +use `fail_over_test`; + +create table t1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); + +create table t2 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); + +INSERT INTO t1 (val, col0) VALUES (1, 1); +INSERT INTO t1 (val, col0) VALUES (2, 2); +INSERT INTO t1 (val, col0) VALUES (3, 3); +INSERT INTO t1 (val, col0) VALUES (4, 4); +INSERT INTO t1 (val, col0) VALUES (5, 5); +INSERT INTO t1 (val, col0) VALUES (6, 6); +INSERT INTO t1 (val, col0) VALUES (7, 7); +INSERT INTO t1 (val, col0) VALUES (8, 8); +INSERT INTO t1 (val, col0) VALUES (9, 9); +INSERT INTO t1 (val, col0) VALUES (10, 10); + +INSERT INTO t2 (val, col0) VALUES (1, 1); +INSERT INTO t2 (val, col0) VALUES (2, 2); +INSERT INTO t2 (val, col0) VALUES (3, 3); +INSERT INTO t2 (val, col0) VALUES (4, 4); +INSERT INTO t2 (val, col0) VALUES (5, 5); +INSERT INTO t2 (val, col0) VALUES (6, 6); +INSERT INTO t2 (val, col0) VALUES (7, 7); +INSERT INTO t2 (val, col0) VALUES (8, 8); +INSERT INTO t2 (val, col0) VALUES (9, 9); +INSERT INTO t2 (val, col0) VALUES (10, 10); + +CREATE TABLE fail_over_test.finish_mark ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); diff --git a/tests/integration_tests/fail_over/run.sh b/tests/integration_tests/fail_over/run.sh new file mode 100644 index 0000000000..342b0ac7a2 --- /dev/null +++ b/tests/integration_tests/fail_over/run.sh @@ -0,0 +1,116 @@ +#!/bin/bash +# This case is used to test the fail-over feature of TiCDC. +# The test just test the basic normal fail-over process +# (kill cdc node and start it again | two node, kill one and restart this one ) + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +## One TiCDC, and kill/restart the cdc node +function failOverOnlyOneNode() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + cdc_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + + TOPIC_NAME="ticdc-failover-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; + pulsar) + run_pulsar_cluster $WORK_DIR normal + SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" + ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; + esac + + export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/HandleEventsSlowly=return(true)' + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + sleep 1 + + kill_cdc_pid $cdc_pid + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + check_table_exists fail_over_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +## Two TiCDC, failover one node and then failover the other node +function failOverWhenTwoNode() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + ## server 1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300" + cdc_pid_1=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + echo $cdc_pid_1 + ## server 2 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301" + + TOPIC_NAME="ticdc-failover-test-two-node-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; + pulsar) + run_pulsar_cluster $WORK_DIR normal + SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" + ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; + esac + + export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/HandleEventsSlowly=return(true)' + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + sleep 1 + + kill_cdc_pid $cdc_pid_1 + cdc_pid_2=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + ## restart server 1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300" + kill_cdc_pid $cdc_pid_2 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1-1" --addr "127.0.0.1:8301" + + check_table_exists fail_over_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +failOverOnlyOneNode $* +failOverWhenTwoNode $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 705cdcf4d698c57d793320851d7520e0bb054953 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 24 Dec 2024 11:24:47 +0800 Subject: [PATCH 3/5] update --- .github/workflows/integration_test_mysql.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 4d9426b23c..42bf115aa1 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -93,7 +93,7 @@ jobs: # path: | # ./logs.tar.gz -failover_e2e_test: + failover_e2e_test: runs-on: ubuntu-latest name: E2E Test steps: From d1b5738ea381d32522c6671e53a1d08e42108cdf Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 24 Dec 2024 11:25:41 +0800 Subject: [PATCH 4/5] update --- .github/workflows/integration_test_mysql.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 42bf115aa1..61653466f7 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -95,7 +95,7 @@ jobs: failover_e2e_test: runs-on: ubuntu-latest - name: E2E Test + name: Failover E2E Test steps: - name: Check out code uses: actions/checkout@v2 From b613b3673dc6b424f7311559af354f49329fd45c Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 24 Dec 2024 11:56:41 +0800 Subject: [PATCH 5/5] update --- .github/workflows/integration_test_mysql.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 61653466f7..adca27e2b0 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -115,4 +115,4 @@ jobs: - name: Test failover run: | pwd && ls -l bin/ && ls -l tools/bin/ - export TICDC_NEWARCH=true && make integration_test CASE=failover + export TICDC_NEWARCH=true && make integration_test CASE=fail_over