From 662d1476f8be2291b2e46384f7ac7d0b83477f0e Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 21 Jan 2025 11:40:47 +0800 Subject: [PATCH 1/7] test: adjust Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 5 ++ .../integration_tests/changefeed_error/run.sh | 66 +++++++++---------- .../changefeed_fast_fail/run.sh | 62 ----------------- 3 files changed, 37 insertions(+), 96 deletions(-) delete mode 100644 tests/integration_tests/changefeed_fast_fail/run.sh diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index caf2ef8b2c..fa7d06656d 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/maintainer/replica" @@ -150,6 +151,7 @@ func NewMaintainer(cfID common.ChangeFeedID, checkpointTs uint64, newChangfeed bool, ) *Maintainer { + mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) tableTriggerEventDispatcherID := common.NewDispatcherID() @@ -324,6 +326,9 @@ func (m *Maintainer) initialize() error { log.Info("start to initialize changefeed maintainer", zap.String("id", m.id.String())) + failpoint.Inject("NewChangefeedRetryError", func() { + failpoint.Return(errors.New("failpoint injected retriable error")) + }) // detect the capture changes m.nodeManager.RegisterNodeChangeHandler(node.ID("maintainer-"+m.id.Name()), func(allNodes map[node.ID]*node.Info) { m.mutex.Lock() diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 08c4bda58e..09e629f8b5 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -38,7 +38,7 @@ function run() { start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedNoRetryError=1*return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=1*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') @@ -60,7 +60,9 @@ function run() { pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; esac - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" "" + # CASE 1: Test unretryable error + echo "Start case 1: Test unretryable error" + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrSnapshotLostByGC]" "" run_cdc_cli changefeed resume -c $changefeedid check_table_exists "changefeed_error.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} @@ -69,37 +71,47 @@ function run() { go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedRetryError=return(true)' - kill -9 $capture_pid - # make sure old cpature key and old owner key expire in etcd + export GO_FAILPOINTS='' + cleanup_process $CDC_BINARY + # make sure old capture key and old owner key expire in etcd ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | grep -v "capture" ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/capture' 'capture'" ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" + echo "Pass case 1" + # CASE 2: Test retryable error + echo "Start case 2: Test retryable error" + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/maintainer/NewChangefeedRetryError=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "warning" "failpoint injected retriable error" "" + # try to create another changefeed to make sure the coordinator is not stuck + changefeedid_2="changefeed-error-2" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2 + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "warning" "failpoint injected retriable error" "" + run_cdc_cli changefeed remove -c $changefeedid ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1} - export GO_FAILPOINTS='' - cleanup_process $CDC_BINARY - ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" - - # owner DDL error case - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/InjectChangefeedDDLError=return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - changefeedid_1="changefeed-error-1" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_1 + run_cdc_cli changefeed remove -c $changefeedid_2 + ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1} - run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);" - ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeedid_1 warning last_warning ErrExecDDLFailed + # test changefeed remove twice, and it should return "Changefeed not found" + result=$(cdc cli changefeed remove -c $changefeedid_2) + if [[ $result != *"Changefeed not found"* ]]; then + echo "changefeeed remove result is expected to contains 'Changefeed not found', \ + but actually got $result" + exit 1 + fi - run_cdc_cli changefeed remove -c $changefeedid_1 + export GO_FAILPOINTS='' cleanup_process $CDC_BINARY ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" - # updating GC safepoint failure case - export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)' + echo "Pass case 2" + + # CASE 3: updating GC safepoint failure case + echo "Start case 3: updating GC safepoint failure case" + export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_2="changefeed-error-2" @@ -109,21 +121,7 @@ function run() { run_cdc_cli changefeed remove -c $changefeedid_2 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY - - # make sure initialize changefeed error will not stuck the owner - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - - changefeedid_3="changefeed-initialize-error" - run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" "" - run_cdc_cli changefeed pause -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "null" "" - run_cdc_cli changefeed resume -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" "" - run_cdc_cli changefeed remove -c $changefeedid_3 - export GO_FAILPOINTS='' - cleanup_process $CDC_BINARY + echo "Pass case 3" } trap stop_tidb_cluster EXIT diff --git a/tests/integration_tests/changefeed_fast_fail/run.sh b/tests/integration_tests/changefeed_fast_fail/run.sh deleted file mode 100644 index 996aca73ab..0000000000 --- a/tests/integration_tests/changefeed_fast_fail/run.sh +++ /dev/null @@ -1,62 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 -MAX_RETRIES=20 - -function run() { - # No need to test kafka and storage sink, since the logic are all the same. - if [ "$SINK_TYPE" != "mysql" ]; then - return - fi - - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) - run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - - SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" - - changefeedid="changefeed-fast-fail" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid - - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "ErrSnapshotLostByGC" "" - - # test changefeed remove - result=$(cdc cli changefeed remove -c $changefeedid) - if [[ $result != *"Changefeed remove successfully"* ]]; then - echo "changefeed remove result is expected to contains 'Changefeed remove successfully', \ - but actually got $result" - exit 1 - fi - - # test changefeed remove twice - result=$(cdc cli changefeed remove -c $changefeedid) - if [[ $result != *"Changefeed not found"* ]]; then - echo "changefeeed remove result is expected to contains 'Changefeed not found', \ - but actually got $result" - exit 1 - fi - - # test create changefeed with same name - sleep 30 - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid - - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -run $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From f93333a553049288c7b0813631ee9570a870ac60 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 21 Jan 2025 11:41:54 +0800 Subject: [PATCH 2/7] test: adjust github action Signed-off-by: dongmen <414110582@qq.com> --- .github/workflows/integration_test_mysql.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 2e5150c500..701ed51a8b 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -468,10 +468,10 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=move_table - - name: Test changefeed_fast_fail + - name: Test changefeed_error if: ${{ success() }} run: | - export TICDC_NEWARCH=true && make integration_test CASE=changefeed_fast_fail + export TICDC_NEWARCH=true && make integration_test CASE=changefeed_error # The 16th case in this group - name: Test capture_session_done_during_task From df11d3b97a01520587bd63942dcac7ed52b20525 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 21 Jan 2025 12:43:25 +0800 Subject: [PATCH 3/7] maintainer: fix runningErrors not being set correctly Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 41 +++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index fa7d06656d..31d5dcac4c 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -124,8 +124,10 @@ type Maintainer struct { // false when otherwise, such as maintainer move to different nodes. newChangefeed bool - errLock sync.Mutex - runningErrors map[node.ID]*heartbeatpb.RunningError + runningErrors struct { + sync.Mutex + m map[node.ID]*heartbeatpb.RunningError + } cancelUpdateMetrics context.CancelFunc changefeedCheckpointTsGauge prometheus.Gauge @@ -186,7 +188,6 @@ func NewMaintainer(cfID common.ChangeFeedID, ddlSpan: ddlSpan, checkpointTsByCapture: make(map[node.ID]heartbeatpb.Watermark), - runningErrors: map[node.ID]*heartbeatpb.RunningError{}, newChangefeed: newChangfeed, changefeedCheckpointTsGauge: metrics.ChangefeedCheckpointTsGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), @@ -199,6 +200,7 @@ func NewMaintainer(cfID common.ChangeFeedID, tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) m.watermark.Watermark = &heartbeatpb.Watermark{ CheckpointTs: checkpointTs, @@ -300,15 +302,15 @@ func (m *Maintainer) Close() { } func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { - m.errLock.Lock() - defer m.errLock.Unlock() + m.runningErrors.Lock() + defer m.runningErrors.Unlock() var runningErrors []*heartbeatpb.RunningError - if len(m.runningErrors) > 0 { - runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors)) - for _, e := range m.runningErrors { + if len(m.runningErrors.m) > 0 { + runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors.m)) + for _, e := range m.runningErrors.m { runningErrors = append(runningErrors, e) } - clear(m.runningErrors) + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) } status := &heartbeatpb.MaintainerStatus{ @@ -558,10 +560,10 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) { if info, ok := m.nodeManager.GetAliveNodes()[from]; ok { err.Node = info.AdvertiseAddr } - m.errLock.Lock() + m.runningErrors.Lock() m.statusChanged.Store(true) - m.runningErrors[from] = err - m.errLock.Unlock() + m.runningErrors.m[from] = err + m.runningErrors.Unlock() } func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) { @@ -714,13 +716,14 @@ func (m *Maintainer) handleError(err error) { } else { code = string(errors.ErrOwnerUnknown.RFCCode()) } - m.runningErrors = map[node.ID]*heartbeatpb.RunningError{ - m.selfNode.ID: { - Time: time.Now().String(), - Node: m.selfNode.AdvertiseAddr, - Code: code, - Message: err.Error(), - }, + + m.runningErrors.Lock() + defer m.runningErrors.Unlock() + m.runningErrors.m[m.selfNode.ID] = &heartbeatpb.RunningError{ + Time: time.Now().String(), + Node: m.selfNode.AdvertiseAddr, + Code: code, + Message: err.Error(), } m.statusChanged.Store(true) } From 90c4dd9955238936a8c5c0de0ce23be7bbc66b78 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 21 Jan 2025 12:51:04 +0800 Subject: [PATCH 4/7] maintainer: add debug log Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer_manager.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index b0c16cfacc..68ceb384a1 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -192,7 +192,11 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) { response := &heartbeatpb.CoordinatorBootstrapResponse{} m.maintainers.Range(func(key, value interface{}) bool { maintainer := value.(*Maintainer) - response.Statuses = append(response.Statuses, maintainer.GetMaintainerStatus()) + status := maintainer.GetMaintainerStatus() + if status.GetErr() != nil { + log.Info("fizz changefeed meet error", zap.Any("status", status)) + } + response.Statuses = append(response.Statuses, status) maintainer.statusChanged.Store(false) maintainer.lastReportTime = time.Now() return true From 17e911e99d4baabe4f63a18c703ec26703752f3c Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 21 Jan 2025 13:08:48 +0800 Subject: [PATCH 5/7] maintainer: add debug log 2 Signed-off-by: dongmen <414110582@qq.com> --- maintainer/maintainer.go | 1 - maintainer/maintainer_manager.go | 6 +++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 31d5dcac4c..5308e9446f 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -100,7 +100,6 @@ type Maintainer struct { // so when a maintainer is created, that means the dispatcher is gone and must be recreated. ddlSpan *replica.SpanReplication - pdEndpoints []string nodeManager *watcher.NodeManager // closedNodes is used to record the nodes that dispatcherManager is closed closedNodes map[node.ID]struct{} diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 68ceb384a1..612630e1bf 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -298,7 +298,11 @@ func (m *Manager) sendHeartbeat() { m.maintainers.Range(func(key, value interface{}) bool { cfMaintainer := value.(*Maintainer) if cfMaintainer.statusChanged.Load() || time.Since(cfMaintainer.lastReportTime) > time.Second*2 { - response.Statuses = append(response.Statuses, cfMaintainer.GetMaintainerStatus()) + mStatus := cfMaintainer.GetMaintainerStatus() + if mStatus.GetErr() != nil { + log.Info("fizz changefeed meet error", zap.Any("status", mStatus)) + } + response.Statuses = append(response.Statuses, mStatus) cfMaintainer.statusChanged.Store(false) cfMaintainer.lastReportTime = time.Now() } From 05fd7b229245d4732598a9da2f84da8f549efc50 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 21 Jan 2025 13:15:14 +0800 Subject: [PATCH 6/7] test: adjust test Signed-off-by: dongmen <414110582@qq.com> --- tests/integration_tests/changefeed_error/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 09e629f8b5..917d51ed14 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -81,7 +81,7 @@ function run() { # CASE 2: Test retryable error echo "Start case 2: Test retryable error" - export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/maintainer/NewChangefeedRetryError=return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/NewChangefeedRetryError=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "warning" "failpoint injected retriable error" "" From d9172a07520f4ab8ef09653855a6fe82db7bada4 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 21 Jan 2025 13:46:10 +0800 Subject: [PATCH 7/7] coordinator: add check stale changefeed checkpointTs 2 Signed-off-by: dongmen <414110582@qq.com> --- coordinator/coordinator.go | 33 ++++++++++++++++++- .../integration_tests/changefeed_error/run.sh | 8 ++--- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 5707441892..1190ccf5a1 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -90,7 +90,7 @@ func New(node *node.Info, pdClock: pdClock, mc: mc, updatedChangefeedCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024), - stateChangedCh: make(chan *ChangefeedStateChangeEvent, 8), + stateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024), backend: backend, } c.taskScheduler = threadpool.NewThreadPoolDefault() @@ -224,12 +224,43 @@ func (c *coordinator) handleStateChangedEvent(ctx context.Context, event *Change return nil } +// checkStaleCheckpointTs checks if the checkpointTs is stale, if it is, it will send a state change event to the stateChangedCh +func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, id common.ChangeFeedID, reportedCheckpointTs uint64) { + err := c.gcManager.CheckStaleCheckpointTs(ctx, id, reportedCheckpointTs) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err != nil { + errCode, _ := errors.RFCCode(err) + state := model.StateFailed + if !errors.IsChangefeedGCFastFailErrorCode(errCode) { + state = model.StateWarning + } + select { + case <-ctx.Done(): + log.Warn("Failed to send state change event to stateChangedCh since context timeout, "+ + "there may be a lot of state need to be handled. Try next time", + zap.String("changefeed", id.String()), + zap.Error(ctx.Err())) + return + case c.stateChangedCh <- &ChangefeedStateChangeEvent{ + ChangefeedID: id, + State: state, + err: &model.RunningError{ + Code: string(errCode), + Message: err.Error(), + }, + }: + } + } +} + func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.ChangeFeedID]*changefeed.Changefeed) error { statusMap := make(map[common.ChangeFeedID]uint64) for _, upCf := range cfs { reportedCheckpointTs := upCf.GetStatus().CheckpointTs if upCf.GetLastSavedCheckPointTs() < reportedCheckpointTs { statusMap[upCf.ID] = reportedCheckpointTs + c.checkStaleCheckpointTs(ctx, upCf.ID, reportedCheckpointTs) } } if len(statusMap) == 0 { diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 917d51ed14..a6943fd948 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -114,11 +114,11 @@ function run() { export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - changefeedid_2="changefeed-error-2" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "failed" "[CDC:ErrSnapshotLostByGC]" "" + changefeedid_3="changefeed-error-3" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_3 + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "failed" "[CDC:ErrSnapshotLostByGC]" "" - run_cdc_cli changefeed remove -c $changefeedid_2 + run_cdc_cli changefeed remove -c $changefeedid_3 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY echo "Pass case 3"