feat(api): add /synced api to check changefeed synchronization status#2602
Conversation
Summary of ChangesHello @wlwilliamx, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request delivers a crucial new API endpoint that enables users to ascertain the synchronization status of a changefeed. This feature is vital for operations requiring confirmed data consistency, such as database migrations or disaster recovery, by providing a clear, programmatic signal when a changefeed has caught up and no new data is actively being processed. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a new API endpoint /api/v2/changefeeds/{changefeed_id}/synced to check if a changefeed is synchronized. The implementation involves propagating two new timestamps, LastSyncedTs and PullerResolvedTs, through the system. While the overall approach is sound, I've found a couple of significant issues in the implementation. The logic in the new API handler for determining the 'synced' status is misleading in some cases, and the aggregation of LastSyncedTs in the maintainer is incorrect, which could lead to stale data. I've provided detailed comments and suggestions to address these issues.
|
/retest |
|
/test all |
|
/CC @hongyunyan |
[LGTM Timeline notifier]Timeline:
|
|
/hold |
|
/test all |
|
/test all |
|
/test all |
|
/test all |
…aintainer.setWatermark" This reverts commit 81bf978.
|
/test all |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: flowbehappy, hongyunyan The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
|
/retest |
What problem does this PR solve?
Issue Number: close #1630
What is changed and how it works?
This PR introduces a new API endpoint
GET /api/v2/changefeeds/{changefeed-id}/syncedto allow users to reliably check if a changefeed has finished replicating all available upstream data.The core of this feature is a new, more robust definition of "synced" that handles idle sources and stalled upstream regions, providing a more accurate status than just observing checkpoint lag.
Detailed Changes
1. New API Endpoint and Sync Logic
api/v2/changefeed.go: The primary logic for the feature resides in the newsyncedfunction.CheckpointTs,LastSyncedTs, andLogCoordinatorResolvedTs.synced: trueif(now - LastSyncedTs)is greater thanSyncedCheckIntervalAND(now - CheckpointTs)is less thanCheckpointInterval. This means no new data has been received for a while, but the sink is still actively checkpointing.(now - LastSyncedTs)is large but(now - CheckpointTs)is also large, it checks for an upstream issue. It comparesLogCoordinatorResolvedTsandCheckpointTs.LastSyncedTsis very recent, it returnssynced: false.2.
LastSyncedTsPropagationTo know when the last transaction was applied, a new timestamp,
LastSyncedTs, is now tracked and propagated from the dispatcher up to the coordinator.TableProgressnow trackslastSyncedTsby recording the commit timestamp of each flushed event. To prevent backward movement, it only stores the maximum value seen.BasicDispatcherincludesLastSyncedTsin its heartbeat information.Maintainerreceives heartbeats from all dispatchers and aggregates theLastSyncedTs. It updates its global watermark with the maximumLastSyncedTsreceived.MaintainerStatusnow includes the aggregatedLastSyncedTs.WatermarkandMaintainerStatusmessages inheartbeatpbhave been updated to include thelastSyncedTsfield.MaintainerStatusand stores theLastSyncedTsin theChangeFeedStatusstruct.3. On-Demand
LogCoordinatorResolvedTsRetrievalTo get the most up-to-date puller progress, a new request-response mechanism has been implemented.
LogCoordinatorResolvedTsRequestandLogCoordinatorResolvedTsResponseinheartbeat.proto.Controllernow has aRequestResolvedTsFromLogCoordinatormethod. When called by the API handler, it broadcasts a request to all alive log coordinator nodes.resolvedTsvalue rather than a potentially stale, periodically reported one.logCoordinatormodule now handles theLogCoordinatorResolvedTsRequest.4. State Management and Struct Changes
coordinator/changefeed/changefeed.go: TheChangefeedstruct now includeslogCoordinatorResolvedTsto cache the value received from the log coordinator.coordinator/changefeed/changefeed_db.go: Added new methods to update and retrievelogCoordinatorResolvedTsandchangefeedIDby name.pkg/config/changefeed.go: TheChangeFeedStatusstruct was updated to includeLastSyncedTsandLogCoordinatorResolvedTs(these are transient and not persisted to etcd).heartbeatpb/watermark_util.go: A newUpdatemethod was added to handle the aggregation logic for the watermark, usingminfor checkpoint/resolved TS andmaxforLastSyncedTs.5. Testing
synced_statusandsynced_status_with_redo.syncedstatus and theinfomessage returned by the API in each scenario.Check List
Tests
curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test/syncedQuestions
Will it cause performance regression or break compatibility?
None
Do you need to update user documentation, design documentation or monitoring documentation?
None
Release note