Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin

@Nullable private Integer binlogSplitTaskId;

private boolean isBinlogSplitUpdateRequestAlreadySent = false;

public MySqlSourceEnumerator(
SplitEnumeratorContext<MySqlSplit> context,
MySqlSourceConfig sourceConfig,
Expand Down Expand Up @@ -273,7 +275,9 @@ private void syncWithReaders(int[] subtaskIds, Throwable t) {
}

private void requestBinlogSplitUpdateIfNeed() {
if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
if (!isBinlogSplitUpdateRequestAlreadySent
&& isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
isBinlogSplitUpdateRequestAlreadySent = true;
for (int subtaskId : getRegisteredReader()) {
LOG.info(
"The enumerator requests subtask {} to update the binlog split after newly added table.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
import static org.apache.flink.util.Preconditions.checkState;

/** IT tests to cover various newly added tables during capture process. */
Expand Down Expand Up @@ -511,6 +512,12 @@ public void testRemoveAndAddNewTable() throws Exception {
temporaryFolder.delete();
}

@Test
public void testNewlyAddedEmptyTableAndInsertAfterJobStart() throws Exception {
testNewlyAddedTableOneByOneWithCreateBeforeStart(
1, new HashMap<>(), "address_hangzhou", "address_beijing");
}

/** Add a collect sink in the job. */
protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
TypeSerializer<RowData> serializer =
Expand Down Expand Up @@ -1108,4 +1115,144 @@ private static int sinkSize(String sinkName) {
}
}
}

private void testNewlyAddedTableOneByOneWithCreateBeforeStart(
int parallelism, Map<String, String> sourceOptions, String... captureAddressTables)
throws Exception {
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
String finishedSavePointPath = null;
List<String> fetchedDataList = new ArrayList<>();
for (int round = 0; round < captureAddressTables.length; round++) {
boolean insertData = round == 0;
initialAddressTables(getConnection(), captureAddressTables, round, insertData);
String[] captureTablesThisRound =
Arrays.asList(captureAddressTables)
.subList(0, round + 1)
.toArray(new String[0]);
String newlyAddedTable = captureAddressTables[round];
StreamExecutionEnvironment env =
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
env.setRestartStrategy(noRestart());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement =
getCreateTableStatement(sourceOptions, captureTablesThisRound);
tEnv.executeSql(createTableStatement);
tEnv.executeSql(
"CREATE TABLE sink ("
+ " table_name STRING,"
+ " id BIGINT,"
+ " country STRING,"
+ " city STRING,"
+ " detail_address STRING,"
+ " primary key (city, id) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")");
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
JobClient jobClient = tableResult.getJobClient().get();
Thread.sleep(3_000);
String tableName = captureAddressTables[round];
if (!insertData) {
insertData(
getConnection(),
customDatabase.getDatabaseName() + "." + tableName,
tableName.split("_")[1]);
}
// step 2: assert fetched snapshot data in this round
String cityName = newlyAddedTable.split("_")[1];
List<String> expectedSnapshotDataThisRound =
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
newlyAddedTable, cityName, cityName));
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
// step 3: make some binlog data for this round
makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable);
makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable);
// step 4: assert fetched binlog data in this round
// retract the old data with id 416874195632735147
fetchedDataList =
fetchedDataList.stream()
.filter(
r ->
!r.contains(
format(
"%s, 416874195632735147",
newlyAddedTable)))
.collect(Collectors.toList());
List<String> expectedBinlogUpsertDataThisRound =
Arrays.asList(
// add the new data with id 416874195632735147
format(
"+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614380, China, %s, %s West Town address 4]",
newlyAddedTable, cityName, cityName));
// step 5: assert fetched binlog data in this round
fetchedDataList.addAll(expectedBinlogUpsertDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
}
jobClient.cancel().get();
}
}

private void initialAddressTables(
JdbcConnection connection, String[] addressTables, int round, boolean insertData)
throws SQLException {
try {
connection.setAutoCommit(false);
String tableName = addressTables[round];
String tableId = customDatabase.getDatabaseName() + "." + tableName;
String cityName = tableName.split("_")[1];
connection.execute(
"CREATE TABLE "
+ tableId
+ "("
+ " id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
+ " country VARCHAR(255) NOT NULL,"
+ " city VARCHAR(255) NOT NULL,"
+ " detail_address VARCHAR(1024)"
+ ");");
if (insertData) {
insertData(connection, tableId, cityName);
}
connection.commit();
} finally {
connection.close();
}
}

private void insertData(JdbcConnection connection, String tableId, String cityName)
throws SQLException {
try {
connection.execute(
format(
"INSERT INTO %s "
+ "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),"
+ " (416927583791428523, 'China', '%s', '%s West Town address 2'),"
+ " (417022095255614379, 'China', '%s', '%s West Town address 3');",
tableId, cityName, cityName, cityName, cityName, cityName, cityName));
} finally {
connection.close();
}
}
}