Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}

@Test
public void testWithCarryovers() {
createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap0 = table.currentSnapshot();

sql("INSERT INTO %s VALUES (2, 'b')", tableName);
table.refresh();
Snapshot snap1 = table.currentSnapshot();

sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
table.refresh();
Snapshot snap2 = table.currentSnapshot();

List<Object[]> returns =
sql(
"CALL %s.system.create_changelog_view("
+ "remove_carryovers => false,"
+ "table => '%s')",
catalogName, tableName, "cdc_view");

String viewName = (String) returns.get(0)[0];
assertEquals(
"Rows should match",
ImmutableList.of(
row(1, "a", INSERT, 0, snap0.snapshotId()),
row(2, "b", INSERT, 1, snap1.snapshotId()),
row(-2, "b", INSERT, 2, snap2.snapshotId()),
row(2, "b", DELETE, 2, snap2.snapshotId()),
row(2, "b", INSERT, 2, snap2.snapshotId()),
row(2, "b", INSERT, 2, snap2.snapshotId())),
sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
}

@Test
public void testUpdate() {
createTableWithTwoColumns();
Expand Down Expand Up @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

@Test
public void testNotRemoveCarryOvers() {
createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap1 = table.currentSnapshot();

// carry-over row (2, 'e', 12)
sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
table.refresh();
Snapshot snap2 = table.currentSnapshot();

List<Object[]> returns =
sql(
"CALL %s.system.create_changelog_view("
+ "remove_carryovers => false,"
+ "table => '%s')",
catalogName, tableName);

String viewName = (String) returns.get(0)[0];

assertEquals(
"Rows should match",
ImmutableList.of(
row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
// the following two rows are carry-over rows
row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);

/**
* Enable or disable the remove carry-over rows.
*
* @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over
* rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove
* carry-over rows.
*/
@Deprecated
private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);

private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
Expand All @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
Expand Down Expand Up @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
} else if (shouldRemoveCarryoverRows(input)) {
} else {
df = removeCarryoverRows(df, netChanges);
}

Expand Down Expand Up @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}

private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true);
}

private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
Expand Down