From 738f74a541199e5b064e70f656c87c5f98600810 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 6 Jan 2025 17:13:45 +0800 Subject: [PATCH 1/7] schemaStore: fix recover table panic Signed-off-by: dongmen <414110582@qq.com> --- logservice/schemastore/multi_version.go | 4 ++- logservice/schemastore/persist_storage.go | 2 +- .../persist_storage_ddl_handlers.go | 6 ++-- logservice/schemastore/schema_store.go | 16 +++++----- tests/integration_tests/common_1/run.sh | 19 +----------- tests/integration_tests/ddl_reentrant/run.sh | 29 ++++--------------- 6 files changed, 23 insertions(+), 53 deletions(-) diff --git a/logservice/schemastore/multi_version.go b/logservice/schemastore/multi_version.go index 80a4a47da9..64082b839f 100644 --- a/logservice/schemastore/multi_version.go +++ b/logservice/schemastore/multi_version.go @@ -196,7 +196,9 @@ func (v *versionedTableInfoStore) applyDDL(event *PersistedDDLEvent) { v.mu.Lock() defer v.mu.Unlock() // delete table should not receive more ddl - assertNonDeleted(v) + if event.Type != byte(model.ActionRecoverTable) { + assertNonDeleted(v) + } if !v.initialized { // The usage of the parameter `event` may outlive the function call, so we copy it. diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index da8499a8d0..3755df792a 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -689,7 +689,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { } // Note: need write ddl event to disk before update ddl history, - // becuase other goroutines may read ddl events from disk according to ddl history + // because other goroutines may read ddl events from disk according to ddl history writePersistedDDLEvent(p.db, &ddlEvent) p.mu.Lock() diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 205a12d148..f2d466e6dc 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -88,12 +88,14 @@ type persistStorageDDLHandler struct { updateDDLHistoryFunc func(args updateDDLHistoryFuncArgs) []uint64 // updateSchemaMetadataFunc update database info, table info and partition info according to the ddl event updateSchemaMetadataFunc func(args updateSchemaMetadataFuncArgs) - // iteratehEventTablesFunc call `apply` for all tables related to the ddl event + // iterateEventTablesFunc iterates through all physical table IDs affected by the DDL event + // and calls the provided `apply` function with those IDs. For partition tables, it includes + // all partition IDs. iterateEventTablesFunc func(event *PersistedDDLEvent, apply func(tableIDs ...int64)) // extractTableInfoFunc extract (table info, deleted) for the specified `tableID` from ddl event extractTableInfoFunc func(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) // buildDDLEvent build a DDLEvent from a PersistedDDLEvent - buildDDLEventFunc func(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent + buildDDLEventFunc func(event *PersistedDDLEvent, tableFilter filter.Filter) commonEvent.DDLEvent } var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index ef06134e6d..81e2da5bd5 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -61,10 +61,10 @@ type schemaStore struct { notifyCh chan interface{} - // resolved ts pending for apply + // pendingResolvedTs is the largest resolvedTs the pending ddl events pendingResolvedTs atomic.Uint64 - - // max resolvedTs of all applied ddl events + // resolvedTs is the largest resolvedTs of all applied ddl events + // Invariant: resolvedTs >= pendingResolvedTs resolvedTs atomic.Uint64 // the following two fields are used to filter out duplicate ddl events @@ -107,7 +107,7 @@ func New( kvStorage, upperBound.ResolvedTs, s.writeDDLEvent, - s.advanceResolvedTs) + s.advancePendingResolvedTs) return s } @@ -138,9 +138,6 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error { resolvedPhyTs := oracle.ExtractPhysical(pendingTs) resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3 metrics.SchemaStoreResolvedTsLagGauge.Set(float64(resolvedLag)) - // log.Info("advance resolved ts", - // zap.Uint64("resolveTs", pendingTs), - // zap.Float64("lag(s)", resolvedLag)) }() if pendingTs <= s.resolvedTs.Load() { @@ -308,7 +305,9 @@ func (s *schemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) { } } -func (s *schemaStore) advanceResolvedTs(resolvedTs uint64) { +// advancePendingResolvedTs will be call by ddlJobFetcher when it fetched a new ddl event +// it will update the pendingResolvedTs and notify the updateResolvedTs goroutine to apply the ddl event +func (s *schemaStore) advancePendingResolvedTs(resolvedTs uint64) { for { currentTs := s.pendingResolvedTs.Load() if resolvedTs <= currentTs { @@ -325,6 +324,7 @@ func (s *schemaStore) advanceResolvedTs(resolvedTs uint64) { } // TODO: use notify instead of sleep +// waitResolvedTs will wait until the schemaStore resolved ts is greater than or equal to ts. func (s *schemaStore) waitResolvedTs(tableID int64, ts uint64, logInterval time.Duration) { start := time.Now() lastLogTime := time.Now() diff --git a/tests/integration_tests/common_1/run.sh b/tests/integration_tests/common_1/run.sh index 72ac63f7cd..2a9f0a21f4 100644 --- a/tests/integration_tests/common_1/run.sh +++ b/tests/integration_tests/common_1/run.sh @@ -25,18 +25,6 @@ function run() { cd $WORK_DIR - tidb_build_branch=$(mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -e \ - "select tidb_version()\G" | grep "Git Branch" | awk -F: '{print $(NF)}' | tr -d " ") - # TODO: refine the release detection after 5.0 tag of TiDB is ready - if [[ $tidb_build_branch =~ ^(master)$ ]]; then - # https://github.com/pingcap/tidb/pull/21533 disables multi_schema change - # feature by default, turn it on first - run_sql "set global tidb_enable_change_multi_schema = on" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # This must be set before cdc server starts - run_sql "set global tidb_enable_change_multi_schema = on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - # TiDB global variables cache 2 seconds at most - sleep 2 - fi # record tso before we create tables to skip the system table DDLs start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) @@ -75,12 +63,7 @@ EOF esac run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # TODO: refine the release detection after 5.0 tag of TiDB is ready - if [[ ! $tidb_build_branch =~ ^(master)$ ]]; then - echo "skip some SQLs in tidb v4.0.x" - else - run_sql_file $CUR/data/test_v5.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - fi + run_sql_file $CUR/data/test_v5.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql_file $CUR/data/test_finish.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first diff --git a/tests/integration_tests/ddl_reentrant/run.sh b/tests/integration_tests/ddl_reentrant/run.sh index 50dc3198ae..22adb26af8 100644 --- a/tests/integration_tests/ddl_reentrant/run.sh +++ b/tests/integration_tests/ddl_reentrant/run.sh @@ -16,17 +16,12 @@ ddls=("create database ddl_reentrant" false 'CREATE DATABASE `ddl_reentrant`' ) function complete_ddls() { - # TODO: refine the release detection after 5.0 tag of TiDB is ready - if [[ ! $tidb_build_branch =~ master ]]; then - echo "skip some DDLs in tidb v4.0.x" - else - echo "complete all DDLs in master" - # DDLs that are supportted since 5.0 - # ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD COLUMN `c1` INT, ADD COLUMN `c2` INT, ADD COLUMN `c3` INT') - # ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP COLUMN `c1`, DROP COLUMN `c2`, DROP COLUMN `c3`') - # ddls+=("rename table ddl_reentrant.t2 to ddl_reentrant.tt2, ddl_reentrant.t3 to ddl_reentrant.tt3" false 'RENAME TABLE `ddl_reentrant`.`t2` TO `ddl_reentrant`.`tt2`, `ddl_reentrant`.`t3` TO `ddl_reentrant`.`tt3`') - # ddls+=("rename table ddl_reentrant.tt2 to ddl_reentrant.t2, ddl_reentrant.tt3 to ddl_reentrant.t3" false 'RENAME TABLE `ddl_reentrant`.`tt2` TO `ddl_reentrant`.`t2`, `ddl_reentrant`.`tt3` TO `ddl_reentrant`.`t3`') - fi + # DDLs that are supportted since 5.0 + # ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD COLUMN `c1` INT, ADD COLUMN `c2` INT, ADD COLUMN `c3` INT') + # ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP COLUMN `c1`, DROP COLUMN `c2`, DROP COLUMN `c3`') + # ddls+=("rename table ddl_reentrant.t2 to ddl_reentrant.tt2, ddl_reentrant.t3 to ddl_reentrant.tt3" false 'RENAME TABLE `ddl_reentrant`.`t2` TO `ddl_reentrant`.`tt2`, `ddl_reentrant`.`t3` TO `ddl_reentrant`.`tt3`') + # ddls+=("rename table ddl_reentrant.tt2 to ddl_reentrant.t2, ddl_reentrant.tt3 to ddl_reentrant.t3" false 'RENAME TABLE `ddl_reentrant`.`tt2` TO `ddl_reentrant`.`t2`, `ddl_reentrant`.`tt3` TO `ddl_reentrant`.`t3`') + # ddls+=("alter table ddl_reentrant.t2 drop primary key" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP PRIMARY KEY') # ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD PRIMARY KEY `pk`(`id`)') # ddls+=("drop table ddl_reentrant.t2" false 'DROP TABLE `ddl_reentrant`.`t2`') @@ -111,18 +106,6 @@ function run() { start_tidb_cluster --workdir $WORK_DIR --tidb-config $CUR/conf/tidb_config.toml complete_ddls - # TODO: refine the release detection after 5.0 tag of TiDB is ready - if [[ $tidb_build_branch =~ master ]]; then - # https://github.com/pingcap/tidb/pull/21533 disables multi_schema change - # feature by default, turn it on first - run_sql "set @@global.tidb_enable_exchange_partition=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "set global tidb_enable_change_multi_schema = on" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # This must be set before cdc server starts - run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - run_sql "set global tidb_enable_change_multi_schema = on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - # TiDB global variables cache 2 seconds at most - sleep 2 - fi cd $WORK_DIR From c54d7d8c14b5f366333d7c2340876e04a365b97c Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 6 Jan 2025 18:08:28 +0800 Subject: [PATCH 2/7] remove useless cases and enable default value case Signed-off-by: dongmen <414110582@qq.com> --- .../clustered_index/conf/diff_config.toml | 29 --- .../clustered_index/data/test.sql | 182 ------------------ .../integration_tests/clustered_index/run.sh | 64 ------ .../ddl_manager/conf/diff_config.toml | 28 --- .../ddl_manager/data/prepare.sql | 127 ------------ tests/integration_tests/ddl_manager/run.sh | 60 ------ .../conf/diff_config.toml | 28 --- .../conf/diff_config_2.toml | 28 --- .../data/finishe.sql | 8 - .../data/start.sql | 13 -- .../ddl_only_block_related_table/run.sh | 118 ------------ tests/integration_tests/ddl_puller_lag/run.sh | 112 ----------- tests/integration_tests/default_value/run.sh | 2 - 13 files changed, 799 deletions(-) delete mode 100644 tests/integration_tests/clustered_index/conf/diff_config.toml delete mode 100644 tests/integration_tests/clustered_index/data/test.sql delete mode 100755 tests/integration_tests/clustered_index/run.sh delete mode 100644 tests/integration_tests/ddl_manager/conf/diff_config.toml delete mode 100644 tests/integration_tests/ddl_manager/data/prepare.sql delete mode 100755 tests/integration_tests/ddl_manager/run.sh delete mode 100644 tests/integration_tests/ddl_only_block_related_table/conf/diff_config.toml delete mode 100644 tests/integration_tests/ddl_only_block_related_table/conf/diff_config_2.toml delete mode 100644 tests/integration_tests/ddl_only_block_related_table/data/finishe.sql delete mode 100644 tests/integration_tests/ddl_only_block_related_table/data/start.sql delete mode 100755 tests/integration_tests/ddl_only_block_related_table/run.sh delete mode 100644 tests/integration_tests/ddl_puller_lag/run.sh diff --git a/tests/integration_tests/clustered_index/conf/diff_config.toml b/tests/integration_tests/clustered_index/conf/diff_config.toml deleted file mode 100644 index cc84ddc360..0000000000 --- a/tests/integration_tests/clustered_index/conf/diff_config.toml +++ /dev/null @@ -1,29 +0,0 @@ -# diff Configuration. - -check-thread-count = 4 - -export-fix-sql = true - -check-struct-only = false - -[task] - output-dir = "/tmp/tidb_cdc_test/clustered_index/sync_diff/output" - - source-instances = ["mysql1"] - - target-instance = "tidb0" - - target-check-tables = ["clustered_index_test.?*"] - -[data-sources] -[data-sources.mysql1] - host = "127.0.0.1" - port = 4000 - user = "root" - password = "" - -[data-sources.tidb0] - host = "127.0.0.1" - port = 3306 - user = "root" - password = "" diff --git a/tests/integration_tests/clustered_index/data/test.sql b/tests/integration_tests/clustered_index/data/test.sql deleted file mode 100644 index 02d913b33e..0000000000 --- a/tests/integration_tests/clustered_index/data/test.sql +++ /dev/null @@ -1,182 +0,0 @@ -drop database if exists `clustered_index_test`; -create database `clustered_index_test`; -use `clustered_index_test`; - -CREATE TABLE t0 ( - id VARCHAR(255), - data INT, - PRIMARY KEY(id) -); - -INSERT INTO t0 VALUES ('1', 1); -INSERT INTO t0 VALUES ('2', 2); -INSERT INTO t0 VALUES ('3', 3); -INSERT INTO t0 VALUES ('4', 4); -INSERT INTO t0 VALUES ('5', 5); - -DELETE FROM t0 WHERE id = '3'; -DELETE FROM t0 WHERE data = 5; -UPDATE t0 SET id = '10' WHERE data = 1; -UPDATE t0 SET data = 555 WHERE id = '10'; - -CREATE TABLE t1 ( - id VARCHAR(255), - a INT, - b CHAR(10), - PRIMARY KEY(id, b), - UNIQUE KEY(b), - KEY(a) -); - -INSERT INTO t1 VALUES ('111', 111, '111'); -INSERT INTO t1 VALUES ('222', 222, '222'); -INSERT INTO t1 VALUES ('333', 333, '333'); -INSERT INTO t1 VALUES ('444', 444, '444'); -INSERT INTO t1 VALUES ('555', 555, '555'); -UPDATE t1 SET id = '10' WHERE id = '111'; -DELETE FROM t1 WHERE a = 222; - -CREATE TABLE t2 ( - id VARCHAR(255), - a INT, - b DECIMAL(5,2), - PRIMARY KEY(id, a), - KEY(id, a), - UNIQUE KEY(id, a) -); - -INSERT INTO t2 VALUES ('aaaa', 1111, 11.0); -INSERT INTO t2 VALUES ('bbbb', 1111, 12.0); -INSERT INTO t2 VALUES ('cccc', 1111, 13.0); -INSERT INTO t2 VALUES ('dddd', 1111, 14.0); -INSERT INTO t2 VALUES ('eeee', 1111, 15.0); - - -create table t_bit(a bit primary key, b int); -INSERT INTO t_bit VALUES(1,2); -INSERT INTO t_bit VALUES(0,3); - -create table t_bool(a bool primary key, b int); -INSERT INTO t_bool VALUES(true,2); -INSERT INTO t_bool VALUES(false,3); - -create table t_tinyint(a tinyint primary key, b int); -INSERT INTO t_tinyint VALUES(6,2); -INSERT INTO t_tinyint VALUES(8,3); - -create table t_smallint(a smallint primary key, b int); -INSERT INTO t_smallint VALUES(432,2); -INSERT INTO t_smallint VALUES(125,3); - -create table t_mediumint(a mediumint primary key, b int); -INSERT INTO t_mediumint VALUES(8567,2); -INSERT INTO t_mediumint VALUES(12341,3); - -create table t_int(a int primary key, b int); -INSERT INTO t_int VALUES(123563,2); -INSERT INTO t_int VALUES(6784356,3); - -create table t_date(a date primary key, b int); -INSERT INTO t_date VALUES ('2020-02-20', 1); -INSERT INTO t_date VALUES ('2020-02-21', 2); -INSERT INTO t_date VALUES ('2020-02-22', 3); -UPDATE t_date SET a = '2020-02-23' WHERE b = 3; -DELETE FROM t_date WHERE b = 2; - - -create table t_time(a time primary key, b int); - -INSERT INTO t_time VALUES ('11:22:33', 1); -INSERT INTO t_time VALUES ('11:33:22', 2); -INSERT INTO t_time VALUES ('11:43:11', 3); -UPDATE t_time SET a = '11:44:55' WHERE b = 3; -DELETE FROM t_time WHERE b = 2; - -create table t_datetime(a datetime primary key, b int); -INSERT INTO t_datetime VALUES ('2020-02-20 11:22:33', 1); -INSERT INTO t_datetime VALUES ('2020-02-21 11:33:22', 2); -INSERT INTO t_datetime VALUES ('2020-02-22 11:43:11', 3); -UPDATE t_datetime SET a = '2020-02-23 11:44:55' WHERE b = 3; -DELETE FROM t_datetime WHERE b = 2; - -create table t_timestamp(a timestamp primary key, b int); -INSERT INTO t_timestamp VALUES ('2020-02-20 11:22:33', 1); -INSERT INTO t_timestamp VALUES ('2020-02-21 11:33:22', 2); -INSERT INTO t_timestamp VALUES ('2020-02-22 11:43:11', 3); -UPDATE t_timestamp SET a = '2020-02-23 11:44:55' WHERE b = 3; -DELETE FROM t_timestamp WHERE b = 2; - -create table t_year(a year primary key, b int); -INSERT INTO t_year VALUES ('2020', 1); -INSERT INTO t_year VALUES ('2021', 2); -INSERT INTO t_year VALUES ('2022', 3); -UPDATE t_year SET a = '2023' WHERE b = 3; -DELETE FROM t_year WHERE b = 2; - - -create table t_char(a char(20) primary key, b int); -INSERT INTO t_char VALUES ('abcc', 1); -INSERT INTO t_char VALUES ('sdff', 2); -UPDATE t_char SET a = 'ppooii' WHERE b = 2; -DELETE FROM t_char WHERE b = 1; - -create table t_varcher(a varchar(255) primary key, b int); -INSERT INTO t_varcher VALUES ('abcc', 1); -INSERT INTO t_varcher VALUES ('sdff', 2); -UPDATE t_varcher SET a = 'ppooii' WHERE b = 2; -DELETE FROM t_varcher WHERE b = 1; - -create table t_text (a text, b int, primary key(a(5))); -INSERT INTO t_text VALUES ('abcc', 1); -INSERT INTO t_text VALUES ('sdff', 2); -UPDATE t_text SET a = 'ppooii' WHERE b = 2; -DELETE FROM t_text WHERE b = 1; - -create table t_binary(a binary(20) primary key, b int); -INSERT INTO t_binary VALUES (x'89504E470D0A1A0A',1),(x'89504E470D0A1A0B',2),(x'89504E470D0A1A0C',3); -update t_binary set a = x'89504E470D0A1A0D' where b = 3; -delete from t_binary where b = 2; - -create table t_blob(a blob, b int, primary key (a(20))); -INSERT INTO t_blob VALUES (x'89504E470D0A1A0A',1),(x'89504E470D0A1A0B',2),(x'89504E470D0A1A0C',3); -update t_blob set a = x'89504E470D0A1A0D' where b = 3; -delete from t_binary where b = 2; - -create table t_enum(e enum('a', 'b', 'c') primary key, b int); -INSERT INTO t_enum VALUES ('a',1),('b',2),('c',3); -delete from t_enum where b = 2; -update t_enum set e = 'b' where b = 3; - -create table t_set(s set('a', 'b', 'c') primary key, b int); -INSERT INTO t_set VALUES ('a',1),('b,c',2),('a,c',3); -delete from t_set where b = 2; -update t_set set s = 'b' where b = 3; - - -create table t8(a int, b varchar(255) as (concat(a, "test")) stored, primary key(b)); -INSERT INTO t8(a) VALUES (2020); -INSERT INTO t8(a) VALUES (2021); -INSERT INTO t8(a) VALUES (2022); -UPDATE t8 SET a = 2023 WHERE a = 2022; -DELETE FROM t8 WHERE a = 2021; - - -create table t9(a int, b varchar(255), c int, primary key(a ,b)); -insert into t9 values(1, "aaa", 1),(2, "bbb", 2),(3, "ccc", 3); -update t9 set b='ddd' where c = 3; -delete from t9 where c = 2; - -create table t10(a int, b int, c int, primary key(a, b)); -insert into t10 values(1, 1, 1),(2, 2, 2),(3, 3, 3); -update t10 set b = 4 where a = 3; -delete from t10 where a = 2; - -create table t11(a int, b float, c int, primary key(a,b)); -insert into t11 values(1, 1.1, 1),(2, 2.2, 2),(3, 3.3, 3); -update t11 set b = 4.4 where c = 3; -delete from t11 where b = 2; - -create table t12(name char(255) primary key, b int, c int, index idx(name), unique index uidx(name)); -insert into t12 values("aaaa", 1, 1), ("bbb", 2, 2), ("ccc", 3, 3); -update t12 set name = 'ddd' where c = 3; -delete from t12 where c = 2; diff --git a/tests/integration_tests/clustered_index/run.sh b/tests/integration_tests/clustered_index/run.sh deleted file mode 100755 index 818d5af9bc..0000000000 --- a/tests/integration_tests/clustered_index/run.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 - -function run() { - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - # record tso before we create tables to skip the system table DDLs - start_ts=$(run_cdc_cli_tso_query $UP_PD_HOST_1 $UP_PD_PORT_1) - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - - TOPIC_NAME="ticdc-clustered-index-test-$RANDOM" - case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; - pulsar) - run_pulsar_cluster $WORK_DIR normal - SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" - ;; - *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; - esac - cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; - pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; - esac - run_sql "set global tidb_enable_clustered_index=1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # TiDB global variables cache 2 seconds at most - sleep 2 - run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # sync_diff can't check non-exist table, so we check expected tables are created in downstream first - - check_table_exists clustered_index_test.t0 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_exists clustered_index_test.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_exists clustered_index_test.t2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - echo "check table exists success" - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60 - - cleanup_process $CDC_BINARY -} - -# kafka is not supported yet. -# ref to issue: https://github.com/pingcap/tiflow/issues/3421 -# TODO: enable this test for kafka, storage and pulsar sink. -if [ "$SINK_TYPE" != "mysql" ]; then - echo "[$(date)] <<<<<< skip test case $TEST_NAME for $SINK_TYPE! >>>>>>" - exit 0 -fi -trap stop_tidb_cluster EXIT -run $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/ddl_manager/conf/diff_config.toml b/tests/integration_tests/ddl_manager/conf/diff_config.toml deleted file mode 100644 index 3956e3adfa..0000000000 --- a/tests/integration_tests/ddl_manager/conf/diff_config.toml +++ /dev/null @@ -1,28 +0,0 @@ -# diff Configuration. -check-thread-count = 4 - -export-fix-sql = true - -check-struct-only = false - -[task] -output-dir = "/tmp/tidb_cdc_test/ddl_manager/sync_diff/output" - -source-instances = ["mysql1"] - -target-instance = "tidb0" - -target-check-tables = ["ddl_manager.*", "cross_db_1.*", "cross_db_2.*"] - -[data-sources] -[data-sources.mysql1] -host = "127.0.0.1" -port = 4000 -user = "root" -password = "" - -[data-sources.tidb0] -host = "127.0.0.1" -port = 3306 -user = "root" -password = "" diff --git a/tests/integration_tests/ddl_manager/data/prepare.sql b/tests/integration_tests/ddl_manager/data/prepare.sql deleted file mode 100644 index 444106d9df..0000000000 --- a/tests/integration_tests/ddl_manager/data/prepare.sql +++ /dev/null @@ -1,127 +0,0 @@ -drop database if exists `ddl_manager`; -create database `ddl_manager`; - -drop database if exists `ddl_manager_test1`; -create database `ddl_manager_test1`; - -drop database if exists `ddl_manager_test2`; -create database `ddl_manager_test2`; - -drop database if exists `ddl_manager_test3`; -create database `ddl_manager_test3`; - -use `ddl_manager`; - -create table t1 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); -INSERT INTO t1 (val, col0) VALUES (1, 1); -INSERT INTO t1 (val, col0) VALUES (2, 2); -INSERT INTO t1 (val, col0) VALUES (3, 3); -INSERT INTO t1 (val, col0) VALUES (4, 4); -INSERT INTO t1 (val, col0) VALUES (5, 5); - - -CREATE TABLE t2 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); -INSERT INTO t2 (val, col0) VALUES (1, 1); -INSERT INTO t2 (val, col0) VALUES (2, 2); -INSERT INTO t2 (val, col0) VALUES (3, 3); -INSERT INTO t2 (val, col0) VALUES (4, 4); -INSERT INTO t2 (val, col0) VALUES (5, 5); - -drop table t2; - -create table t3 ( - a int, primary key (a) -) partition by hash(a) partitions 5; -insert into t3 values (1),(2),(3),(4),(5),(6); -insert into t3 values (7),(8),(9); -alter table t3 truncate partition p3; - -create table t4 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21)); -insert into t4 values (1),(2),(3),(4),(5),(6); -insert into t4 values (7),(8),(9); -insert into t4 values (11),(12),(20); -alter table t4 add partition (partition p3 values less than (30), partition p4 values less than (40)); - -CREATE TABLE t5 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - -CREATE TABLE t6 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - -DROP TABLE t5; - -drop database if exists `ddl_manager_test2`; - -CREATE TABLE t7 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - -DROP TABLE t6; - -CREATE TABLE t8 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - -DROP TABLE t7; - -CREATE TABLE t9 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - -DROP TABLE t8; - -CREATE TABLE t10 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - -DROP TABLE t9; - - -drop database if exists `cross_db_1`; -create database `cross_db_1`; - -drop database if exists `cross_db_2`; -create database `cross_db_2`; - -use `cross_db_1`; - -CREATE TABLE t1 ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - -RENAME TABLE `cross_db_1`.`t1` TO `cross_db_2`.`t1`; - -CREATE TABLE `cross_db_1`.`t2` like `cross_db_2`.`t1`; - - -CREATE TABLE ddl_manager.finish_mark ( - id INT AUTO_INCREMENT PRIMARY KEY, - val INT DEFAULT 0, - col0 INT NOT NULL -); - - diff --git a/tests/integration_tests/ddl_manager/run.sh b/tests/integration_tests/ddl_manager/run.sh deleted file mode 100755 index 609db1893c..0000000000 --- a/tests/integration_tests/ddl_manager/run.sh +++ /dev/null @@ -1,60 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 - -function run() { - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ExecuteDDLSlowly=return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') - - # this test contains `recover table`, which requires super privilege, so we - # can't use the normal user - TOPIC_NAME="ticdc-ddl-mamager-test-$RANDOM" - case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; - pulsar) - run_pulsar_cluster $WORK_DIR normal - SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" - ;; - *) SINK_URI="mysql://root@127.0.0.1:3306/" ;; - esac - changefeed_id="ddl-manager" - run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id} - - case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; - pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; - esac - - run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - - # kill owner to make sure ddl manager is working right when owner is down and up - kill_cdc_pid $owner_pid - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - check_table_exists ddl_manager.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 - # make sure all tables are equal in upstream and downstream - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180 - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -run $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/ddl_only_block_related_table/conf/diff_config.toml b/tests/integration_tests/ddl_only_block_related_table/conf/diff_config.toml deleted file mode 100644 index 9f35da33c8..0000000000 --- a/tests/integration_tests/ddl_only_block_related_table/conf/diff_config.toml +++ /dev/null @@ -1,28 +0,0 @@ -# diff Configuration. -check-thread-count = 4 - -export-fix-sql = true - -check-struct-only = false - -[task] -output-dir = "/tmp/tidb_cdc_test/ddl_only_block_related_table/sync_diff/output" - -source-instances = ["mysql1"] - -target-instance = "tidb0" - -target-check-tables = ["ddl_only_block_related_table.t*"] - -[data-sources] -[data-sources.mysql1] -host = "127.0.0.1" -port = 4000 -user = "root" -password = "" - -[data-sources.tidb0] -host = "127.0.0.1" -port = 3306 -user = "root" -password = "" diff --git a/tests/integration_tests/ddl_only_block_related_table/conf/diff_config_2.toml b/tests/integration_tests/ddl_only_block_related_table/conf/diff_config_2.toml deleted file mode 100644 index acd2895057..0000000000 --- a/tests/integration_tests/ddl_only_block_related_table/conf/diff_config_2.toml +++ /dev/null @@ -1,28 +0,0 @@ -# diff Configuration. -check-thread-count = 4 - -export-fix-sql = true - -check-struct-only = false - -[task] -output-dir = "/tmp/tidb_cdc_test/ddl_only_block_related_table/sync_diff/output" - -source-instances = ["mysql1"] - -target-instance = "tidb0" - -target-check-tables = ["ddl_only_block_related_table.*"] - -[data-sources] -[data-sources.mysql1] -host = "127.0.0.1" -port = 4000 -user = "root" -password = "" - -[data-sources.tidb0] -host = "127.0.0.1" -port = 3306 -user = "root" -password = "" diff --git a/tests/integration_tests/ddl_only_block_related_table/data/finishe.sql b/tests/integration_tests/ddl_only_block_related_table/data/finishe.sql deleted file mode 100644 index 729e4a9a8d..0000000000 --- a/tests/integration_tests/ddl_only_block_related_table/data/finishe.sql +++ /dev/null @@ -1,8 +0,0 @@ -use `ddl_only_block_related_table`; - --- this will block `ddl_not_done` table from being replicated -alter table ddl_not_done add column `good_id` bigint(20) unsigned null; - -insert into t1 values (2); -insert into t2 values (2); - diff --git a/tests/integration_tests/ddl_only_block_related_table/data/start.sql b/tests/integration_tests/ddl_only_block_related_table/data/start.sql deleted file mode 100644 index c5ec8b6a8c..0000000000 --- a/tests/integration_tests/ddl_only_block_related_table/data/start.sql +++ /dev/null @@ -1,13 +0,0 @@ -drop database if exists `ddl_only_block_related_table`; -create database `ddl_only_block_related_table`; -use `ddl_only_block_related_table`; - -create table t1 (id int primary key); -create table t2 (id int primary key); -create table ddl_not_done (id int primary key); - -insert into t1 values (1); -insert into t2 values (1); -insert into ddl_not_done values (1); - -create table finish_mark (id int primary key); diff --git a/tests/integration_tests/ddl_only_block_related_table/run.sh b/tests/integration_tests/ddl_only_block_related_table/run.sh deleted file mode 100755 index f104b540e4..0000000000 --- a/tests/integration_tests/ddl_only_block_related_table/run.sh +++ /dev/null @@ -1,118 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 - -function check_ts_not_forward() { - changefeed_id=$1 - ts1=$(cdc cli changefeed query -c "$changefeed_id" | jq -r '.checkpoint_tso') - sleep 1 - ts2=$(cdc cli changefeed query -c "$changefeed_id" | jq -r '.checkpoint_tso') - if [ "$ts1" == "$ts2" ]; then - count=0 - while [ "$ts1" == "$ts2" ]; do - sleep 1 - ts2=$(cdc cli changefeed query -c "$changefeed_id" | jq -r '.checkpoint_tso') - ((count++)) - if [ $count -ge 10 ]; then - echo "pass check, checkpoint tso not forward after 10s" - return - fi - done - fi - exit 1 -} - -function check_ts_forward() { - changefeedid=$1 - rts1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts') - checkpoint1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso') - sleep 1 - rts2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts') - checkpoint2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso') - if [[ "$rts1" != "null" ]] && [[ "$rts1" != "0" ]]; then - if [[ "$rts1" -ne "$rts2" ]] || [[ "$checkpoint1" -ne "$checkpoint2" ]]; then - echo "changefeed is working normally rts: ${rts1}->${rts2} checkpoint: ${checkpoint1}->${checkpoint2}" - return - fi - fi - exit 1 -} - -export -f check_ts_not_forward -export -f check_ts_forward - -function run() { - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') - - # this test contains `recover table`, which requires super privilege, so we - # can't use the normal user - TOPIC_NAME="ticdc-common-1-test-$RANDOM" - case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; - pulsar) - run_pulsar_cluster $WORK_DIR normal - SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" - ;; - *) SINK_URI="mysql://root@127.0.0.1:3306/" ;; - esac - changefeed_id="ddl-only-block-related-table" - run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id} - - case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; - pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; - esac - - run_sql_file $CUR/data/start.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - - check_table_exists ddl_only_block_related_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - - kill_cdc_pid $owner_pid - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ExecuteNotDone=return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') - - run_sql_file $CUR/data/finishe.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # make sure t1,t2 are equal in upstream and downstream - # we should not check this if sink type is kafka, since the checkpoint is not advance - # so the kafka consumer will not consume the dmls of t1,t2 behind the stuck DDL's commitTs - # the next check diff in line 69 will check the eventual consistency of all tables - # and it is enough to ensure the correctness of the test - if [ "$SINK_TYPE" == "mysql" ]; then - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 90 - fi - # check checkpoint does not advance - ensure 30 check_ts_not_forward $changefeed_id - - kill_cdc_pid $owner_pid - # clear failpoint, so the `ddl_not_done` table can advance - export GO_FAILPOINTS='' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - # make sure all tables are equal in upstream and downstream - check_sync_diff $WORK_DIR $CUR/conf/diff_config_2.toml 90 - - # check checkpoint advance - ensure 20 check_ts_forward $changefeed_id - - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -run $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/ddl_puller_lag/run.sh b/tests/integration_tests/ddl_puller_lag/run.sh deleted file mode 100644 index 5275936bf4..0000000000 --- a/tests/integration_tests/ddl_puller_lag/run.sh +++ /dev/null @@ -1,112 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 - -function prepare() { - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - # record tso before we create tables to skip the system table DDLs - start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) - - run_sql "CREATE table test.ddl_puller_lag1(id int primary key, val int);" - run_sql "CREATE table test.ddl_puller_lag2(id int primary key, val int);" - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --failpoint 'github.com/pingcap/tiflow/cdc/processor/processorDDLResolved=1*sleep(180000)' - - TOPIC_NAME="ticdc-ddl-puller-lag-test-$RANDOM" - case $SINK_TYPE in - kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=ddl_puller_lag&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; - pulsar) - run_pulsar_cluster $WORK_DIR normal - SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" - ;; - *) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;; - esac - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" - case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; - pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; - esac -} - -function sql_check() { - # run check in sequence and short circuit principle, if error hanppens, - # the following statement will be not executed - - # check table ddl_puller_lag1. - run_sql "SELECT id, val FROM test.ddl_puller_lag1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} && - check_contains "id: 1" && - check_contains "val: 1" && - check_contains "id: 2" && - check_contains "val: 22" && - check_not_contains "id: 3" && - - # check table ddl_puller_lag2. - run_sql "SELECT id, val FROM test.ddl_puller_lag2;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} && - check_contains "id: 1" && - check_contains "val: 1" && - check_contains "id: 2" && - check_contains "val: 22" && - check_not_contains "id: 3" -} - -function sql_test() { - # test insert/update/delete for two table in the same way. - run_sql "INSERT INTO test.ddl_puller_lag1(id, val) VALUES (1, 1);" - run_sql "INSERT INTO test.ddl_puller_lag1(id, val) VALUES (2, 2);" - run_sql "INSERT INTO test.ddl_puller_lag1(id, val) VALUES (3, 3);" - - # update id = 2 and delete id = 3 - run_sql "UPDATE test.ddl_puller_lag1 set val = 22 where id = 2;" - run_sql "DELETE from test.ddl_puller_lag1 where id = 3;" - - # same dml for table ddl_puller_lag2 - run_sql "INSERT INTO test.ddl_puller_lag2(id, val) VALUES (1, 1);" - run_sql "INSERT INTO test.ddl_puller_lag2(id, val) VALUES (2, 2);" - run_sql "INSERT INTO test.ddl_puller_lag2(id, val) VALUES (3, 3);" - - run_sql "UPDATE test.ddl_puller_lag2 set val = 22 where id = 2;" - run_sql "DELETE from test.ddl_puller_lag2 where id = 3;" - - i=0 - check_time=50 - set +e - while [ $i -lt $check_time ]; do - sql_check - ret=$? - if [ "$ret" == 0 ]; then - echo "check data successfully" - break - fi - ((i++)) - echo "check data failed $i-th time, retry later" - sleep 2 - done - set -e - - if [ $i -ge $check_time ]; then - echo "check data failed at last" - exit 1 - fi - - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -prepare $* -sleep 180 -sql_test $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/default_value/run.sh b/tests/integration_tests/default_value/run.sh index 37c5a470f8..32944fdaf0 100755 --- a/tests/integration_tests/default_value/run.sh +++ b/tests/integration_tests/default_value/run.sh @@ -1,6 +1,4 @@ #!/bin/bash -# temp sikp default_value test. -exit 0 set -e CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) From d67b1e4008a2509ceddb13c00d7a75310ffbbe7b Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 6 Jan 2025 19:26:36 +0800 Subject: [PATCH 3/7] test: fix default_value test Signed-off-by: dongmen <414110582@qq.com> --- .../default_value/config.toml | 2 +- tests/integration_tests/default_value/main.go | 75 ++++++++++++++----- 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/tests/integration_tests/default_value/config.toml b/tests/integration_tests/default_value/config.toml index c2773c76a7..dc950cf87e 100644 --- a/tests/integration_tests/default_value/config.toml +++ b/tests/integration_tests/default_value/config.toml @@ -17,4 +17,4 @@ port = 4000 host = "127.0.0.1" user = "root" password = "" -port = 4001 +port = 4000 diff --git a/tests/integration_tests/default_value/main.go b/tests/integration_tests/default_value/main.go index b6b6dd6f4a..656e57e7f6 100644 --- a/tests/integration_tests/default_value/main.go +++ b/tests/integration_tests/default_value/main.go @@ -67,7 +67,9 @@ func main() { } }() + util.MustExec(sourceDB0, "drop database mark;") util.MustExec(sourceDB0, "create database mark;") + var wg sync.WaitGroup start := time.Now() defer func() { @@ -104,7 +106,7 @@ func testGetDefaultValue(srcs []*sql.DB, wg *sync.WaitGroup) { for idx, src := range srcs { wg1.Add(1) go func(i int, s *sql.DB) { - dml(ctx, s, testName, i, nil) + runDMLWorker(ctx, s, testName, i, nil) defer wg1.Done() }(idx, src) } @@ -131,11 +133,16 @@ func getFunctionName(i interface{}) string { return strs[len(strs)-1] } -func ignoreableError(err error) bool { +func shouldIgnoreError(err error) bool { + if err == nil { + return true + } knownErrorList := []string{ "Error 1146", // table doesn't exist "Error 1049", // database doesn't exist "Error 1054", // unknown column + "Error 1062", // duplicate entry + "Error 1366", // Incorrect int value } for _, e := range knownErrorList { if strings.HasPrefix(err.Error(), e) { @@ -146,7 +153,7 @@ func ignoreableError(err error) bool { } // TODO: need cover the scenarios: update existing old value -func dml(ctx context.Context, db *sql.DB, table string, id int, defaultValue interface{}) { +func runDMLWorker(ctx context.Context, db *sql.DB, table string, id int, defaultValue interface{}) { var err error var i int var insertSuccess int @@ -167,25 +174,31 @@ func dml(ctx context.Context, db *sql.DB, table string, id int, defaultValue int } for i = 0; ; i++ { + args := []any{i + id*10000000, i + id*10000000 + 1} if defaultValue != nil { - _, err = db.Exec(insertSQL, i+id*10000000, i+id*10000000+1, defaultValue) - } else { - _, err = db.Exec(insertSQL, i+id*10000000, i+id*10000000+1) + args = append(args, defaultValue) } + _, err = db.Exec(insertSQL, args...) if err == nil { insertSuccess++ if insertSuccess%100 == 0 { log.S().Info(id, " insert success: ", insertSuccess) } } - if err != nil && !ignoreableError(err) { - log.Fatal("unexpected error when executing sql", zap.Error(err)) + + if !shouldIgnoreError(err) { + // get table struct + tableStruct := getTableStruct(db, table) + log.Fatal("unexpected error when executing sql", zap.Error(err), + zap.String("sql", insertSQL), + zap.Any("args", args), + zap.String("tableStruct", tableStruct)) } if i%2 == 0 { if defaultValue == nil { _, err := db.Exec(updateSQL, i+id*100000000, i+id*100000000+1) - if err != nil && !ignoreableError(err) { + if !shouldIgnoreError(err) { log.Fatal("unexpected error when executing sql", zap.Error(err)) } } @@ -200,7 +213,7 @@ func dml(ctx context.Context, db *sql.DB, table string, id int, defaultValue int } } } - if err != nil && !ignoreableError(err) { + if !shouldIgnoreError(err) { log.Fatal("unexpected error when executing sql", zap.Error(err)) } } @@ -342,6 +355,35 @@ func ddlDefaultValueFunc(ctx context.Context, db *sql.DB, format string, table s } } +func getTableStruct(db *sql.DB, table string) string { + sql := fmt.Sprintf("show create table test.`%s`", table) + rows, err := db.Query(sql) + if err != nil { + log.Fatal("failed to get table struct", zap.Error(err), zap.String("sql", sql)) + } + defer rows.Close() + + var tableName, tableStruct string + if rows.Next() { + if err := rows.Scan(&tableName, &tableStruct); err != nil { + log.Fatal("failed to scan table struct", zap.Error(err)) + } + } + + // Beautify the table struct + lines := strings.Split(tableStruct, "\n") + var beautified []string + for i, line := range lines { + line = strings.TrimSpace(line) + if i > 0 && i < len(lines)-1 { + line = " " + line + } + beautified = append(beautified, line) + } + + return strings.Join(beautified, "\n") +} + func testMultiDDLs(srcs []*sql.DB, wg *sync.WaitGroup) { defer wg.Done() @@ -832,7 +874,7 @@ func testMultiDDLs(srcs []*sql.DB, wg *sync.WaitGroup) { uuid = strings.ReplaceAll(uuid, "-", "_") newTbName := testName + uuid mustCreateTable(srcs[0], newTbName) - log.S().Info("running ddl test: ", newTbName) + log.S().Info("running ddl test: ", newTbName, "table struct: ", getTableStruct(srcs[0], newTbName)) var wg2 sync.WaitGroup ctx, cancel2 := context.WithCancel(context.Background()) @@ -842,9 +884,10 @@ func testMultiDDLs(srcs []*sql.DB, wg *sync.WaitGroup) { wg2.Add(1) go func(i int, s *sql.DB) { if unit.NoDMLParas { - dml(ctx, s, newTbName, i, nil) + runDMLWorker(ctx, s, newTbName, i, nil) } else { - dml(ctx, s, newTbName, i, unit.DefaultValue) + log.Info("Use default value", zap.Any("value", unit.DefaultValue)) + runDMLWorker(ctx, s, newTbName, i, unit.DefaultValue) } wg2.Done() }(idx+i*2, src) @@ -886,9 +929,3 @@ func mustCreateTable(db *sql.DB, tableName string) { sql := fmt.Sprintf(createTableSQL, tableName) util.MustExec(db, sql) } - -func mustCreateTableWithConn(ctx context.Context, conn *sql.Conn, tableName string) { - util.MustExecWithConn(ctx, conn, createDatabaseSQL) - sql := fmt.Sprintf(createTableSQL, tableName) - util.MustExecWithConn(ctx, conn, sql) -} From 0ef1518aa2deee3df8c14718d13063a770120739 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 6 Jan 2025 19:33:14 +0800 Subject: [PATCH 4/7] test: fix default_value test 2 Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/default_value/main.go | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/integration_tests/default_value/main.go b/tests/integration_tests/default_value/main.go index 656e57e7f6..8cdba46da2 100644 --- a/tests/integration_tests/default_value/main.go +++ b/tests/integration_tests/default_value/main.go @@ -34,6 +34,19 @@ import ( "go.uber.org/zap" ) +const ( + createDatabaseSQL = "create database if not exists test" + createTableSQL = ` +create table if not exists test.%s +( + id1 int unique key not null, + id2 int unique key not null, + v0 int default 11, + v1 int default null +) +` +) + var finishIdx int32 func main() { @@ -911,19 +924,6 @@ func testMultiDDLs(srcs []*sql.DB, wg *sync.WaitGroup) { util.MustExec(srcs[0], fmt.Sprintf("create table mark.finish_mark_%d(a int primary key);", atomic.AddInt32(&finishIdx, 1))) } -const ( - createDatabaseSQL = "create database if not exists test" - createTableSQL = ` -create table if not exists test.%s -( - id1 int unique key not null, - id2 int unique key not null, - v0 int default 11, - v1 int default null -) -` -) - func mustCreateTable(db *sql.DB, tableName string) { util.MustExec(db, createDatabaseSQL) sql := fmt.Sprintf(createTableSQL, tableName) From 9ae871e7148db5fec54cb5bcfbc1c56cd153ce96 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 6 Jan 2025 19:38:12 +0800 Subject: [PATCH 5/7] test: fix default_value test 3 Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/default_value/config.toml | 2 +- tests/integration_tests/default_value/main.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/default_value/config.toml b/tests/integration_tests/default_value/config.toml index dc950cf87e..c2773c76a7 100644 --- a/tests/integration_tests/default_value/config.toml +++ b/tests/integration_tests/default_value/config.toml @@ -17,4 +17,4 @@ port = 4000 host = "127.0.0.1" user = "root" password = "" -port = 4000 +port = 4001 diff --git a/tests/integration_tests/default_value/main.go b/tests/integration_tests/default_value/main.go index 8cdba46da2..c5b8041866 100644 --- a/tests/integration_tests/default_value/main.go +++ b/tests/integration_tests/default_value/main.go @@ -80,8 +80,8 @@ func main() { } }() - util.MustExec(sourceDB0, "drop database mark;") - util.MustExec(sourceDB0, "create database mark;") + util.MustExec(sourceDB0, "DROP DATABASE IF EXISTS mark;") + util.MustExec(sourceDB0, "CREATE DATABASE IF NOT EXISTS mark;") var wg sync.WaitGroup start := time.Now() From f49b32633ef89addc7d8b2d43e7876e326d52b9f Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 6 Jan 2025 20:37:07 +0800 Subject: [PATCH 6/7] test: fix generate_column test 1 Signed-off-by: dongmen <414110582@qq.com> --- .github/workflows/integration_test_mysql.yaml | 15 +++++++++++-- tests/integration_tests/default_value/main.go | 6 ++--- .../generate_column/data/prepare.sql | 17 -------------- .../integration_tests/generate_column/run.sh | 22 ++++++------------- 4 files changed, 23 insertions(+), 37 deletions(-) delete mode 100644 tests/integration_tests/generate_column/data/prepare.sql diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index a71e01a14d..7e29deb64e 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -27,7 +27,8 @@ concurrency: cancel-in-progress: true jobs: - basic_e2e_test: + # To boost the test speed, we split every 10 test cases into a group. + e2e_test_group_1: runs-on: ubuntu-latest name: E2E Test steps: @@ -71,11 +72,21 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=changefeed_reconstruct + - name: Test common_1 + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=common_1 + + - name: Test foreign_key + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=foreign_key + - name: Upload test logs if: always() uses: ./.github/actions/upload-test-logs with: - log-name: basic_e2e_group1 + log-name: e2e_test_group_1 failover_e2e_test1: diff --git a/tests/integration_tests/default_value/main.go b/tests/integration_tests/default_value/main.go index c5b8041866..494fb4b389 100644 --- a/tests/integration_tests/default_value/main.go +++ b/tests/integration_tests/default_value/main.go @@ -47,7 +47,7 @@ create table if not exists test.%s ` ) -var finishIdx int32 +var finishIdx atomic.Int32 func main() { cfg := util.NewConfig() @@ -135,7 +135,7 @@ func testGetDefaultValue(srcs []*sql.DB, wg *sync.WaitGroup) { wg1.Wait() - util.MustExec(srcs[0], fmt.Sprintf("create table mark.finish_mark_%d(a int primary key);", atomic.AddInt32(&finishIdx, 1))) + util.MustExec(srcs[0], fmt.Sprintf("create table mark.finish_mark_%d(a int primary key);", finishIdx.Add(1))) }(i, ddlFunc) } wg2.Wait() @@ -921,7 +921,7 @@ func testMultiDDLs(srcs []*sql.DB, wg *sync.WaitGroup) { } wg1.Wait() - util.MustExec(srcs[0], fmt.Sprintf("create table mark.finish_mark_%d(a int primary key);", atomic.AddInt32(&finishIdx, 1))) + util.MustExec(srcs[0], fmt.Sprintf("create table mark.finish_mark_%d(a int primary key);", finishIdx.Add(1))) } func mustCreateTable(db *sql.DB, tableName string) { diff --git a/tests/integration_tests/generate_column/data/prepare.sql b/tests/integration_tests/generate_column/data/prepare.sql deleted file mode 100644 index 789bc4d744..0000000000 --- a/tests/integration_tests/generate_column/data/prepare.sql +++ /dev/null @@ -1,17 +0,0 @@ -drop database if exists `generate_column`; -create database `generate_column`; -use `generate_column`; - -create table t (a int, b int as (a + 1) stored primary key); -insert into t(a) values (1),(2), (3),(4),(5),(6),(7); -update t set a = 10 where a = 1; -update t set a = 11 where b = 3; -delete from t where b=4; -delete from t where a=4; - -create table t1 (a int, b int as (a + 1) virtual not null, c int not null, unique index idx1(b), unique index idx2(c)); -insert into t1 (a, c) values (1, 2),(2, 3), (3, 4),(4, 5),(5, 6),(6, 7),(7, 8); -update t1 set a = 10 where a = 1; -update t1 set a = 11 where b = 3; -delete from t1 where b=4; -delete from t1 where a=4; diff --git a/tests/integration_tests/generate_column/run.sh b/tests/integration_tests/generate_column/run.sh index 5b251d9efd..e53b024531 100644 --- a/tests/integration_tests/generate_column/run.sh +++ b/tests/integration_tests/generate_column/run.sh @@ -9,16 +9,6 @@ CDC_BINARY=cdc.test SINK_TYPE=$1 function run() { - # storage is not supported yet. - if [ "$SINK_TYPE" == "storage" ]; then - return - fi - - # TODO(dongmen): enable pulsar in the future. - if [ "$SINK_TYPE" == "pulsar" ]; then - exit 0 - fi - rm -rf $WORK_DIR && mkdir -p $WORK_DIR start_tidb_cluster --workdir $WORK_DIR @@ -43,15 +33,17 @@ function run() { run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" case $SINK_TYPE in kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; - pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" ;; + pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --config $CUR/conf/changefeed.toml ;; esac - run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/virtual.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first - check_table_exists generate_column.t ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_table_exists generate_column.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + if [[ "$SINK_TYPE" != "storage" && "$SINK_TYPE" != "pulsar" ]]; then + run_sql_file $CUR/data/stored.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists generate_column.t2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + fi check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - cleanup_process $CDC_BINARY } From eca32504679668b493e309c680cd4ee9afe109d5 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 6 Jan 2025 20:37:18 +0800 Subject: [PATCH 7/7] test: fix generate_column test 2 Signed-off-by: dongmen <414110582@qq.com> --- .../generate_column/data/stored.sql | 8 ++++++++ .../generate_column/data/virtual.sql | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 tests/integration_tests/generate_column/data/stored.sql create mode 100644 tests/integration_tests/generate_column/data/virtual.sql diff --git a/tests/integration_tests/generate_column/data/stored.sql b/tests/integration_tests/generate_column/data/stored.sql new file mode 100644 index 0000000000..7d7db02c36 --- /dev/null +++ b/tests/integration_tests/generate_column/data/stored.sql @@ -0,0 +1,8 @@ +use `generate_column`; +-- see https://github.com/pingcap/tiflow/issues/11704 +create table t2 (a int, b int as (a + 1) stored primary key); +insert into t2(a) values (1),(2), (3),(4),(5),(6),(7); +update t2 set a = 10 where a = 1; +update t2 set a = 11 where b = 3; +delete from t2 where b=4; +delete from t2 where a=4; \ No newline at end of file diff --git a/tests/integration_tests/generate_column/data/virtual.sql b/tests/integration_tests/generate_column/data/virtual.sql new file mode 100644 index 0000000000..789bc4d744 --- /dev/null +++ b/tests/integration_tests/generate_column/data/virtual.sql @@ -0,0 +1,17 @@ +drop database if exists `generate_column`; +create database `generate_column`; +use `generate_column`; + +create table t (a int, b int as (a + 1) stored primary key); +insert into t(a) values (1),(2), (3),(4),(5),(6),(7); +update t set a = 10 where a = 1; +update t set a = 11 where b = 3; +delete from t where b=4; +delete from t where a=4; + +create table t1 (a int, b int as (a + 1) virtual not null, c int not null, unique index idx1(b), unique index idx2(c)); +insert into t1 (a, c) values (1, 2),(2, 3), (3, 4),(4, 5),(5, 6),(6, 7),(7, 8); +update t1 set a = 10 where a = 1; +update t1 set a = 11 where b = 3; +delete from t1 where b=4; +delete from t1 where a=4;