Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,10 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=move_table

- name: Test changefeed_fast_fail
- name: Test changefeed_error
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_fast_fail
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_error

# The 16th case in this group
- name: Test capture_session_done_during_task
Expand Down
33 changes: 32 additions & 1 deletion coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(node *node.Info,
pdClock: pdClock,
mc: mc,
updatedChangefeedCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024),
stateChangedCh: make(chan *ChangefeedStateChangeEvent, 8),
stateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024),
backend: backend,
}
c.taskScheduler = threadpool.NewThreadPoolDefault()
Expand Down Expand Up @@ -224,12 +224,43 @@ func (c *coordinator) handleStateChangedEvent(ctx context.Context, event *Change
return nil
}

// checkStaleCheckpointTs checks if the checkpointTs is stale, if it is, it will send a state change event to the stateChangedCh
func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, id common.ChangeFeedID, reportedCheckpointTs uint64) {
err := c.gcManager.CheckStaleCheckpointTs(ctx, id, reportedCheckpointTs)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err != nil {
errCode, _ := errors.RFCCode(err)
state := model.StateFailed
if !errors.IsChangefeedGCFastFailErrorCode(errCode) {
state = model.StateWarning
}
select {
case <-ctx.Done():
log.Warn("Failed to send state change event to stateChangedCh since context timeout, "+
"there may be a lot of state need to be handled. Try next time",
zap.String("changefeed", id.String()),
zap.Error(ctx.Err()))
return
case c.stateChangedCh <- &ChangefeedStateChangeEvent{
ChangefeedID: id,
State: state,
err: &model.RunningError{
Code: string(errCode),
Message: err.Error(),
},
}:
}
}
}

