Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: '1.23'

- name: Set up S3cmd cli tool
uses: s3-actions/s3cmd@v1.9.0
with:
provider: linode

- name: Run s3cmd command
run: s3cmd --version

- name: Integration Build
run: |
Expand All @@ -401,10 +409,10 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=partition_table

# - name: Test multi_tables_ddl
# if: ${{ success() }}
# run: |
# export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl
- name: Test multi_tables_ddl
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl

- name: Test multi_source
if: ${{ success() }}
Expand Down
57 changes: 9 additions & 48 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1664,33 +1665,8 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter
}
}
} else if !ignoreCurrentTable {
// TODO: this should report an error as old cdc behaviour
ddlEvent.BlockedTables = &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{heartbeatpb.DDLSpan.TableID},
}
// the table is filtered out before rename table, we need add table here
ddlEvent.NeedAddedTables = []commonEvent.Table{
{
SchemaID: rawEvent.CurrentSchemaID,
TableID: rawEvent.CurrentTableID,
},
}
ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(allPhysicalIDs))
for _, id := range allPhysicalIDs {
ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{
SchemaID: rawEvent.CurrentSchemaID,
TableID: id,
})
}
ddlEvent.TableNameChange = &commonEvent.TableNameChange{
AddName: []commonEvent.SchemaTableName{
{
SchemaName: rawEvent.CurrentSchemaName,
TableName: rawEvent.CurrentTableName,
},
},
}
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query)
} else {
// if the table is both filtered out before and after rename table, the ddl should not be fetched
log.Panic("should not build a ignored rename table ddl",
Expand Down Expand Up @@ -1745,25 +1721,8 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter
}
}
} else if !ignoreCurrentTable {
ddlEvent.BlockedTables = &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{heartbeatpb.DDLSpan.TableID},
}
// the table is filtered out before rename table, we need add table here
ddlEvent.NeedAddedTables = []commonEvent.Table{
{
SchemaID: rawEvent.CurrentSchemaID,
TableID: rawEvent.CurrentTableID,
},
}
ddlEvent.TableNameChange = &commonEvent.TableNameChange{
AddName: []commonEvent.SchemaTableName{
{
SchemaName: rawEvent.CurrentSchemaName,
TableName: rawEvent.CurrentTableName,
},
},
}
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query)
} else {
// if the table is both filtered out before and after rename table, the ddl should not be fetched
log.Panic("should not build a ignored rename table ddl",
Expand Down Expand Up @@ -1999,7 +1958,8 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
})
}
} else if !ignoreCurrentTable {
// TODO: return a ddl event with error
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query)
} else {
// if the table is both filtered out before and after rename table, ignore
}
Expand Down Expand Up @@ -2036,7 +1996,8 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
})
}
} else if !ignoreCurrentTable {
// TODO: return a ddl event with error
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query)
} else {
// ignore
}
Expand Down
105 changes: 32 additions & 73 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,24 +840,8 @@ func TestApplyDDLJobs(t *testing.T) {
{
Type: byte(model.ActionRenameTable),
FinishedTs: 1020,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{0},
},
NeedAddedTables: []commonEvent.Table{
{
SchemaID: 105,
TableID: 300,
},
},
TableNameChange: &commonEvent.TableNameChange{
AddName: []commonEvent.SchemaTableName{
{
SchemaName: "test2",
TableName: "t2",
},
},
},
// This is an error event, so other fields are not set
// TODO: check error
},
},
},
Expand Down Expand Up @@ -1011,32 +995,7 @@ func TestApplyDDLJobs(t *testing.T) {
{
Type: byte(model.ActionRenameTable),
FinishedTs: 1020,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{0},
},
NeedAddedTables: []commonEvent.Table{
{
SchemaID: 105,
TableID: 301,
},
{
SchemaID: 105,
TableID: 302,
},
{
SchemaID: 105,
TableID: 303,
},
},
TableNameChange: &commonEvent.TableNameChange{
AddName: []commonEvent.SchemaTableName{
{
SchemaName: "test2",
TableName: "t2",
},
},
},
// TODO: check error
},
},
},
Expand Down Expand Up @@ -1198,35 +1157,35 @@ func TestApplyDDLJobs(t *testing.T) {
},
},
// test filter: only test.t1 is qualified and is filtered out after rename
{
tableID: 200,
tableFilter: buildTableFilterByNameForTest("test", "t1"),
startTs: 1000,
endTs: 1010,
result: []commonEvent.DDLEvent{
{
Type: byte(model.ActionRenameTables),
Query: "RENAME TABLE `test`.`t1` TO `test`.`t101`;",
FinishedTs: 1010,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{0, 200},
},
NeedDroppedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{200},
},
TableNameChange: &commonEvent.TableNameChange{
DropName: []commonEvent.SchemaTableName{
{
SchemaName: "test",
TableName: "t1",
},
},
},
},
},
},
// {
// tableID: 200,
// tableFilter: buildTableFilterByNameForTest("test", "t1"),
// startTs: 1000,
// endTs: 1010,
// result: []commonEvent.DDLEvent{
// {
// Type: byte(model.ActionRenameTables),
// Query: "RENAME TABLE `test`.`t1` TO `test`.`t101`;",
// FinishedTs: 1010,
// BlockedTables: &commonEvent.InfluencedTables{
// InfluenceType: commonEvent.InfluenceTypeNormal,
// TableIDs: []int64{0, 200},
// },
// NeedDroppedTables: &commonEvent.InfluencedTables{
// InfluenceType: commonEvent.InfluenceTypeNormal,
// TableIDs: []int64{200},
// },
// TableNameChange: &commonEvent.TableNameChange{
// DropName: []commonEvent.SchemaTableName{
// {
// SchemaName: "test",
// TableName: "t1",
// },
// },
// },
// },
// },
// },
},
nil,
},
Expand Down
10 changes: 5 additions & 5 deletions pkg/common/event/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type DDLEvent struct {
// eventSize is the size of the event in bytes. It is set when it's unmarshaled.
eventSize int64 `json:"-"`

err error `json:"-"`
Err error `json:"-"`
}

