Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/workflows/check_and_build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,47 @@ jobs:

- name: Build
run: make cdc

basic_e2e_test:
runs-on: ubuntu-latest
name: E2E Test
steps:
- name: Check out code
uses: actions/checkout@v2

- name: Setup Go environment
uses: actions/setup-go@v3
with:
go-version: '1.21'

- name: Integration Build
run: |
tests/scripts/download-integration-test-binaries.sh master true
go build -o ./tools/bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl
make integration_test_build
ls -l bin/ && ls -l tools/bin/

- name: Test charset_gbk
run: |
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=charset_gbk

- name: Copy logs to hack permission
if: ${{ always() }}
run: |
cat /tmp/tidb_cdc_test/charset_gbk/stdout.log
tail -n 10 /tmp/tidb_cdc_test/charset_gbk/cdc.log
mkdir ./logs
sudo cp -r -L /tmp/tidb_cdc_test/charset_gbk/stdout.log ./logs/
sudo cp -r -L /tmp/tidb_cdc_test/charset_gbk/cdc.log ./logs/
sudo chown -R runner ./logs

# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
- name: Upload logs
continue-on-error: true
uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
name: upstream-switch-logs
path: |
./logs
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ generate-protobuf:
cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd

integration_test_build: check_failpoint_ctl
# $(FAILPOINT_ENABLE)
integration_test_build: check_failpoint_ctl
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc/cmd \
|| { $(FAILPOINT_DISABLE); echo "Failed to build cdc.test"; exit 1; }
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/main.go \
|| { $(FAILPOINT_DISABLE); exit 1; }
# $(FAILPOINT_DISABLE)
$(FAILPOINT_DISABLE)

