Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=tidb_mysql_test

- name: Test resolve_lock
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=resolve_lock

- name: Upload test logs
if: always()
uses: ./.github/actions/upload-test-logs
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ P=3
UT_PACKAGES_DISPATCHER := ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/worker/... ./pkg/sink/codec/open/... ./pkg/sink/codec/canal/...
UT_PACKAGES_MAINTAINER := ./maintainer/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_LOGSERVICE := ./logservice/...
UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./utils/dynstream/...

include tools/Makefile
Expand Down Expand Up @@ -216,6 +217,7 @@ unit_test_in_verify_ci: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov
$(UT_PACKAGES_DISPATCHER) \
$(UT_PACKAGES_MAINTAINER) \
$(UT_PACKAGES_COORDINATOR) \
$(UT_PACKAGES_LOGSERVICE) \
$(UT_PACKAGES_OTHERS) \
|| { $(FAILPOINT_DISABLE); exit 1; }
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
Expand Down
11 changes: 10 additions & 1 deletion logservice/logpuller/region_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,16 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u
lastAdvance := span.lastAdvanceTime.Load()
if now-lastAdvance > span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) {
ts := span.rangeLock.ResolvedTs()
if ts > span.startTs {
if ts > 0 && span.initialized.CompareAndSwap(false, true) {
log.Info("subscription client is initialized",
zap.Uint64("subscriptionID", uint64(span.subID)),
zap.Uint64("regionID", regionID),
zap.Uint64("resolvedTs", ts))
}
lastResolvedTs := span.resolvedTs.Load()
if ts > lastResolvedTs {
span.resolvedTs.Store(ts)
span.resolvedTsUpdated.Store(time.Now().Unix())
span.advanceResolvedTs(ts)
}
}
Expand Down
2 changes: 0 additions & 2 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,6 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions)))
for _, regionID := range resolvedTsEvent.Regions {
if state := s.getRegionState(subscriptionID, regionID); state != nil {
// Update the resolvedTs of the region here for metrics.
state.region.subscribedSpan.resolvedTs.Store(resolvedTsEvent.Ts)
s.client.ds.Push(SubscriptionID(resolvedTsEvent.RequestId), regionEvent{
state: state,
worker: s,
Expand Down
76 changes: 63 additions & 13 deletions logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (

loadRegionRetryInterval time.Duration = 100 * time.Millisecond
resolveLockMinInterval time.Duration = 10 * time.Second
resolveLockTickInterval time.Duration = 2 * time.Second
resolveLockFence time.Duration = 4 * time.Second
)

var (
Expand Down Expand Up @@ -128,8 +130,10 @@ type subscribedSpan struct {
staleLocksTargetTs atomic.Uint64

lastAdvanceTime atomic.Int64
// This is used to calculate the resolvedTs lag for metrics.
resolvedTs atomic.Uint64

initialized atomic.Bool
resolvedTsUpdated atomic.Int64
resolvedTs atomic.Uint64
}

func (span *subscribedSpan) clearKVEventsCache() {
Expand Down Expand Up @@ -332,17 +336,6 @@ func (s *SubscriptionClient) wakeSubscription(subID SubscriptionID) {
s.ds.Wake(subID)
}

// ResolveLock forwards targetTs to resolveStaleLocks of the span registered
// under subID. Outside subscribers can call it when they find that a span's
// resolved timestamp advances slowly or has stopped. It does nothing when no
// span is registered for subID.
func (s *SubscriptionClient) ResolveLock(subID SubscriptionID, targetTs uint64) {
	// The span map is only read here, so a shared lock is enough and does not
	// block other readers such as RegionCount.
	s.totalSpans.RLock()
	rt := s.totalSpans.spanMap[subID]
	s.totalSpans.RUnlock()
	if rt != nil {
		// Called after the lock is released so it never runs under totalSpans.
		rt.resolveStaleLocks(targetTs)
	}
}

// RegionCount returns subscribed region count for the span.
func (s *SubscriptionClient) RegionCount(subID SubscriptionID) uint64 {
s.totalSpans.RLock()
Expand All @@ -367,6 +360,7 @@ func (s *SubscriptionClient) Run(ctx context.Context) error {
g.Go(func() error { return s.handleRangeTasks(ctx) })
g.Go(func() error { return s.handleRegions(ctx, g) })
g.Go(func() error { return s.handleErrors(ctx) })
g.Go(func() error { return s.runResolveLockChecker(ctx) })
g.Go(func() error { return s.handleResolveLockTasks(ctx) })
g.Go(func() error { return s.logSlowRegions(ctx) })
g.Go(func() error { return s.errCache.dispatch(ctx) })
Expand Down Expand Up @@ -728,6 +722,60 @@ func (s *SubscriptionClient) doHandleError(ctx context.Context, errInfo regionEr
}
}

// subscriptionAndTargetTs pairs a subscribed span with the target ts that
// runResolveLockChecker passes to that span's resolveStaleLocks.
type subscriptionAndTargetTs struct {
// subSpan is the span whose stale locks should be resolved.
subSpan *subscribedSpan
// targetTs is the argument handed to subSpan.resolveStaleLocks.
targetTs uint64
}

// runResolveLockChecker periodically scans every subscribed span and triggers
// stale-lock resolution for the spans whose resolved ts has fallen behind.
//
// On each tick of resolveLockTickInterval it collects, while holding the
// totalSpans read lock, the spans for which a non-zero target ts is computed,
// and then calls resolveStaleLocks on each of them after the lock is released.
// It returns ctx.Err() once ctx is done.
func (s *SubscriptionClient) runResolveLockChecker(ctx context.Context) error {
	resolveLockTicker := time.NewTicker(resolveLockTickInterval)
	defer resolveLockTicker.Stop()

	// Scratch buffer reused across ticks; it is reallocated if it ever grows
	// beyond maxCacheSize so that a burst does not pin a huge array forever.
	const maxCacheSize = 1024
	subSpanAndTsCache := make([]subscriptionAndTargetTs, 0, maxCacheSize)

	// getResolvedTargetTs returns the targetTs to resolve stale locks. 0 means no need to resolve.
	getResolvedTargetTs := func(subSpan *subscribedSpan, currentTime time.Time) uint64 {
		// Skip spans that are not initialized yet or whose resolved ts was
		// updated within the last resolveLockFence.
		resolvedTsUpdated := time.Unix(subSpan.resolvedTsUpdated.Load(), 0)
		if !subSpan.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence {
			return 0
		}
		// Skip spans whose resolved ts lags currentTime by less than the fence.
		resolvedTs := subSpan.resolvedTs.Load()
		resolvedTime := oracle.GetTimeFromTS(resolvedTs)
		if currentTime.Sub(resolvedTime) < resolveLockFence {
			return 0
		}
		return oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-resolveLockTicker.C:
		}

		currentTime := s.pdClock.CurrentTime()

		// The span map is only read here, so take the shared lock instead of
		// the exclusive one to avoid blocking other readers on every tick.
		s.totalSpans.RLock()
		for _, subSpan := range s.totalSpans.spanMap {
			if subSpan == nil {
				continue
			}
			targetTs := getResolvedTargetTs(subSpan, currentTime)
			if targetTs == 0 {
				continue
			}
			subSpanAndTsCache = append(subSpanAndTsCache, subscriptionAndTargetTs{
				subSpan:  subSpan,
				targetTs: targetTs,
			})
		}
		s.totalSpans.RUnlock()

		// Resolve outside the lock so that it is never held during the calls.
		for _, subSpanAndTs := range subSpanAndTsCache {
			subSpanAndTs.subSpan.resolveStaleLocks(subSpanAndTs.targetTs)
		}

		if cap(subSpanAndTsCache) > maxCacheSize {
			subSpanAndTsCache = make([]subscriptionAndTargetTs, 0, maxCacheSize)
			continue
		}
		// Zero the used entries before truncating; otherwise the backing array
		// keeps span pointers reachable after the spans are removed from the map.
		for i := range subSpanAndTsCache {
			subSpanAndTsCache[i] = subscriptionAndTargetTs{}
		}
		subSpanAndTsCache = subSpanAndTsCache[:0]
	}
}

func (s *SubscriptionClient) handleResolveLockTasks(ctx context.Context) error {
resolveLastRun := make(map[uint64]time.Time)

Expand Down Expand Up @@ -838,6 +886,8 @@ func (s *SubscriptionClient) newSubscribedSpan(
advanceResolvedTs: advanceResolvedTs,
advanceInterval: advanceInterval,
}
rt.initialized.Store(false)
rt.resolvedTsUpdated.Store(time.Now().Unix())
rt.resolvedTs.Store(startTs)

rt.tryResolveLock = func(regionID uint64, state *regionlock.LockedRangeState) {
Expand Down
6 changes: 3 additions & 3 deletions logservice/logpuller/subscription_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestGenerateResolveLockTask(t *testing.T) {
res := span.rangeLock.LockRange(context.Background(), []byte{'b'}, []byte{'c'}, 1, 100)
require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status)
res.LockedRangeState.Initialized.Store(true)
client.ResolveLock(SubscriptionID(1), 200)
span.resolveStaleLocks(200)
select {
case task := <-client.resolveLockTaskCh:
require.Equal(t, uint64(1), task.regionID)
Expand All @@ -67,7 +67,7 @@ func TestGenerateResolveLockTask(t *testing.T) {
res = span.rangeLock.LockRange(context.Background(), []byte{'c'}, []byte{'d'}, 2, 100)
require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status)
state := newRegionFeedState(regionInfo{lockedRangeState: res.LockedRangeState, subscribedSpan: span}, 1)
client.ResolveLock(SubscriptionID(1), 200)
span.resolveStaleLocks(200)
select {
case task := <-client.resolveLockTaskCh:
require.Equal(t, uint64(1), task.regionID)
Expand All @@ -81,7 +81,7 @@ func TestGenerateResolveLockTask(t *testing.T) {

// Task will be triggered after initialized.
state.setInitialized()
client.ResolveLock(SubscriptionID(1), 200)
span.resolveStaleLocks(200)
select {
case <-client.resolveLockTaskCh:
case <-time.After(100 * time.Millisecond):
Expand Down