[FLINK-36931][cdc] FlinkCDC YAML supports batch mode #3812
Conversation
Code implementation
Topology graph: Source -> PreTransform -> PostTransform -> SchemaBatchOperator -> PartitionBy(Batch) -> BatchSink
During testing, a new bug was discovered and has been fixed. This PR relies on that fix: #3826
lvyanquan left a comment:
Thanks @aiwenmo for this contribution, left some comments. An end-to-end test is also welcome.
I think an e2e test that runs in batch mode with the transform module is necessary to verify that the whole pipeline is runnable.

Hi. I'm in the process of coding and testing.
yuxiqian left a comment:
Thanks for @aiwenmo's work, just left some comments.
My major concern is that I've seen a lot of copy-and-pasted code from streaming mode, which makes the code base much harder to maintain in the future. I would suggest extracting the common parts into an abstract parent class (for example, putting the common partitioning logic in AbstractPrePartitionOperator) and extending it in StreamingPrePartitionOperator and BatchPrePartitionOperator.
Ignore this if we don't have enough time to finish it before 3.4.0.
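A minimal sketch of the suggested split, with plain-Java stand-ins for the Flink CDC operator and event types (all names besides the three classes mentioned above are illustrative, not the PR's actual API):

    abstract class AbstractPrePartitionOperator {
        protected final int downstreamParallelism;

        protected AbstractPrePartitionOperator(int downstreamParallelism) {
            this.downstreamParallelism = downstreamParallelism;
        }

        // Common partitioning logic shared by both execution modes.
        protected int partitionFor(String tableId, int primaryKeyHash) {
            return Math.floorMod(tableId.hashCode() + primaryKeyHash, downstreamParallelism);
        }

        // Mode-specific hook: how schema events are propagated downstream.
        protected abstract void handleSchemaEvent(String tableId);
    }

    class StreamingPrePartitionOperator extends AbstractPrePartitionOperator {
        StreamingPrePartitionOperator(int parallelism) {
            super(parallelism);
        }

        @Override
        protected void handleSchemaEvent(String tableId) {
            // Streaming mode: coordinate with the schema registry before broadcasting.
        }
    }

    class BatchPrePartitionOperator extends AbstractPrePartitionOperator {
        BatchPrePartitionOperator(int parallelism) {
            super(parallelism);
        }

        @Override
        protected void handleSchemaEvent(String tableId) {
            // Batch mode: simply broadcast the cached CreateTableEvent to every downstream task.
        }
    }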
    parallelism: 4
    schema.change.behavior: evolve
    schema-operator.rpc-timeout: 1 h
    batch-mode.enabled: false
minor: since batch mode is disabled by default, maybe we can turn it on here to verify that it can be enabled correctly?
Many parameters do not take effect in batch mode, so "false" is written here.
    // TODO: Hard coding stream mode and checkpoint
    boolean isBatchMode = false;
    // TODO: Hard coding checkpoint
    boolean isCheckpointingEnabled = true;
Just curious, is it possible to enable checkpointing in batch mode?
Some sinks need to rely on checkpointing to complete their writes.
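For reference, a minimal sketch of how the two hard-coded flags above could be applied to the environment once they are read from configuration; the helper class, parameter names, and checkpoint interval are assumptions, and only standard Flink APIs (setRuntimeMode, enableCheckpointing) are used:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class ExecutionModeSetup {

        // Hypothetical helper: applies the mode/checkpoint flags instead of hard-coding them.
        public static StreamExecutionEnvironment configure(
                boolean isBatchMode, boolean isCheckpointingEnabled, long checkpointIntervalMs) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(
                    isBatchMode ? RuntimeExecutionMode.BATCH : RuntimeExecutionMode.STREAMING);
            if (!isBatchMode && isCheckpointingEnabled) {
                // Periodic checkpointing is only supported in STREAMING execution;
                // in batch mode, sinks are finalized at end of input instead.
                env.enableCheckpointing(checkpointIntervalMs);
            }
            return env;
        }
    }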
        return TableId.parse(route.f1);
    }

    public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet) {
I doubt that this is really a generic and reusable method, given that it has no corresponding JavaDocs or test cases. Maybe just write it as a for loop in SchemaDerivator?
Ditto.
It makes use of "routes". In the latest code, I've added documentation for it.
Now that I think about it, it can also be placed in SchemaDerivator. I'll give it a try tonight.
Sorry, the attempt didn't work out. The current implementation turned out to be the better option.
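For context, a plain-Java sketch of what groupSourceTablesByRouteRule does, with string table names and predicates standing in for the real TableId and route selector types (the actual method works on those CDC types):

    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;

    public final class RouteGrouping {

        // Groups source tables by the route rule that matches them; tables matched
        // by the same rule end up in the same set (simplified sketch).
        public static List<Set<String>> groupSourceTablesByRouteRule(
                Set<String> sourceTables, List<Predicate<String>> routeMatchers) {
            List<Set<String>> groups = new ArrayList<>();
            for (Predicate<String> matcher : routeMatchers) {
                Set<String> group = new LinkedHashSet<>();
                for (String table : sourceTables) {
                    if (matcher.test(table)) {
                        group.add(table);
                    }
                }
                if (!group.isEmpty()) {
                    groups.add(group);
                }
            }
            return groups;
        }
    }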
Hi @aiwenmo, could you please rebase this PR onto the latest code? The code style verifier has been updated to enforce the JUnit 5 + AssertJ framework, and these classes might need to be migrated.
Hi. I have rebased this PR.
yuxiqian left a comment:
Thanks for @aiwenmo's quick response.
    // Batch mode only supports StartupMode.SNAPSHOT.
    Configuration pipelineConfiguration = context.getPipelineConfiguration();
    if (pipelineConfiguration != null
            && pipelineConfiguration.contains(PipelineOptions.PIPELINE_BATCH_MODE_ENABLED)) {
        startupOptions = StartupOptions.snapshot();
    }
Just a suggestion: what about throwing an exception explicitly if one enables batch mode with a non-snapshot source? That would prevent silent behavior changes of the startupOptions config.
Alternatively, we could add an interface method to DataSourceFactory like this to make things clearer:
    @PublicEvolving
    public interface DataSourceFactory extends Factory {

        /** Creates a {@link DataSource} instance. */
        DataSource createDataSource(Context context);

        /** Checking if this {@link DataSource} could be created in batch mode. */
        boolean supportsBatchPipeline(Context context);
    }

and verify it while translating the pipeline job graph. WDYT?
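If such a capability flag were adopted, the verification during job-graph translation might look roughly like this (a hedged sketch; the class, method, and message are made up for illustration):

    final class BatchCompatibilityCheck {

        // Hypothetical pre-flight check while translating the pipeline definition into a job graph.
        static void check(String sourceIdentifier, boolean batchModeEnabled, boolean sourceSupportsBatch) {
            if (batchModeEnabled && !sourceSupportsBatch) {
                throw new IllegalArgumentException(
                        "Source '" + sourceIdentifier + "' cannot be used in a batch pipeline; "
                                + "use a snapshot-only (bounded) source or disable batch mode.");
            }
        }
    }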
    if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
        if (StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())) {
            // In snapshot mode, we simply emit all schemas at once.
            if (!alreadySendAllCreateTableTables) {
In snapshot mode, we will:
- obtain and check the startup mode
- then check the flag
In any other mode, we will:
- obtain and check the startup mode
I would suggest renaming alreadySendAllCreateTableTables => shouldEmitAllCtesInSnapshotMode and setting it to true in snapshot mode and false in other modes, so the check could be simplified to:
    if (shouldEmitAllCtesInSnapshotMode) {
        createTableEventCache.forEach(
                (tableId, createTableEvent) -> output.collect(createTableEvent));
        shouldEmitAllCtesInSnapshotMode = false;
    }

so we don't have to check the startup mode every time we receive a SourceRecord.
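The flag itself could then be derived once, for example in the record emitter's constructor (a small fragment based on the startup-mode check already shown above; where the field lives is an assumption):

    // Decide once, from the startup mode, whether all cached CreateTableEvents
    // should be emitted at the first low watermark of a snapshot split.
    this.shouldEmitAllCtesInSnapshotMode =
            StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());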
    createTableEventCache.forEach(
            (tableId, createTableEvent) -> {
                output.collect(createTableEvent);
                alreadySendAllCreateTableTables = true;
nit: move alreadySendAllCreateTableTables = true out of the loop
I think TransformE2e and UdfE2e have nothing to do with batch mode. Shall we refactor the original cases with @ParameterizedTest, or just remove these cases to avoid code inflation and longer CI execution time?
    }

    /** Deduce merged CreateTableEvent in batch mode. */
    public static List<CreateTableEvent> deduceMergedCreateTableEventInBatchMode(
Yes, but SchemaDerivator itself shouldn't be aware of streaming / batch execution mode. Similar initial deducing logic could be ported to streaming mode later.
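For illustration, a mode-agnostic sketch of the merged-schema deduction using plain string column maps in place of the real CreateTableEvent/Schema types (the real code would also widen conflicting column types rather than keep the first one):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class WidestSchemaSketch {

        // Merge the column sets of all source tables routed to the same sink table.
        static Map<String, String> mergeColumns(List<Map<String, String>> sourceSchemas) {
            Map<String, String> merged = new LinkedHashMap<>();
            for (Map<String, String> schema : sourceSchemas) {
                // Keep the first-seen type for each column name (simplification).
                schema.forEach(merged::putIfAbsent);
            }
            return merged;
        }
    }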
minor: use consistent names for the new operator classes: either BatchXXXOperator (like BatchPreTransformOp) or XXXBatchOperator (like SchemaBatchOperator).
Personally I prefer the former, since it is easier to distinguish from the normal streaming operators.
    }

    @Test
    void testDeduceMergedCreateTableEventInBatchMode() {
ditto, remove inBatchMode
leonardBang left a comment:
Thanks @aiwenmo for the great work and all for the review, LGTM
This closes apache#3812
Co-authored-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Premise
MysqlCDC supports snapshot mode
MySqlSource, the MySQL CDC source in Flink CDC, supports StartupMode.SNAPSHOT; with that startup mode it is Boundedness.BOUNDED and can run in RuntimeExecutionMode.BATCH.
Streaming vs. Batch
Streaming mode is suitable for: jobs with high real-time requirements; in non-real-time scenarios, stateless jobs with many shuffle steps; jobs that require continuous and stable data processing; and jobs with small state, simple topologies, and low fault-tolerance costs.
Batch mode is suitable for: non-real-time jobs with a large number of stateful operators; jobs that require high resource utilization; and jobs with large state and complex topologies, where batch execution keeps the fault-tolerance cost low.
Expectation
Full snapshot synchronization
The FlinkCDC YAML job reads only the full snapshot data of the database and then writes it to the target database in Streaming or Batch mode. It is mainly used for full catch-up.
Currently, the SNAPSHOT startup strategy of a FlinkCDC YAML job runs correctly in Streaming mode, but not yet in Batch mode.
Full-incremental offline
On its first run, the FlinkCDC YAML job collects the full snapshot data plus the incremental log data from the final offset of the incremental snapshot algorithm up to the current EndingOffset; on subsequent runs, it collects from the last EndingOffset to the current EndingOffset.
The job runs in Batch mode. Users can schedule it periodically, tolerate a bounded data delay (such as hourly or daily), and still get eventual consistency. Since each periodically scheduled incremental run only collects the logs between the last EndingOffset and the current EndingOffset, repeatedly collecting the full data is avoided.
Test
Full snapshot synchronization in Batch mode
Solution
Use StartupMode.SNAPSHOT + Streaming for full snapshot synchronization
No source code changes are needed. For MySQL CDC, once StartupMode.SNAPSHOT is specified, a full-database snapshot synchronization job can already run in Streaming mode. Although this is not the optimal solution, the capability is achievable today.
Extend FlinkPipelineComposer to Batch mode to support full batch synchronization
Topology graph: Source -> PreTransform -> PostTransform -> Router -> PartitionBy -> Sink
There are no change events in Batch mode, so schema evolution does not need to be considered. In addition, automatic table creation is completed before the job starts.
The field derivation of transform can be done before the job starts instead of at runtime, and other operations such as route derivation can likewise be done before the job starts.
Workload: implement the Batch construction strategy of FlinkPipelineComposer (see the sketch below). The Router needs to be made independent, and the Sink needs to be extended or adapted to an implementation that does not require a coordinator (it would be even better if batch writing can be achieved).
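A rough sketch of the composer-side branching implied here; translateBatchTopology and translateStreamingTopology are hypothetical method names, while PIPELINE_BATCH_MODE_ENABLED and the batch topology come from this PR's discussion:

    // Pick the translation strategy based on the pipeline-level batch flag.
    boolean isBatchMode =
            pipelineConfig.contains(PipelineOptions.PIPELINE_BATCH_MODE_ENABLED);
    if (isBatchMode) {
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // Schema evolution and automatic table creation are resolved before the job starts.
        translateBatchTopology(env, pipelineDef);
    } else {
        translateStreamingTopology(env, pipelineDef);
    }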
Extend StartupMode to let users specify an offset range, supporting incremental offline synchronization
Allow users to specify the binlog offset range to collect; the user's own platform then records the EndingOffset of each execution and handles the periodic scheduling.
Discussion
1. Is it necessary to support Batch mode at all, given that the benefits of Batch may be small or its performance may not match Streaming? Specifically, which Batch optimizations can actually be used?
2. Should the full-incremental offline approach be implemented (letting users periodically schedule incremental log synchronization)?
Code implementation
Topology graph: Source -> BatchPreTransform -> PostTransform -> SchemaBatchOperator -> PartitionBy(Batch) -> BatchSink
ps: The data flow only contains CreateTableEvent and DataChangeEvent (insert).
Implementation ideas
1. The Source first sends all CreateTableEvents, then sends the snapshot data.
2. BatchPreTransform does not need to cache state for recovery, and PostTransform is unchanged.
3. When SchemaBatchOperator receives a CreateTableEvent, it only stores it in its cache and emits no events.
4. When SchemaBatchOperator receives the first DataChangeEvent, it deduces the widest downstream table schema based on the route rules, executes the table creation statements in the external data source, and then sends the widened schemas to BatchPrePartition (see the sketch after this list).
5. BatchPrePartition broadcasts the CreateTableEvents to BatchPostPartition, and partitions and distributes the DataChangeEvents to BatchPostPartition based on table ID and primary key information.
6. BatchPostPartition forwards the CreateTableEvents and DataChangeEvents to BatchSink, which performs batch writing.
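A simplified, self-contained sketch of steps 3 and 4; every name and type here is an illustrative stand-in for the real SchemaBatchOperator and CDC event classes:

    import java.util.HashMap;
    import java.util.Map;

    class SchemaBatchOperatorSketch {

        private final Map<String, String> createTableCache = new HashMap<>(); // tableId -> schema (simplified)
        private boolean schemasFlushed = false;

        // Step 3: CreateTableEvents are only cached; nothing is emitted yet.
        void onCreateTable(String tableId, String schema) {
            createTableCache.put(tableId, schema);
        }

        // Step 4: on the first DataChangeEvent, deduce and apply the merged schemas once,
        // then forward data downstream.
        void onDataChange(String tableId, String payload) {
            if (!schemasFlushed) {
                deduceAndApplyMergedSchemas(createTableCache); // create tables in the external system
                schemasFlushed = true;
            }
            emitDownstream(tableId, payload);
        }

        private void deduceAndApplyMergedSchemas(Map<String, String> cache) {
            // hypothetical: merge schemas per route rule and emit them to BatchPrePartition
        }

        private void emitDownstream(String tableId, String payload) {
            // hypothetical: forward the data change record to BatchPrePartition
        }
    }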
Implementation effect
Computing node 1: Source -> BatchPreTransform -> PostTransform -> SchemaBatchOperator -> BatchPrePartition
Computing node 2: BatchPostPartition -> BatchSink
Batch mode: Computing node 2 starts computing only after computing node 1 is completely finished.