Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0458bc1
fix metrics
hongyunyan Dec 3, 2024
a440de6
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 4, 2024
ea3ebb1
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 6, 2024
46d13bb
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 6, 2024
636b897
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 9, 2024
fda8e1c
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 10, 2024
c4598d1
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 10, 2024
2659444
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 14, 2024
746e99b
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 16, 2024
75d115d
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 17, 2024
8a1ea39
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 18, 2024
a9b75c1
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 18, 2024
87c0da1
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 19, 2024
5f40996
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 19, 2024
f5ca672
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 20, 2024
20b04d2
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 20, 2024
83b8d50
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 20, 2024
8fbf471
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 21, 2024
cbbf0bb
Merge branch 'master' of https://github.com/flowbehappy/tigate
hongyunyan Dec 23, 2024
2cbb177
update
hongyunyan Dec 24, 2024
705cdcf
update
hongyunyan Dec 24, 2024
d1b5738
update
hongyunyan Dec 24, 2024
b613b36
update
hongyunyan Dec 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,27 @@ jobs:
# name: upstream-switch-logs
# path: |
# ./logs.tar.gz

failover_e2e_test:
runs-on: ubuntu-latest
name: Failover E2E Test
steps:
- name: Check out code
uses: actions/checkout@v2

- name: Setup Go environment
uses: actions/setup-go@v3
with:
go-version: '1.23'

- name: Integration Build
run: |
tests/scripts/download-integration-test-binaries.sh master true
go build -o ./tools/bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl
make integration_test_build
ls -l bin/ && ls -l tools/bin/

- name: Test failover
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=fail_over
8 changes: 8 additions & 0 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package dispatcher

import (
"math/rand"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/downstreamadapter/syncpoint"
Expand Down Expand Up @@ -266,6 +268,12 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
block = false
// Dispatcher is ready, handle the events
for _, dispatcherEvent := range dispatcherEvents {
failpoint.Inject("HandleEventsSlowly", func() {
lag := time.Duration(rand.Intn(5000)) * time.Millisecond
log.Warn("handle events slowly", zap.Duration("lag", lag))
time.Sleep(lag)
})

event := dispatcherEvent.Event
// Pre-check, make sure the event is not stale
if event.GetCommitTs() < atomic.LoadUint64(&d.resolvedTs) {
Expand Down
28 changes: 28 additions & 0 deletions tests/integration_tests/fail_over/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/fail_over/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["fail_over_test.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
44 changes: 44 additions & 0 deletions tests/integration_tests/fail_over/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
drop database if exists `fail_over_test`;
create database `fail_over_test`;

use `fail_over_test`;

create table t1 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

create table t2 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

INSERT INTO t1 (val, col0) VALUES (1, 1);
INSERT INTO t1 (val, col0) VALUES (2, 2);
INSERT INTO t1 (val, col0) VALUES (3, 3);
INSERT INTO t1 (val, col0) VALUES (4, 4);
INSERT INTO t1 (val, col0) VALUES (5, 5);
INSERT INTO t1 (val, col0) VALUES (6, 6);
INSERT INTO t1 (val, col0) VALUES (7, 7);
INSERT INTO t1 (val, col0) VALUES (8, 8);
INSERT INTO t1 (val, col0) VALUES (9, 9);
INSERT INTO t1 (val, col0) VALUES (10, 10);

INSERT INTO t2 (val, col0) VALUES (1, 1);
INSERT INTO t2 (val, col0) VALUES (2, 2);
INSERT INTO t2 (val, col0) VALUES (3, 3);
INSERT INTO t2 (val, col0) VALUES (4, 4);
INSERT INTO t2 (val, col0) VALUES (5, 5);
INSERT INTO t2 (val, col0) VALUES (6, 6);
INSERT INTO t2 (val, col0) VALUES (7, 7);
INSERT INTO t2 (val, col0) VALUES (8, 8);
INSERT INTO t2 (val, col0) VALUES (9, 9);
INSERT INTO t2 (val, col0) VALUES (10, 10);

CREATE TABLE fail_over_test.finish_mark (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
116 changes: 116 additions & 0 deletions tests/integration_tests/fail_over/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#!/bin/bash
# This case is used to test the fail-over feature of TiCDC.
# The test just test the basic normal fail-over process
# (kill cdc node and start it again | two node, kill one and restart this one )

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

## One TiCDC, and kill/restart the cdc node
function failOverOnlyOneNode() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
cdc_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

TOPIC_NAME="ticdc-failover-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/HandleEventsSlowly=return(true)'
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

sleep 1

kill_cdc_pid $cdc_pid
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

check_table_exists fail_over_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

## Two TiCDC, failover one node and then failover the other node
function failOverWhenTwoNode() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

## server 1
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
cdc_pid_1=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
echo $cdc_pid_1
## server 2
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"

TOPIC_NAME="ticdc-failover-test-two-node-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatcher/HandleEventsSlowly=return(true)'
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

sleep 1

kill_cdc_pid $cdc_pid_1
cdc_pid_2=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
## restart server 1
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0-1" --addr "127.0.0.1:8300"
kill_cdc_pid $cdc_pid_2
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1-1" --addr "127.0.0.1:8301"

check_table_exists fail_over_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
failOverOnlyOneNode $*
failOverWhenTwoNode $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"