failpoint-enable: check_failpoint_ctl
$(FAILPOINT_ENABLE)
Expand Down
6 changes: 4 additions & 2 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,11 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo
log.Info("Received a stale event, ignore it", zap.Any("event", event), zap.Any("dispatcher", d.id))
}
if event.GetType() == commonEvent.TypeDMLEvent ||
event.GetType() == commonEvent.TypeDDLEvent {
event.GetType() == commonEvent.TypeDDLEvent ||
event.GetType() == commonEvent.TypeHandshakeEvent {
if event.GetSeq() != d.lastEventSeq.Add(1) {
log.Warn("Received a out-of-order event, reset the dispatcher", zap.Any("event", event), zap.Any("dispatcher", d.id))
log.Warn("Received a out-of-order event, reset the dispatcher", zap.Any("dispatcher", d.id),
zap.Uint64("receivedSeq", event.GetSeq()), zap.Uint64("lastEventSeq", d.lastEventSeq.Load()), zap.Any("event", event))
d.reset()
return false
}
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func SetDispatcherTaskScheduler(taskScheduler threadpool.ThreadPool) {
DispatcherTaskScheduler = taskScheduler
}

// EventsHandler is used to dispatcher the events received.
// EventsHandler is used to dispatch the received events.
// If the event is a DML event, it will be added to the sink for writing to downstream.
// If the event is a resolved TS event, it will be update the resolvedTs of the dispatcher.
// If the event is a DDL event,
Expand Down
4 changes: 2 additions & 2 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (e *eventStore) updateMetrics(ctx context.Context) error {
e.dispatcherStates.RLock()
for _, subscriptionStat := range e.dispatcherStates.n {
// resolved ts lag
resolvedTs := subscriptionStat.resolvedTs
resolvedTs := atomic.LoadUint64(&subscriptionStat.resolvedTs)
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
Expand Down Expand Up @@ -616,7 +616,7 @@ func (e *eventStore) batchAndWriteEvents(ctx context.Context, db *pebble.DB, inp
log.Warn("unknown subscriptionID", zap.Uint64("subID", uint64(subID)))
continue
}
subscriptionStat.resolvedTs = resolvedTs
atomic.StoreUint64(&subscriptionStat.resolvedTs, resolvedTs)
for dispatcherID := range subscriptionStat.ids {
dispatcherStat := e.dispatcherStates.m[dispatcherID]
dispatcherStat.notifier(resolvedTs)
Expand Down
6 changes: 2 additions & 4 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,11 @@ func (m *Maintainer) getNewBootstrapFn() bootstrap.NewBootstrapMessageFn {
zap.Error(err))
}
return func(id node.ID) *messaging.TargetMessage {
var ddlDispatcherID *heartbeatpb.DispatcherID
// only send dispatcher id to dispatcher manager on the same node
if id == m.selfNode.ID {
ddlDispatcherID = m.tableTriggerEventDispatcherID.ToPB()
log.Info("create table event trigger dispatcher", zap.String("changefeed", m.id.String()),
zap.String("server", id.String()),
zap.String("dispatcher id", ddlDispatcherID.String()))
zap.String("dispatcher id", m.tableTriggerEventDispatcherID.String()))
}
log.Info("send maintainer bootstrap message",
zap.String("changefeed", m.id.String()),
Expand All @@ -633,7 +631,7 @@ func (m *Maintainer) getNewBootstrapFn() bootstrap.NewBootstrapMessageFn {
ChangefeedID: m.id.ToPB(),
Config: cfgBytes,
StartTs: m.startCheckpointTs,
TableTriggerEventDispatcherId: ddlDispatcherID,
TableTriggerEventDispatcherId: m.tableTriggerEventDispatcherID.ToPB(),
})
}
}
Expand Down
70 changes: 35 additions & 35 deletions pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error {

if !(event.TiDBOnly && !w.cfg.IsTiDB) {
err := w.execDDLWithMaxRetries(event)

if err != nil {
log.Error("exec ddl failed", zap.Error(err))
return err
Expand Down Expand Up @@ -270,9 +269,6 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
case commonEvent.InfluenceTypeNormal:
tableIds = append(tableIds, relatedTables.TableIDs...)
case commonEvent.InfluenceTypeDB:
if w.tableSchemaStore == nil {
log.Panic("table schema store is nil")
}
ids := w.tableSchemaStore.GetTableIdsByDB(relatedTables.SchemaID)
tableIds = append(tableIds, ids...)
case commonEvent.InfluenceTypeAll:
Expand All @@ -299,41 +295,45 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
tableIds = append(tableIds, table.TableID)
}

// generate query
//INSERT INTO `tidb_cdc`.`ddl_ts` (ticdc_cluster_id, changefeed, ddl_ts, table_id) values(...) ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;
var builder strings.Builder
builder.WriteString("INSERT INTO ")
builder.WriteString(filter.TiCDCSystemSchema)
builder.WriteString(".")
builder.WriteString(filter.DDLTsTable)
builder.WriteString("(ticdc_cluster_id, changefeed, ddl_ts, table_id) VALUES ")
if len(tableIds) > 0 {
// generate query
//INSERT INTO `tidb_cdc`.`ddl_ts` (ticdc_cluster_id, changefeed, ddl_ts, table_id) values(...) ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;
var builder strings.Builder
builder.WriteString("INSERT INTO ")
builder.WriteString(filter.TiCDCSystemSchema)
builder.WriteString(".")
builder.WriteString(filter.DDLTsTable)
builder.WriteString("(ticdc_cluster_id, changefeed, ddl_ts, table_id) VALUES ")

for idx, tableId := range tableIds {
builder.WriteString("('")
builder.WriteString(ticdcClusterID)
builder.WriteString("', '")
builder.WriteString(changefeedID)
builder.WriteString("', '")
builder.WriteString(ddlTs)
builder.WriteString("', ")
builder.WriteString(strconv.FormatInt(tableId, 10))
builder.WriteString(")")
if idx < len(tableIds)-1 {
builder.WriteString(", ")
for idx, tableId := range tableIds {
builder.WriteString("('")
builder.WriteString(ticdcClusterID)
builder.WriteString("', '")
builder.WriteString(changefeedID)
builder.WriteString("', '")
builder.WriteString(ddlTs)
builder.WriteString("', ")
builder.WriteString(strconv.FormatInt(tableId, 10))
builder.WriteString(")")
if idx < len(tableIds)-1 {
builder.WriteString(", ")
}
}
}
builder.WriteString(" ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;")
builder.WriteString(" ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;")

query := builder.String()
log.Info("query is", zap.Any("query", query))
_, err = tx.Exec(query)
if err != nil {
log.Error("failed to write ddl ts table", zap.Error(err))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write ddl ts table", zap.Error(err2))
query := builder.String()
log.Info("query is", zap.Any("query", query))
_, err = tx.Exec(query)
if err != nil {
log.Error("failed to write ddl ts table", zap.Error(err))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write ddl ts table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write ddl ts table;"))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write ddl ts table;"))
} else {
log.Error("table ids is empty when write ddl ts table, FIX IT", zap.Any("event", event))
}

if len(dropTableIds) > 0 {
Expand Down
18 changes: 18 additions & 0 deletions pkg/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pingcap/ticdc/pkg/sink/codec/common"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

// GetEncoderConfig returns the encoder config and validates the config.
Expand Down Expand Up @@ -78,17 +79,34 @@ func (s *TableSchemaStore) AddEvent(event *commonEvent.DDLEvent) {
s.tableIDStore.AddEvent(event)
}

func (s *TableSchemaStore) initialized() bool {
if s == nil || (s.tableIDStore == nil && s.tableNameStore == nil) {
log.Warn("TableSchemaStore is not initialized", zap.Any("tableSchemaStore", s))
return false
}
return true
}

func (s *TableSchemaStore) GetTableIdsByDB(schemaID int64) []int64 {
if !s.initialized() {
return nil
}
return s.tableIDStore.GetTableIdsByDB(schemaID)
}

func (s *TableSchemaStore) GetAllTableIds() []int64 {
if !s.initialized() {
return nil
}
return s.tableIDStore.GetAllTableIds()
}

// GetAllTableNames only will be called when maintainer send message to ask dispatcher to write checkpointTs to downstream.
// So the ts must be <= the latest received event ts of table trigger event dispatcher.
func (s *TableSchemaStore) GetAllTableNames(ts uint64) []*commonEvent.SchemaTableName {
if !s.initialized() {
return nil
}
return s.tableNameStore.GetAllTableNames(ts)
}

Expand Down
Loading