From 872fb9929af583fbcd7a3ee1e42fbd167a342e7f Mon Sep 17 00:00:00 2001
From: yuzelin
Date: Mon, 20 Jan 2025 16:39:56 +0800
Subject: [PATCH] [core] Fix NPE when retracting collect and merge-map

FieldCollectAgg.retract and FieldMergeMapAgg.retract dereferenced the
retract field (retract.size() / retract.keyArray()) without a null check,
so retracting a NULL value threw a NullPointerException.

Both methods now return the accumulator unchanged when the retract field
is null or empty, and only cast the accumulator once there is actually
something to remove. A null accumulator still yields null, because a
retraction cannot be recorded without an accumulator.

Adds testRetractInputNull IT cases covering the 'collect' and 'merge_map'
aggregate functions with a NULL UPDATE_BEFORE value.
---
 .../compact/aggregate/FieldCollectAgg.java    | 10 ++-
 .../compact/aggregate/FieldMergeMapAgg.java   | 10 ++-
 .../paimon/flink/PreAggregationITCase.java    | 81 +++++++++++++++++++
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index afe5e05e70c0..d9e706f6e853 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -155,18 +155,26 @@ private boolean contains(List<Object> list, @Nullable Object element) {
 
     @Override
     public Object retract(Object accumulator, Object retractField) {
+        // it's hard to mark the input is retracted without accumulator
         if (accumulator == null) {
             return null;
         }
 
-        InternalArray acc = (InternalArray) accumulator;
+        // nothing to be retracted
+        if (retractField == null) {
+            return accumulator;
+        }
         InternalArray retract = (InternalArray) retractField;
+        if (retract.size() == 0) {
+            return accumulator;
+        }
 
         List<Object> retractedElements = new ArrayList<>();
         for (int i = 0; i < retract.size(); i++) {
             retractedElements.add(elementGetter.getElementOrNull(retract, i));
         }
 
+        InternalArray acc = (InternalArray) accumulator;
         List<Object> accElements = new ArrayList<>();
         for (int i = 0; i < acc.size(); i++) {
             Object candidate = elementGetter.getElementOrNull(acc, i);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
index 9965339afd2b..487f20e3fd7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
@@ -69,12 +69,19 @@ private void putToMap(Map<Object, Object> map, Object data) {
 
     @Override
     public Object retract(Object accumulator, Object retractField) {
+        // it's hard to mark the input is retracted without accumulator
         if (accumulator == null) {
             return null;
         }
 
-        InternalMap acc = (InternalMap) accumulator;
+        // nothing to be retracted
+        if (retractField == null) {
+            return accumulator;
+        }
         InternalMap retract = (InternalMap) retractField;
+        if (retract.size() == 0) {
+            return accumulator;
+        }
 
         InternalArray retractKeyArray = retract.keyArray();
         Set<Object> retractKeys = new HashSet<>();
@@ -82,6 +89,7 @@ public Object retract(Object accumulator, Object retractField) {
             retractKeys.add(keyGetter.getElementOrNull(retractKeyArray, i));
         }
 
+        InternalMap acc = (InternalMap) accumulator;
         Map<Object, Object> resultMap = new HashMap<>();
         InternalArray accKeyArray = acc.keyArray();
         InternalArray accValueArray = acc.valueArray();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index b8dfd8f6a86e..954c1455d4cf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -1709,6 +1709,46 @@ public void testRetract(String changelogProducer, String mergeEngine) throws Exc
             select.close();
         }
 
+        @Test
+        public void testRetractInputNull() throws Exception {
+            sql(
+                    "CREATE TABLE test_collect ("
+                            + " id INT PRIMARY KEY NOT ENFORCED,"
+                            + " f0 ARRAY<STRING>,"
+                            + " f1 INT"
+                            + ") WITH ("
+                            + " 'changelog-producer' = 'lookup',"
+                            + " 'merge-engine' = 'partial-update',"
+                            + " 'fields.f0.aggregate-function' = 'collect',"
+                            + " 'fields.f1.sequence-group' = 'f0'"
+                            + ")");
+
+            List<Row> input =
+                    Arrays.asList(
+                            Row.ofKind(RowKind.INSERT, 1, null, 1),
+                            Row.ofKind(RowKind.INSERT, 1, new String[] {"A"}, 2),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"B"}, 3));
+            sEnv.executeSql(
+                            String.format(
+                                    "CREATE TEMPORARY TABLE input ("
+                                            + " id INT PRIMARY KEY NOT ENFORCED,"
+                                            + " f0 ARRAY<STRING>,"
+                                            + " f1 INT"
+                                            + ") WITH ("
+                                            + " 'connector' = 'values',"
+                                            + " 'data-id' = '%s',"
+                                            + " 'bounded' = 'true',"
+                                            + " 'changelog-mode' = 'UB,UA'"
+                                            + ")",
+                                    TestValuesTableFactory.registerData(input)))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM input").await();
+
+            assertThat(sql("SELECT * FROM test_collect"))
+                    .containsExactly(Row.of(1, new String[] {"A", "B"}, 3));
+        }
+
         private void checkOneRecord(Row row, int id, String... elements) {
             assertThat(row.getField(0)).isEqualTo(id);
             if (elements == null || elements.length == 0) {
@@ -1759,6 +1799,47 @@ public void testMergeMap() {
             checkOneRecord(result.get(2), 3, toMap(1, "a", 2, "b", 3, "c"));
         }
 
+        @Test
+        public void testRetractInputNull() throws Exception {
+            sql(
+                    "CREATE TABLE test_merge_map1 ("
+                            + " id INT PRIMARY KEY NOT ENFORCED,"
+                            + " f0 MAP<INT, STRING>,"
+                            + " f1 INT"
+                            + ") WITH ("
+                            + " 'changelog-producer' = 'lookup',"
+                            + " 'merge-engine' = 'partial-update',"
+                            + " 'fields.f0.aggregate-function' = 'merge_map',"
+                            + " 'fields.f1.sequence-group' = 'f0'"
+                            + ")");
+
+            List<Row> input =
+                    Arrays.asList(
+                            Row.ofKind(RowKind.INSERT, 1, null, 1),
+                            Row.ofKind(RowKind.INSERT, 1, Collections.singletonMap(1, "A"), 2),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
+                            Row.ofKind(
+                                    RowKind.UPDATE_AFTER, 1, Collections.singletonMap(2, "B"), 3));
+            sEnv.executeSql(
+                            String.format(
+                                    "CREATE TEMPORARY TABLE input ("
+                                            + " id INT PRIMARY KEY NOT ENFORCED,"
+                                            + " f0 MAP<INT, STRING>,"
+                                            + " f1 INT"
+                                            + ") WITH ("
+                                            + " 'connector' = 'values',"
+                                            + " 'data-id' = '%s',"
+                                            + " 'bounded' = 'true',"
+                                            + " 'changelog-mode' = 'UB,UA'"
+                                            + ")",
+                                    TestValuesTableFactory.registerData(input)))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_merge_map1 SELECT * FROM input").await();
+
+            assertThat(sql("SELECT * FROM test_merge_map1"))
+                    .containsExactly(Row.of(1, toMap(1, "A", 2, "B"), 3));
+        }
+
         private Map<Object, Object> toMap(Object... kvs) {
             Map<Object, Object> result = new HashMap<>();
             for (int i = 0; i < kvs.length; i += 2) {