diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 2e5150c500..701ed51a8b 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -468,10 +468,10 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=move_table - - name: Test changefeed_fast_fail + - name: Test changefeed_error if: ${{ success() }} run: | - export TICDC_NEWARCH=true && make integration_test CASE=changefeed_fast_fail + export TICDC_NEWARCH=true && make integration_test CASE=changefeed_error # The 16th case in this group - name: Test capture_session_done_during_task diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 5707441892..1190ccf5a1 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -90,7 +90,7 @@ func New(node *node.Info, pdClock: pdClock, mc: mc, updatedChangefeedCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024), - stateChangedCh: make(chan *ChangefeedStateChangeEvent, 8), + stateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024), backend: backend, } c.taskScheduler = threadpool.NewThreadPoolDefault() @@ -224,12 +224,43 @@ func (c *coordinator) handleStateChangedEvent(ctx context.Context, event *Change return nil } +// checkStaleCheckpointTs checks if the checkpointTs is stale, if it is, it will send a state change event to the stateChangedCh +func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, id common.ChangeFeedID, reportedCheckpointTs uint64) { + err := c.gcManager.CheckStaleCheckpointTs(ctx, id, reportedCheckpointTs) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err != nil { + errCode, _ := errors.RFCCode(err) + state := model.StateFailed + if !errors.IsChangefeedGCFastFailErrorCode(errCode) { + state = model.StateWarning + } + select { + case <-ctx.Done(): + log.Warn("Failed to send state change event to stateChangedCh since context timeout, "+ + "there may be a lot of state need to be handled. Try next time", + zap.String("changefeed", id.String()), + zap.Error(ctx.Err())) + return + case c.stateChangedCh <- &ChangefeedStateChangeEvent{ + ChangefeedID: id, + State: state, + err: &model.RunningError{ + Code: string(errCode), + Message: err.Error(), + }, + }: + } + } +} + func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.ChangeFeedID]*changefeed.Changefeed) error { statusMap := make(map[common.ChangeFeedID]uint64) for _, upCf := range cfs { reportedCheckpointTs := upCf.GetStatus().CheckpointTs if upCf.GetLastSavedCheckPointTs() < reportedCheckpointTs { statusMap[upCf.ID] = reportedCheckpointTs + c.checkStaleCheckpointTs(ctx, upCf.ID, reportedCheckpointTs) } } if len(statusMap) == 0 { diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index caf2ef8b2c..5308e9446f 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/maintainer/replica" @@ -99,7 +100,6 @@ type Maintainer struct { // so when a maintainer is created, that means the dispatcher is gone and must be recreated. ddlSpan *replica.SpanReplication - pdEndpoints []string nodeManager *watcher.NodeManager // closedNodes is used to record the nodes that dispatcherManager is closed closedNodes map[node.ID]struct{} @@ -123,8 +123,10 @@ type Maintainer struct { // false when otherwise, such as maintainer move to different nodes. newChangefeed bool - errLock sync.Mutex - runningErrors map[node.ID]*heartbeatpb.RunningError + runningErrors struct { + sync.Mutex + m map[node.ID]*heartbeatpb.RunningError + } cancelUpdateMetrics context.CancelFunc changefeedCheckpointTsGauge prometheus.Gauge @@ -150,6 +152,7 @@ func NewMaintainer(cfID common.ChangeFeedID, checkpointTs uint64, newChangfeed bool, ) *Maintainer { + mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) tableTriggerEventDispatcherID := common.NewDispatcherID() @@ -184,7 +187,6 @@ func NewMaintainer(cfID common.ChangeFeedID, ddlSpan: ddlSpan, checkpointTsByCapture: make(map[node.ID]heartbeatpb.Watermark), - runningErrors: map[node.ID]*heartbeatpb.RunningError{}, newChangefeed: newChangfeed, changefeedCheckpointTsGauge: metrics.ChangefeedCheckpointTsGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), @@ -197,6 +199,7 @@ func NewMaintainer(cfID common.ChangeFeedID, tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) m.watermark.Watermark = &heartbeatpb.Watermark{ CheckpointTs: checkpointTs, @@ -298,15 +301,15 @@ func (m *Maintainer) Close() { } func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { - m.errLock.Lock() - defer m.errLock.Unlock() + m.runningErrors.Lock() + defer m.runningErrors.Unlock() var runningErrors []*heartbeatpb.RunningError - if len(m.runningErrors) > 0 { - runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors)) - for _, e := range m.runningErrors { + if len(m.runningErrors.m) > 0 { + runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors.m)) + for _, e := range m.runningErrors.m { runningErrors = append(runningErrors, e) } - clear(m.runningErrors) + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) } status := &heartbeatpb.MaintainerStatus{ @@ -324,6 +327,9 @@ func (m *Maintainer) initialize() error { log.Info("start to initialize changefeed maintainer", zap.String("id", m.id.String())) + failpoint.Inject("NewChangefeedRetryError", func() { + failpoint.Return(errors.New("failpoint injected retriable error")) + }) // detect the capture changes m.nodeManager.RegisterNodeChangeHandler(node.ID("maintainer-"+m.id.Name()), func(allNodes map[node.ID]*node.Info) { m.mutex.Lock() @@ -553,10 +559,10 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) { if info, ok := m.nodeManager.GetAliveNodes()[from]; ok { err.Node = info.AdvertiseAddr } - m.errLock.Lock() + m.runningErrors.Lock() m.statusChanged.Store(true) - m.runningErrors[from] = err - m.errLock.Unlock() + m.runningErrors.m[from] = err + m.runningErrors.Unlock() } func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) { @@ -709,13 +715,14 @@ func (m *Maintainer) handleError(err error) { } else { code = string(errors.ErrOwnerUnknown.RFCCode()) } - m.runningErrors = map[node.ID]*heartbeatpb.RunningError{ - m.selfNode.ID: { - Time: time.Now().String(), - Node: m.selfNode.AdvertiseAddr, - Code: code, - Message: err.Error(), - }, + + m.runningErrors.Lock() + defer m.runningErrors.Unlock() + m.runningErrors.m[m.selfNode.ID] = &heartbeatpb.RunningError{ + Time: time.Now().String(), + Node: m.selfNode.AdvertiseAddr, + Code: code, + Message: err.Error(), } m.statusChanged.Store(true) } diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index b0c16cfacc..612630e1bf 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -192,7 +192,11 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) { response := &heartbeatpb.CoordinatorBootstrapResponse{} m.maintainers.Range(func(key, value interface{}) bool { maintainer := value.(*Maintainer) - response.Statuses = append(response.Statuses, maintainer.GetMaintainerStatus()) + status := maintainer.GetMaintainerStatus() + if status.GetErr() != nil { + log.Info("fizz changefeed meet error", zap.Any("status", status)) + } + response.Statuses = append(response.Statuses, status) maintainer.statusChanged.Store(false) maintainer.lastReportTime = time.Now() return true @@ -294,7 +298,11 @@ func (m *Manager) sendHeartbeat() { m.maintainers.Range(func(key, value interface{}) bool { cfMaintainer := value.(*Maintainer) if cfMaintainer.statusChanged.Load() || time.Since(cfMaintainer.lastReportTime) > time.Second*2 { - response.Statuses = append(response.Statuses, cfMaintainer.GetMaintainerStatus()) + mStatus := cfMaintainer.GetMaintainerStatus() + if mStatus.GetErr() != nil { + log.Info("fizz changefeed meet error", zap.Any("status", mStatus)) + } + response.Statuses = append(response.Statuses, mStatus) cfMaintainer.statusChanged.Store(false) cfMaintainer.lastReportTime = time.Now() } diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 08c4bda58e..a6943fd948 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -38,7 +38,7 @@ function run() { start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedNoRetryError=1*return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=1*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') @@ -60,7 +60,9 @@ function run() { pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; esac - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" "" + # CASE 1: Test unretryable error + echo "Start case 1: Test unretryable error" + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrSnapshotLostByGC]" "" run_cdc_cli changefeed resume -c $changefeedid check_table_exists "changefeed_error.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} @@ -69,61 +71,57 @@ function run() { go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedRetryError=return(true)' - kill -9 $capture_pid - # make sure old cpature key and old owner key expire in etcd + export GO_FAILPOINTS='' + cleanup_process $CDC_BINARY + # make sure old capture key and old owner key expire in etcd ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | grep -v "capture" ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/capture' 'capture'" ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" + echo "Pass case 1" + # CASE 2: Test retryable error + echo "Start case 2: Test retryable error" + export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/NewChangefeedRetryError=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "warning" "failpoint injected retriable error" "" + # try to create another changefeed to make sure the coordinator is not stuck + changefeedid_2="changefeed-error-2" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2 + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "warning" "failpoint injected retriable error" "" + run_cdc_cli changefeed remove -c $changefeedid ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1} - export GO_FAILPOINTS='' - cleanup_process $CDC_BINARY - ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" - - # owner DDL error case - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/InjectChangefeedDDLError=return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - changefeedid_1="changefeed-error-1" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_1 - - run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);" - ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeedid_1 warning last_warning ErrExecDDLFailed - - run_cdc_cli changefeed remove -c $changefeedid_1 - cleanup_process $CDC_BINARY - ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" - # updating GC safepoint failure case - export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + run_cdc_cli changefeed remove -c $changefeedid_2 + ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1} - changefeedid_2="changefeed-error-2" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "failed" "[CDC:ErrSnapshotLostByGC]" "" + # test changefeed remove twice, and it should return "Changefeed not found" + result=$(cdc cli changefeed remove -c $changefeedid_2) + if [[ $result != *"Changefeed not found"* ]]; then + echo "changefeeed remove result is expected to contains 'Changefeed not found', \ + but actually got $result" + exit 1 + fi - run_cdc_cli changefeed remove -c $changefeedid_2 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" + echo "Pass case 2" - # make sure initialize changefeed error will not stuck the owner - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)' + # CASE 3: updating GC safepoint failure case + echo "Start case 3: updating GC safepoint failure case" + export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - changefeedid_3="changefeed-initialize-error" - run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" "" - run_cdc_cli changefeed pause -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "null" "" - run_cdc_cli changefeed resume -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" "" + changefeedid_3="changefeed-error-3" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_3 + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "failed" "[CDC:ErrSnapshotLostByGC]" "" + run_cdc_cli changefeed remove -c $changefeedid_3 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + echo "Pass case 3" } trap stop_tidb_cluster EXIT diff --git a/tests/integration_tests/changefeed_fast_fail/run.sh b/tests/integration_tests/changefeed_fast_fail/run.sh deleted file mode 100644 index 996aca73ab..0000000000 --- a/tests/integration_tests/changefeed_fast_fail/run.sh +++ /dev/null @@ -1,62 +0,0 @@ -#!/bin/bash - -set -eu - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test -SINK_TYPE=$1 -MAX_RETRIES=20 - -function run() { - # No need to test kafka and storage sink, since the logic are all the same. - if [ "$SINK_TYPE" != "mysql" ]; then - return - fi - - rm -rf $WORK_DIR && mkdir -p $WORK_DIR - - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) - run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=return(true)' - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - - SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" - - changefeedid="changefeed-fast-fail" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid - - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "ErrSnapshotLostByGC" "" - - # test changefeed remove - result=$(cdc cli changefeed remove -c $changefeedid) - if [[ $result != *"Changefeed remove successfully"* ]]; then - echo "changefeed remove result is expected to contains 'Changefeed remove successfully', \ - but actually got $result" - exit 1 - fi - - # test changefeed remove twice - result=$(cdc cli changefeed remove -c $changefeedid) - if [[ $result != *"Changefeed not found"* ]]; then - echo "changefeeed remove result is expected to contains 'Changefeed not found', \ - but actually got $result" - exit 1 - fi - - # test create changefeed with same name - sleep 30 - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid - - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -run $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"