diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 871ed72faa..ab217af0f8 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -340,6 +340,11 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=tidb_mysql_test + - name: Test resolve_lock + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=resolve_lock + - name: Upload test logs if: always() uses: ./.github/actions/upload-test-logs diff --git a/Makefile b/Makefile index 3d2ba32884..14b3be22fa 100644 --- a/Makefile +++ b/Makefile @@ -115,6 +115,7 @@ P=3 UT_PACKAGES_DISPATCHER := ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/worker/... ./pkg/sink/codec/open/... ./pkg/sink/codec/canal/... UT_PACKAGES_MAINTAINER := ./maintainer/... UT_PACKAGES_COORDINATOR := ./coordinator/... +UT_PACKAGES_LOGSERVICE := ./logservice/... UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./utils/dynstream/... include tools/Makefile @@ -216,6 +217,7 @@ unit_test_in_verify_ci: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov $(UT_PACKAGES_DISPATCHER) \ $(UT_PACKAGES_MAINTAINER) \ $(UT_PACKAGES_COORDINATOR) \ + $(UT_PACKAGES_LOGSERVICE) \ $(UT_PACKAGES_OTHERS) \ || { $(FAILPOINT_DISABLE); exit 1; } tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 8f6426b140..a5bcaf0e74 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -275,7 +275,16 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u lastAdvance := span.lastAdvanceTime.Load() if now-lastAdvance > span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) { ts := span.rangeLock.ResolvedTs() - if ts > span.startTs { + if ts > 0 && span.initialized.CompareAndSwap(false, true) { + log.Info("subscription client is initialized", + zap.Uint64("subscriptionID", uint64(span.subID)), + zap.Uint64("regionID", regionID), + zap.Uint64("resolvedTs", ts)) + } + lastResolvedTs := span.resolvedTs.Load() + if ts > lastResolvedTs { + span.resolvedTs.Store(ts) + span.resolvedTsUpdated.Store(time.Now().Unix()) span.advanceResolvedTs(ts) } } diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 89d6420523..9664bbd7b2 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -250,8 +250,6 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions))) for _, regionID := range resolvedTsEvent.Regions { if state := s.getRegionState(subscriptionID, regionID); state != nil { - // Update the resolvedTs of the region here for metrics. - state.region.subscribedSpan.resolvedTs.Store(resolvedTsEvent.Ts) s.client.ds.Push(SubscriptionID(resolvedTsEvent.RequestId), regionEvent{ state: state, worker: s, diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 1898b81b6e..9cd61ba4fd 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -54,6 +54,8 @@ const ( loadRegionRetryInterval time.Duration = 100 * time.Millisecond resolveLockMinInterval time.Duration = 10 * time.Second + resolveLockTickInterval time.Duration = 2 * time.Second + resolveLockFence time.Duration = 4 * time.Second ) var ( @@ -128,8 +130,10 @@ type subscribedSpan struct { staleLocksTargetTs atomic.Uint64 lastAdvanceTime atomic.Int64 - // This is used to calculate the resolvedTs lag for metrics. - resolvedTs atomic.Uint64 + + initialized atomic.Bool + resolvedTsUpdated atomic.Int64 + resolvedTs atomic.Uint64 } func (span *subscribedSpan) clearKVEventsCache() { @@ -332,17 +336,6 @@ func (s *SubscriptionClient) wakeSubscription(subID SubscriptionID) { s.ds.Wake(subID) } -// ResolveLock is a function. If outsider subscribers find a span resolved timestamp is -// advanced slowly or stopped, they can try to resolve locks in the given span. -func (s *SubscriptionClient) ResolveLock(subID SubscriptionID, targetTs uint64) { - s.totalSpans.Lock() - rt := s.totalSpans.spanMap[subID] - s.totalSpans.Unlock() - if rt != nil { - rt.resolveStaleLocks(targetTs) - } -} - // RegionCount returns subscribed region count for the span. func (s *SubscriptionClient) RegionCount(subID SubscriptionID) uint64 { s.totalSpans.RLock() @@ -367,6 +360,7 @@ func (s *SubscriptionClient) Run(ctx context.Context) error { g.Go(func() error { return s.handleRangeTasks(ctx) }) g.Go(func() error { return s.handleRegions(ctx, g) }) g.Go(func() error { return s.handleErrors(ctx) }) + g.Go(func() error { return s.runResolveLockChecker(ctx) }) g.Go(func() error { return s.handleResolveLockTasks(ctx) }) g.Go(func() error { return s.logSlowRegions(ctx) }) g.Go(func() error { return s.errCache.dispatch(ctx) }) @@ -728,6 +722,60 @@ func (s *SubscriptionClient) doHandleError(ctx context.Context, errInfo regionEr } } +type subscriptionAndTargetTs struct { + subSpan *subscribedSpan + targetTs uint64 +} + +func (s *SubscriptionClient) runResolveLockChecker(ctx context.Context) error { + resolveLockTicker := time.NewTicker(resolveLockTickInterval) + defer resolveLockTicker.Stop() + maxCacheSize := 1024 + subSpanAndTsCache := make([]subscriptionAndTargetTs, 0, maxCacheSize) + // getResolvedTargetTs returns the targetTs to resolve stale locks. 0 means no need to resolve. + getResolvedTargetTs := func(subSpan *subscribedSpan, currentTime time.Time) uint64 { + resolvedTsUpdated := time.Unix(subSpan.resolvedTsUpdated.Load(), 0) + if !subSpan.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence { + return 0 + } + resolvedTs := subSpan.resolvedTs.Load() + resolvedTime := oracle.GetTimeFromTS(resolvedTs) + if currentTime.Sub(resolvedTime) < resolveLockFence { + return 0 + } + return oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-resolveLockTicker.C: + } + currentTime := s.pdClock.CurrentTime() + s.totalSpans.Lock() + for _, subSpan := range s.totalSpans.spanMap { + if subSpan != nil { + targetTs := getResolvedTargetTs(subSpan, currentTime) + if targetTs > 0 { + subSpanAndTsCache = append(subSpanAndTsCache, subscriptionAndTargetTs{ + subSpan: subSpan, + targetTs: targetTs, + }) + } + } + } + s.totalSpans.Unlock() + for _, subSpanAndTs := range subSpanAndTsCache { + subSpanAndTs.subSpan.resolveStaleLocks(subSpanAndTs.targetTs) + } + subSpanAndTsCache = subSpanAndTsCache[:0] + if cap(subSpanAndTsCache) > maxCacheSize { + subSpanAndTsCache = make([]subscriptionAndTargetTs, 0, maxCacheSize) + } + } +} + func (s *SubscriptionClient) handleResolveLockTasks(ctx context.Context) error { resolveLastRun := make(map[uint64]time.Time) @@ -838,6 +886,8 @@ func (s *SubscriptionClient) newSubscribedSpan( advanceResolvedTs: advanceResolvedTs, advanceInterval: advanceInterval, } + rt.initialized.Store(false) + rt.resolvedTsUpdated.Store(time.Now().Unix()) rt.resolvedTs.Store(startTs) rt.tryResolveLock = func(regionID uint64, state *regionlock.LockedRangeState) { diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index 75a5247492..7e278d6080 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -54,7 +54,7 @@ func TestGenerateResolveLockTask(t *testing.T) { res := span.rangeLock.LockRange(context.Background(), []byte{'b'}, []byte{'c'}, 1, 100) require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status) res.LockedRangeState.Initialized.Store(true) - client.ResolveLock(SubscriptionID(1), 200) + span.resolveStaleLocks(200) select { case task := <-client.resolveLockTaskCh: require.Equal(t, uint64(1), task.regionID) @@ -67,7 +67,7 @@ func TestGenerateResolveLockTask(t *testing.T) { res = span.rangeLock.LockRange(context.Background(), []byte{'c'}, []byte{'d'}, 2, 100) require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status) state := newRegionFeedState(regionInfo{lockedRangeState: res.LockedRangeState, subscribedSpan: span}, 1) - client.ResolveLock(SubscriptionID(1), 200) + span.resolveStaleLocks(200) select { case task := <-client.resolveLockTaskCh: require.Equal(t, uint64(1), task.regionID) @@ -81,7 +81,7 @@ func TestGenerateResolveLockTask(t *testing.T) { // Task will be triggered after initialized. state.setInitialized() - client.ResolveLock(SubscriptionID(1), 200) + span.resolveStaleLocks(200) select { case <-client.resolveLockTaskCh: case <-time.After(100 * time.Millisecond):