[Fix](StreamingJob) Optimize CDC consumption strategy #60181
run buildall
Pull request overview
This PR optimizes the CDC (Change Data Capture) consumption strategy for streaming jobs by refactoring the data polling mechanism and improving heartbeat handling.
Changes:
- Refactored the SourceReader interface to split readSplitRecords into two methods: prepareAndSubmitSplit (for split preparation) and pollRecords (for data retrieval)
- Moved polling logic from individual readers to the PipelineCoordinator, introducing heartbeat-based synchronization to determine when to stop polling
- Reduced the Debezium heartbeat interval from 10 seconds to 3 seconds for faster offset updates and connection issue detection
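The split described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: only the method names prepareAndSubmitSplit and pollRecords come from the overview, while `CdcRecord`, `SplitSourceReader`, `PollingCoordinator`, `InMemoryReader`, the `maxPolls` bound, and the rule "stop once a heartbeat has been seen" are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical record type; the real readers emit Debezium source records.
final class CdcRecord {
    final String payload;
    final boolean heartbeat;

    CdcRecord(String payload, boolean heartbeat) {
        this.payload = payload;
        this.heartbeat = heartbeat;
    }
}

// Two-phase reader contract named in the overview.
// Parameter and return types are assumptions, not the actual Doris signatures.
interface SplitSourceReader {
    void prepareAndSubmitSplit(String splitId); // phase 1: set up the split
    Iterator<CdcRecord> pollRecords();          // phase 2: fetch one batch, possibly empty
}

// Coordinator-side polling loop (assumed stopping rule: heartbeat seen or poll budget used).
final class PollingCoordinator {
    static List<CdcRecord> fetchRecords(SplitSourceReader reader, String splitId, int maxPolls) {
        reader.prepareAndSubmitSplit(splitId);
        List<CdcRecord> data = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            boolean sawHeartbeat = false;
            Iterator<CdcRecord> batch = reader.pollRecords();
            // Drain the whole batch so data records after a heartbeat are not dropped.
            while (batch.hasNext()) {
                CdcRecord record = batch.next();
                if (record.heartbeat) {
                    sawHeartbeat = true;
                } else {
                    data.add(record);
                }
            }
            if (sawHeartbeat) {
                break; // heartbeat observed: stop polling for this request
            }
        }
        return data;
    }
}

// In-memory reader used only to exercise the loop.
final class InMemoryReader implements SplitSourceReader {
    private final Queue<List<CdcRecord>> batches = new ArrayDeque<>();
    boolean prepared;

    InMemoryReader addBatch(CdcRecord... records) {
        batches.add(Arrays.asList(records));
        return this;
    }

    @Override
    public void prepareAndSubmitSplit(String splitId) {
        prepared = true;
    }

    @Override
    public Iterator<CdcRecord> pollRecords() {
        List<CdcRecord> next = batches.poll();
        return next == null ? Collections.<CdcRecord>emptyIterator() : next.iterator();
    }
}
```

With the polling loop living in one place, each reader only has to expose "prepare" and "poll once", and the stopping condition can be changed without touching the MySQL or JDBC readers.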
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| Constants.java | Reduced DEBEZIUM_HEARTBEAT_INTERVAL_MS from 10000ms to 3000ms |
| SourceReader.java | Split readSplitRecords into prepareAndSubmitSplit and pollRecords methods |
| SplitReadResult.java | Removed recordIterator field as polling is now handled at coordinator level |
| JdbcIncrementalSourceReader.java | Implemented new split preparation/polling pattern, removed old pollUntilDataAvailable logic |
| MySqlSourceReader.java | Similar refactoring as JdbcIncrementalSourceReader for MySQL-specific implementation |
| PostgresSourceReader.java | Added heartbeat interval configuration for Postgres CDC |
| PipelineCoordinator.java | Major refactoring: added heartbeat-aware polling loops in fetchRecords and writeRecords, extracted helper methods for cleanup and offset extraction |
| StreamingMultiTblTask.java | Minor log message simplification |
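For context on the Constants.java row, a heartbeat interval is normally passed to Debezium through the `heartbeat.interval.ms` connector property. The sketch below shows one way a 3000 ms value could be wired into a properties object; the `HeartbeatConfig` class and `debeziumProperties` helper are hypothetical, and how the PR actually applies the constant in each reader is not shown on this page.

```java
import java.util.Properties;

// Hypothetical holder; only the constant name and the 3000 ms value come from the PR summary.
final class HeartbeatConfig {
    static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;

    // Builds Debezium-style properties carrying the heartbeat interval.
    static Properties debeziumProperties() {
        Properties props = new Properties();
        // "heartbeat.interval.ms" is the Debezium connector property for heartbeat frequency.
        props.setProperty("heartbeat.interval.ms", String.valueOf(DEBEZIUM_HEARTBEAT_INTERVAL_MS));
        return props;
    }
}
```

A shorter interval means heartbeat records, and the offsets they carry, arrive more often, at the cost of a little extra traffic on an idle source.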
Comments suppressed due to low confidence (1)
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:355
- The closeJobStreamLoad method is not synchronized, which could lead to a race condition with getOrCreateBatchStreamLoad. If closeJobStreamLoad is called while another thread is accessing the batch stream load, the stream load could be removed and closed while being used, potentially causing NullPointerException or data loss. Consider synchronizing this method or using a more fine-grained locking mechanism.
    public void closeJobStreamLoad(Long jobId) {
        DorisBatchStreamLoad batchStreamLoad = batchStreamLoadMap.remove(jobId);
        if (batchStreamLoad != null) {
            LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId);
            batchStreamLoad.close();
            batchStreamLoad = null;
        }
    }
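One way to act on this suggestion is to guard lookup and removal with the same lock. The sketch below is an assumption-laden illustration rather than the project's code: `BatchLoad` stands in for DorisBatchStreamLoad, and `StreamLoadRegistry` with its `getOrCreate` and `close` methods is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DorisBatchStreamLoad.
final class BatchLoad implements AutoCloseable {
    final long jobId;
    private boolean closed;

    BatchLoad(long jobId) {
        this.jobId = jobId;
    }

    @Override
    public synchronized void close() {
        closed = true;
    }

    synchronized boolean isClosed() {
        return closed;
    }
}

// Registry whose lookup and close paths share one monitor (the registry instance).
final class StreamLoadRegistry {
    private final Map<Long, BatchLoad> loads = new HashMap<>();

    // Returns the existing load for the job or creates one, under the registry lock.
    synchronized BatchLoad getOrCreate(long jobId) {
        return loads.computeIfAbsent(jobId, id -> new BatchLoad(id));
    }

    // Removes and closes the load under the same lock, so it cannot interleave with getOrCreate.
    synchronized void close(long jobId) {
        BatchLoad load = loads.remove(jobId);
        if (load != null) {
            load.close();
        }
    }
}
```

This only serializes map access against removal. A caller that already holds a reference can still use it after `close`, so a complete fix would also need the write path to check the closed state or hold the lock for the duration of the write.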
TPC-H: Total hot run time: 30679 ms
TPC-DS: Total hot run time: 172675 ms
ClickBench: Total hot run time: 26.61 s
FE UT Coverage Report: Increment line coverage
FE Regression Coverage Report: Increment line coverage
run buildall
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.
TPC-H: Total hot run time: 31211 ms
TPC-DS: Total hot run time: 172749 ms
ClickBench: Total hot run time: 27.19 s
FE Regression Coverage Report: Increment line coverage
run buildall
run buildall
TPC-H: Total hot run time: 31339 ms
TPC-DS: Total hot run time: 172273 ms
ClickBench: Total hot run time: 26.65 s
run buildall
TPC-H: Total hot run time: 32511 ms
ClickBench: Total hot run time: 28.24 s
FE UT Coverage Report: Increment line coverage
sollhui left a comment
LGTM
PR approved by anyone and no changes requested.
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #58898 #59461
Optimize CDC consumption strategy
Release note
None