diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index c24d0d8c3c..11ed2edfa0 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -383,6 +383,14 @@ jobs: uses: actions/setup-go@v3 with: go-version: '1.23' + + - name: Set up S3cmd cli tool + uses: s3-actions/s3cmd@v1.9.0 + with: + provider: linode + + - name: Run s3cmd command + run: s3cmd --version - name: Integration Build run: | @@ -401,10 +409,10 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=partition_table - # - name: Test multi_tables_ddl - # if: ${{ success() }} - # run: | - # export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl + - name: Test multi_tables_ddl + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl - name: Test multi_source if: ${{ success() }} diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index fa94ce1b1d..c033fdbdf6 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" + cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -1664,33 +1665,8 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter } } } else if !ignoreCurrentTable { - // TODO: this should report an error as old cdc behaviour - ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{heartbeatpb.DDLSpan.TableID}, - } - // the table is filtered out before rename table, we need add table here - ddlEvent.NeedAddedTables = []commonEvent.Table{ - { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, - }, - } - ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(allPhysicalIDs)) - for _, id := range allPhysicalIDs { - ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, - TableID: id, - }) - } - ddlEvent.TableNameChange = &commonEvent.TableNameChange{ - AddName: []commonEvent.SchemaTableName{ - { - SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, - }, - }, - } + // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) } else { // if the table is both filtered out before and after rename table, the ddl should not be fetched log.Panic("should not build a ignored rename table ddl", @@ -1745,25 +1721,8 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter } } } else if !ignoreCurrentTable { - ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{heartbeatpb.DDLSpan.TableID}, - } - // the table is filtered out before rename table, we need add table here - ddlEvent.NeedAddedTables = []commonEvent.Table{ - { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, - }, - } - ddlEvent.TableNameChange = &commonEvent.TableNameChange{ - AddName: []commonEvent.SchemaTableName{ - { - SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, - }, - }, - } + // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) } else { // if the table is both filtered out before and after rename table, the ddl should not be fetched log.Panic("should not build a ignored rename table ddl", @@ -1999,7 +1958,8 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte }) } } else if !ignoreCurrentTable { - // TODO: return a ddl event with error + // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) } else { // if the table is both filtered out before and after rename table, ignore } @@ -2036,7 +1996,8 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte }) } } else if !ignoreCurrentTable { - // TODO: return a ddl event with error + // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) } else { // ignore } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 9490b7f7f6..f03c855db4 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -840,24 +840,8 @@ func TestApplyDDLJobs(t *testing.T) { { Type: byte(model.ActionRenameTable), FinishedTs: 1020, - BlockedTables: &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{0}, - }, - NeedAddedTables: []commonEvent.Table{ - { - SchemaID: 105, - TableID: 300, - }, - }, - TableNameChange: &commonEvent.TableNameChange{ - AddName: []commonEvent.SchemaTableName{ - { - SchemaName: "test2", - TableName: "t2", - }, - }, - }, + // This is an error event, so other fields are not set + // TODO: check error }, }, }, @@ -1011,32 +995,7 @@ func TestApplyDDLJobs(t *testing.T) { { Type: byte(model.ActionRenameTable), FinishedTs: 1020, - BlockedTables: &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{0}, - }, - NeedAddedTables: []commonEvent.Table{ - { - SchemaID: 105, - TableID: 301, - }, - { - SchemaID: 105, - TableID: 302, - }, - { - SchemaID: 105, - TableID: 303, - }, - }, - TableNameChange: &commonEvent.TableNameChange{ - AddName: []commonEvent.SchemaTableName{ - { - SchemaName: "test2", - TableName: "t2", - }, - }, - }, + // TODO: check error }, }, }, @@ -1198,35 +1157,35 @@ func TestApplyDDLJobs(t *testing.T) { }, }, // test filter: only test.t1 is qualified and is filtered out after rename - { - tableID: 200, - tableFilter: buildTableFilterByNameForTest("test", "t1"), - startTs: 1000, - endTs: 1010, - result: []commonEvent.DDLEvent{ - { - Type: byte(model.ActionRenameTables), - Query: "RENAME TABLE `test`.`t1` TO `test`.`t101`;", - FinishedTs: 1010, - BlockedTables: &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{0, 200}, - }, - NeedDroppedTables: &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{200}, - }, - TableNameChange: &commonEvent.TableNameChange{ - DropName: []commonEvent.SchemaTableName{ - { - SchemaName: "test", - TableName: "t1", - }, - }, - }, - }, - }, - }, + // { + // tableID: 200, + // tableFilter: buildTableFilterByNameForTest("test", "t1"), + // startTs: 1000, + // endTs: 1010, + // result: []commonEvent.DDLEvent{ + // { + // Type: byte(model.ActionRenameTables), + // Query: "RENAME TABLE `test`.`t1` TO `test`.`t101`;", + // FinishedTs: 1010, + // BlockedTables: &commonEvent.InfluencedTables{ + // InfluenceType: commonEvent.InfluenceTypeNormal, + // TableIDs: []int64{0, 200}, + // }, + // NeedDroppedTables: &commonEvent.InfluencedTables{ + // InfluenceType: commonEvent.InfluenceTypeNormal, + // TableIDs: []int64{200}, + // }, + // TableNameChange: &commonEvent.TableNameChange{ + // DropName: []commonEvent.SchemaTableName{ + // { + // SchemaName: "test", + // TableName: "t1", + // }, + // }, + // }, + // }, + // }, + // }, }, nil, }, diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 3d701f270c..f3dc27ba66 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -77,7 +77,7 @@ type DDLEvent struct { // eventSize is the size of the event in bytes. It is set when it's unmarshaled. eventSize int64 `json:"-"` - err error `json:"-"` + Err error `json:"-"` } func (d *DDLEvent) GetType() int { @@ -93,7 +93,7 @@ func (d *DDLEvent) GetStartTs() common.Ts { } func (d *DDLEvent) GetError() error { - return d.err + return d.Err } func (d *DDLEvent) GetCommitTs() common.Ts { @@ -252,8 +252,8 @@ func (t DDLEvent) Marshal() ([]byte, error) { data = append(data, tableInfoDataSize...) } - if t.err != nil { - errData := []byte(t.err.Error()) + if t.Err != nil { + errData := []byte(t.Err.Error()) errDataSize := make([]byte, 8) binary.BigEndian.PutUint64(errDataSize, uint64(len(errData))) data = append(data, errData...) @@ -273,7 +273,7 @@ func (t *DDLEvent) Unmarshal(data []byte) error { if errorDataSize > 0 { errorData := data[len(data)-8-int(errorDataSize) : len(data)-8] log.Info("errorData", zap.String("errorData", string(errorData))) - t.err = apperror.ErrDDLEventError.FastGen(string(errorData)) + t.Err = apperror.ErrDDLEventError.FastGen(string(errorData)) } end := len(data) - 8 - int(errorDataSize) tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end]) diff --git a/tests/integration_tests/multi_tables_ddl/run.sh b/tests/integration_tests/multi_tables_ddl/run.sh index e9ff80d065..90519b092e 100755 --- a/tests/integration_tests/multi_tables_ddl/run.sh +++ b/tests/integration_tests/multi_tables_ddl/run.sh @@ -106,8 +106,7 @@ function run() { check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_normal "normal" "null" "" check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err1 "normal" "null" "" - # TODO: enable it - # check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "failed" "ErrSyncRenameTableFailed" "" + check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "failed" "ErrSyncRenameTableFailed" "" check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60