diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index ab25794129ba..357461f74d70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -184,6 +184,7 @@ private void updateWithSequenceGroup(KeyValue kv) { row.setField( fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value())); } + continue; } row.setField( i, aggregator == null ? field : aggregator.agg(accumulator, field)); @@ -304,6 +305,7 @@ private Factory(Options options, RowType rowType, List primaryKeys) { List fieldNames = rowType.getFieldNames(); this.fieldSeqComparators = new HashMap<>(); Map sequenceGroupMap = new HashMap<>(); + List allSequenceFields = new ArrayList<>(); for (Map.Entry entry : options.toMap().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); @@ -318,6 +320,7 @@ private Factory(Options options, RowType rowType, List primaryKeys) { .split(FIELDS_SEPARATOR)) .map(fieldName -> validateFieldName(fieldName, fieldNames)) .collect(Collectors.toList()); + allSequenceFields.addAll(sequenceFields); Supplier userDefinedSeqComparator = () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true); @@ -347,7 +350,8 @@ private Factory(Options options, RowType rowType, List primaryKeys) { } } this.fieldAggregators = - createFieldAggregators(rowType, primaryKeys, new CoreOptions(options)); + createFieldAggregators( + rowType, primaryKeys, allSequenceFields, new CoreOptions(options)); if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) { throw new IllegalArgumentException( "Must use sequence group for aggregation functions."); @@ -514,7 +518,10 @@ private String validateFieldName(String fieldName, List fieldNames) { * @return The aggregators for each column. */ private Map> createFieldAggregators( - RowType rowType, List primaryKeys, CoreOptions options) { + RowType rowType, + List primaryKeys, + List allSequenceFields, + CoreOptions options) { List fieldNames = rowType.getFieldNames(); List fieldTypes = rowType.getFieldTypes(); @@ -539,7 +546,8 @@ private Map> createFieldAggregators( isPrimaryKey, options, fieldName)); - } else if (defaultAggFunc != null) { + } else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) { + // no agg for sequence fields fieldAggregators.put( i, () -> diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index be2d6b34337c..c52fa42e2a81 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -723,4 +723,22 @@ public void testRemoveRecordOnDeleteLookup() throws Exception { Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apache")); iterator.close(); } + + @Test + public void testSequenceGroupWithDefaultAgg() { + sql( + "CREATE TABLE seq_default_agg (" + + " pk INT PRIMARY KEY NOT ENFORCED," + + " seq INT," + + " v INT) WITH (" + + " 'merge-engine'='partial-update'," + + " 'fields.seq.sequence-group'='v'," + + " 'fields.default-aggregate-function'='sum'" + + ")"); + + sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)"); + sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)"); + + assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3)); + } }