Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,26 @@ private boolean contains(List<Object> list, @Nullable Object element) {

@Override
public Object retract(Object accumulator, Object retractField) {
// it's hard to mark the input is retracted without accumulator
if (accumulator == null) {
return null;
}

InternalArray acc = (InternalArray) accumulator;
// nothing to be retracted
if (retractField == null) {
return accumulator;
}
InternalArray retract = (InternalArray) retractField;
if (retract.size() == 0) {
return accumulator;
}

List<Object> retractedElements = new ArrayList<>();
for (int i = 0; i < retract.size(); i++) {
retractedElements.add(elementGetter.getElementOrNull(retract, i));
}

InternalArray acc = (InternalArray) accumulator;
List<Object> accElements = new ArrayList<>();
for (int i = 0; i < acc.size(); i++) {
Object candidate = elementGetter.getElementOrNull(acc, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,27 @@ private void putToMap(Map<Object, Object> map, Object data) {

@Override
public Object retract(Object accumulator, Object retractField) {
// it's hard to mark the input is retracted without accumulator
if (accumulator == null) {
return null;
}

InternalMap acc = (InternalMap) accumulator;
// nothing to be retracted
if (retractField == null) {
return accumulator;
}
InternalMap retract = (InternalMap) retractField;
if (retract.size() == 0) {
return accumulator;
}

InternalArray retractKeyArray = retract.keyArray();
Set<Object> retractKeys = new HashSet<>();
for (int i = 0; i < retractKeyArray.size(); i++) {
retractKeys.add(keyGetter.getElementOrNull(retractKeyArray, i));
}

InternalMap acc = (InternalMap) accumulator;
Map<Object, Object> resultMap = new HashMap<>();
InternalArray accKeyArray = acc.keyArray();
InternalArray accValueArray = acc.valueArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1709,6 +1709,46 @@ public void testRetract(String changelogProducer, String mergeEngine) throws Exc
select.close();
}

@Test
public void testRetractInputNull() throws Exception {
sql(
"CREATE TABLE test_collect ("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 ARRAY<STRING>,"
+ " f1 INT"
+ ") WITH ("
+ " 'changelog-producer' = 'lookup',"
+ " 'merge-engine' = 'partial-update',"
+ " 'fields.f0.aggregate-function' = 'collect',"
+ " 'fields.f1.sequence-group' = 'f0'"
+ ")");

List<Row> input =
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, null, 1),
Row.ofKind(RowKind.INSERT, 1, new String[] {"A"}, 2),
Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"B"}, 3));
sEnv.executeSql(
String.format(
"CREATE TEMPORARY TABLE input ("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 ARRAY<STRING>,"
+ " f1 INT"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'data-id' = '%s',"
+ " 'bounded' = 'true',"
+ " 'changelog-mode' = 'UB,UA'"
+ ")",
TestValuesTableFactory.registerData(input)))
.await();
sEnv.executeSql("INSERT INTO test_collect SELECT * FROM input").await();

assertThat(sql("SELECT * FROM test_collect"))
.containsExactly(Row.of(1, new String[] {"A", "B"}, 3));
}

private void checkOneRecord(Row row, int id, String... elements) {
assertThat(row.getField(0)).isEqualTo(id);
if (elements == null || elements.length == 0) {
Expand Down Expand Up @@ -1759,6 +1799,47 @@ public void testMergeMap() {
checkOneRecord(result.get(2), 3, toMap(1, "a", 2, "b", 3, "c"));
}

@Test
public void testRetractInputNull() throws Exception {
sql(
"CREATE TABLE test_merge_map1 ("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 MAP<INT, STRING>,"
+ " f1 INT"
+ ") WITH ("
+ " 'changelog-producer' = 'lookup',"
+ " 'merge-engine' = 'partial-update',"
+ " 'fields.f0.aggregate-function' = 'merge_map',"
+ " 'fields.f1.sequence-group' = 'f0'"
+ ")");

List<Row> input =
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, null, 1),
Row.ofKind(RowKind.INSERT, 1, Collections.singletonMap(1, "A"), 2),
Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
Row.ofKind(
RowKind.UPDATE_AFTER, 1, Collections.singletonMap(2, "B"), 3));
sEnv.executeSql(
String.format(
"CREATE TEMPORARY TABLE input ("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 MAP<INT, STRING>,"
+ " f1 INT"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'data-id' = '%s',"
+ " 'bounded' = 'true',"
+ " 'changelog-mode' = 'UB,UA'"
+ ")",
TestValuesTableFactory.registerData(input)))
.await();
sEnv.executeSql("INSERT INTO test_merge_map1 SELECT * FROM input").await();

assertThat(sql("SELECT * FROM test_merge_map1"))
.containsExactly(Row.of(1, toMap(1, "A", 2, "B"), 3));
}

private Map<Object, Object> toMap(Object... kvs) {
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < kvs.length; i += 2) {
Expand Down
Loading