func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.ChangeFeedID]*changefeed.Changefeed) error {
statusMap := make(map[common.ChangeFeedID]uint64)
for _, upCf := range cfs {
reportedCheckpointTs := upCf.GetStatus().CheckpointTs
if upCf.GetLastSavedCheckPointTs() < reportedCheckpointTs {
statusMap[upCf.ID] = reportedCheckpointTs
c.checkStaleCheckpointTs(ctx, upCf.ID, reportedCheckpointTs)
}
}
if len(statusMap) == 0 {
Expand Down
47 changes: 27 additions & 20 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/maintainer/replica"
Expand Down Expand Up @@ -99,7 +100,6 @@ type Maintainer struct {
// so when a maintainer is created, that means the dispatcher is gone and must be recreated.
ddlSpan *replica.SpanReplication

pdEndpoints []string
nodeManager *watcher.NodeManager
// closedNodes is used to record the nodes that dispatcherManager is closed
closedNodes map[node.ID]struct{}
Expand All @@ -123,8 +123,10 @@ type Maintainer struct {
// false when otherwise, such as maintainer move to different nodes.
newChangefeed bool

errLock sync.Mutex
runningErrors map[node.ID]*heartbeatpb.RunningError
runningErrors struct {
sync.Mutex
m map[node.ID]*heartbeatpb.RunningError
}
cancelUpdateMetrics context.CancelFunc

changefeedCheckpointTsGauge prometheus.Gauge
Expand All @@ -150,6 +152,7 @@ func NewMaintainer(cfID common.ChangeFeedID,
checkpointTs uint64,
newChangfeed bool,
) *Maintainer {

mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
tableTriggerEventDispatcherID := common.NewDispatcherID()
Expand Down Expand Up @@ -184,7 +187,6 @@ func NewMaintainer(cfID common.ChangeFeedID,

ddlSpan: ddlSpan,
checkpointTsByCapture: make(map[node.ID]heartbeatpb.Watermark),
runningErrors: map[node.ID]*heartbeatpb.RunningError{},
newChangefeed: newChangfeed,

changefeedCheckpointTsGauge: metrics.ChangefeedCheckpointTsGauge.WithLabelValues(cfID.Namespace(), cfID.Name()),
Expand All @@ -197,6 +199,7 @@ func NewMaintainer(cfID common.ChangeFeedID,
tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()),
handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()),
}
m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)

m.watermark.Watermark = &heartbeatpb.Watermark{
CheckpointTs: checkpointTs,
Expand Down Expand Up @@ -298,15 +301,15 @@ func (m *Maintainer) Close() {
}

func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus {
m.errLock.Lock()
defer m.errLock.Unlock()
m.runningErrors.Lock()
defer m.runningErrors.Unlock()
var runningErrors []*heartbeatpb.RunningError
if len(m.runningErrors) > 0 {
runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors))
for _, e := range m.runningErrors {
if len(m.runningErrors.m) > 0 {
runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors.m))
for _, e := range m.runningErrors.m {
runningErrors = append(runningErrors, e)
}
clear(m.runningErrors)
m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)
}

status := &heartbeatpb.MaintainerStatus{
Expand All @@ -324,6 +327,9 @@ func (m *Maintainer) initialize() error {
log.Info("start to initialize changefeed maintainer",
zap.String("id", m.id.String()))

failpoint.Inject("NewChangefeedRetryError", func() {
failpoint.Return(errors.New("failpoint injected retriable error"))
})
// detect the capture changes
m.nodeManager.RegisterNodeChangeHandler(node.ID("maintainer-"+m.id.Name()), func(allNodes map[node.ID]*node.Info) {
m.mutex.Lock()
Expand Down Expand Up @@ -553,10 +559,10 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) {
if info, ok := m.nodeManager.GetAliveNodes()[from]; ok {
err.Node = info.AdvertiseAddr
}
m.errLock.Lock()
m.runningErrors.Lock()
m.statusChanged.Store(true)
m.runningErrors[from] = err
m.errLock.Unlock()
m.runningErrors.m[from] = err
m.runningErrors.Unlock()
}

func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) {
Expand Down Expand Up @@ -709,13 +715,14 @@ func (m *Maintainer) handleError(err error) {
} else {
code = string(errors.ErrOwnerUnknown.RFCCode())
}
m.runningErrors = map[node.ID]*heartbeatpb.RunningError{
m.selfNode.ID: {
Time: time.Now().String(),
Node: m.selfNode.AdvertiseAddr,
Code: code,
Message: err.Error(),
},

m.runningErrors.Lock()
defer m.runningErrors.Unlock()
m.runningErrors.m[m.selfNode.ID] = &heartbeatpb.RunningError{
Time: time.Now().String(),
Node: m.selfNode.AdvertiseAddr,
Code: code,
Message: err.Error(),
}
m.statusChanged.Store(true)
}
Expand Down
12 changes: 10 additions & 2 deletions maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) {
response := &heartbeatpb.CoordinatorBootstrapResponse{}
m.maintainers.Range(func(key, value interface{}) bool {
maintainer := value.(*Maintainer)
response.Statuses = append(response.Statuses, maintainer.GetMaintainerStatus())
status := maintainer.GetMaintainerStatus()
if status.GetErr() != nil {
log.Info("fizz changefeed meet error", zap.Any("status", status))
}
response.Statuses = append(response.Statuses, status)
maintainer.statusChanged.Store(false)
maintainer.lastReportTime = time.Now()
return true
Expand Down Expand Up @@ -294,7 +298,11 @@ func (m *Manager) sendHeartbeat() {
m.maintainers.Range(func(key, value interface{}) bool {
cfMaintainer := value.(*Maintainer)
if cfMaintainer.statusChanged.Load() || time.Since(cfMaintainer.lastReportTime) > time.Second*2 {
response.Statuses = append(response.Statuses, cfMaintainer.GetMaintainerStatus())
mStatus := cfMaintainer.GetMaintainerStatus()
if mStatus.GetErr() != nil {
log.Info("fizz changefeed meet error", zap.Any("status", mStatus))
}
response.Statuses = append(response.Statuses, mStatus)
cfMaintainer.statusChanged.Store(false)
cfMaintainer.lastReportTime = time.Now()
}
Expand Down
72 changes: 35 additions & 37 deletions tests/integration_tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function run() {
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedNoRetryError=1*return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

Expand All @@ -60,7 +60,9 @@ function run() {
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" ""
# CASE 1: Test unretryable error
echo "Start case 1: Test unretryable error"
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrSnapshotLostByGC]" ""
run_cdc_cli changefeed resume -c $changefeedid

check_table_exists "changefeed_error.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
Expand All @@ -69,61 +71,57 @@ function run() {
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedRetryError=return(true)'
kill -9 $capture_pid
# make sure old cpature key and old owner key expire in etcd
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
# make sure old capture key and old owner key expire in etcd
ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | grep -v "capture"
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/capture' 'capture'"
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"
echo "Pass case 1"

# CASE 2: Test retryable error
echo "Start case 2: Test retryable error"
export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/NewChangefeedRetryError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "warning" "failpoint injected retriable error" ""

# try to create another changefeed to make sure the coordinator is not stuck
changefeedid_2="changefeed-error-2"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "warning" "failpoint injected retriable error" ""

run_cdc_cli changefeed remove -c $changefeedid
ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1}

export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"

# owner DDL error case
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/InjectChangefeedDDLError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
changefeedid_1="changefeed-error-1"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_1

run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);"
ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeedid_1 warning last_warning ErrExecDDLFailed

run_cdc_cli changefeed remove -c $changefeedid_1
cleanup_process $CDC_BINARY
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"
# updating GC safepoint failure case
export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_cdc_cli changefeed remove -c $changefeedid_2
ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1}

changefeedid_2="changefeed-error-2"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "failed" "[CDC:ErrSnapshotLostByGC]" ""
# test changefeed remove twice, and it should return "Changefeed not found"
result=$(cdc cli changefeed remove -c $changefeedid_2)
if [[ $result != *"Changefeed not found"* ]]; then
echo "changefeeed remove result is expected to contains 'Changefeed not found', \
but actually got $result"
exit 1
fi

run_cdc_cli changefeed remove -c $changefeedid_2
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"
echo "Pass case 2"

# make sure initialize changefeed error will not stuck the owner
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)'
# CASE 3: updating GC safepoint failure case
echo "Start case 3: updating GC safepoint failure case"
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeedid_3="changefeed-initialize-error"
run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
run_cdc_cli changefeed pause -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "null" ""
run_cdc_cli changefeed resume -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
changefeedid_3="changefeed-error-3"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "failed" "[CDC:ErrSnapshotLostByGC]" ""

run_cdc_cli changefeed remove -c $changefeedid_3
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
echo "Pass case 3"
}

trap stop_tidb_cluster EXIT
Expand Down
62 changes: 0 additions & 62 deletions tests/integration_tests/changefeed_fast_fail/run.sh

This file was deleted.