func (d *DDLEvent) GetType() int {
Expand All @@ -93,7 +93,7 @@ func (d *DDLEvent) GetStartTs() common.Ts {
}

func (d *DDLEvent) GetError() error {
return d.err
return d.Err
}

func (d *DDLEvent) GetCommitTs() common.Ts {
Expand Down Expand Up @@ -252,8 +252,8 @@ func (t DDLEvent) Marshal() ([]byte, error) {
data = append(data, tableInfoDataSize...)
}

if t.err != nil {
errData := []byte(t.err.Error())
if t.Err != nil {
errData := []byte(t.Err.Error())
errDataSize := make([]byte, 8)
binary.BigEndian.PutUint64(errDataSize, uint64(len(errData)))
data = append(data, errData...)
Expand All @@ -273,7 +273,7 @@ func (t *DDLEvent) Unmarshal(data []byte) error {
if errorDataSize > 0 {
errorData := data[len(data)-8-int(errorDataSize) : len(data)-8]
log.Info("errorData", zap.String("errorData", string(errorData)))
t.err = apperror.ErrDDLEventError.FastGen(string(errorData))
t.Err = apperror.ErrDDLEventError.FastGen(string(errorData))
}
end := len(data) - 8 - int(errorDataSize)
tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end])
Expand Down
3 changes: 1 addition & 2 deletions tests/integration_tests/multi_tables_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ function run() {

check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_normal "normal" "null" ""
check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err1 "normal" "null" ""
# TODO: enable it
# check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "failed" "ErrSyncRenameTableFailed" ""
check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "failed" "ErrSyncRenameTableFailed" ""

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60

Expand Down