From 4522619c2c7a87141a303a6bc1f9ee43aa991dc9 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 13 Jan 2025 19:08:31 +0800 Subject: [PATCH 1/3] [core] Fix that sequence group fields are mistakenly aggregated by default aggregator in partial update --- .../compact/PartialUpdateMergeFunction.java | 1 + .../paimon/flink/PartialUpdateITCase.java | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index ab25794129ba..ee4f1c06c7cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -184,6 +184,7 @@ private void updateWithSequenceGroup(KeyValue kv) { row.setField( fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value())); } + continue; } row.setField( i, aggregator == null ? field : aggregator.agg(accumulator, field)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index be2d6b34337c..c52fa42e2a81 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -723,4 +723,22 @@ public void testRemoveRecordOnDeleteLookup() throws Exception { Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apache")); iterator.close(); } + + @Test + public void testSequenceGroupWithDefaultAgg() { + sql( + "CREATE TABLE seq_default_agg (" + + " pk INT PRIMARY KEY NOT ENFORCED," + + " seq INT," + + " v INT) WITH (" + + " 'merge-engine'='partial-update'," + + " 'fields.seq.sequence-group'='v'," + + " 'fields.default-aggregate-function'='sum'" + + ")"); + + sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)"); + sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)"); + + assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3)); + } } From 7a58b3da039a4be8058d0363a57a39e0577c85b2 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 13 Jan 2025 19:42:40 +0800 Subject: [PATCH 2/3] fix --- .../compact/PartialUpdateMergeFunction.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index ee4f1c06c7cb..fc1932f978d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -303,13 +303,14 @@ private Factory(Options options, RowType rowType, List primaryKeys) { this.sequenceGroupPartialDelete = new HashSet<>(); List fieldNames = rowType.getFieldNames(); + List sequenceFields = new ArrayList<>(); this.fieldSeqComparators = new HashMap<>(); Map sequenceGroupMap = new HashMap<>(); for (Map.Entry entry : options.toMap().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) { - List sequenceFields = + sequenceFields.addAll( Arrays.stream( k.substring( FIELDS_PREFIX.length() + 1, @@ -318,7 +319,7 @@ private Factory(Options options, RowType rowType, List primaryKeys) { - 1) .split(FIELDS_SEPARATOR)) .map(fieldName -> validateFieldName(fieldName, fieldNames)) - .collect(Collectors.toList()); + .collect(Collectors.toList())); Supplier userDefinedSeqComparator = () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true); @@ -348,7 +349,8 @@ private Factory(Options options, RowType rowType, List primaryKeys) { } } this.fieldAggregators = - createFieldAggregators(rowType, primaryKeys, new CoreOptions(options)); + createFieldAggregators( + rowType, primaryKeys, sequenceFields, new CoreOptions(options)); if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) { throw new IllegalArgumentException( "Must use sequence group for aggregation functions."); @@ -515,7 +517,10 @@ private String validateFieldName(String fieldName, List fieldNames) { * @return The aggregators for each column. */ private Map> createFieldAggregators( - RowType rowType, List primaryKeys, CoreOptions options) { + RowType rowType, + List primaryKeys, + List sequenceFields, + CoreOptions options) { List fieldNames = rowType.getFieldNames(); List fieldTypes = rowType.getFieldTypes(); @@ -540,7 +545,8 @@ private Map> createFieldAggregators( isPrimaryKey, options, fieldName)); - } else if (defaultAggFunc != null) { + } else if (defaultAggFunc != null && !sequenceFields.contains(fieldName)) { + // no agg for sequence fields fieldAggregators.put( i, () -> From 9df56f2c025b603f8464171ead389c3cae73b5ca Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 14 Jan 2025 09:56:29 +0800 Subject: [PATCH 3/3] fix --- .../compact/PartialUpdateMergeFunction.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index fc1932f978d7..357461f74d70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -303,14 +303,14 @@ private Factory(Options options, RowType rowType, List primaryKeys) { this.sequenceGroupPartialDelete = new HashSet<>(); List fieldNames = rowType.getFieldNames(); - List sequenceFields = new ArrayList<>(); this.fieldSeqComparators = new HashMap<>(); Map sequenceGroupMap = new HashMap<>(); + List allSequenceFields = new ArrayList<>(); for (Map.Entry entry : options.toMap().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) { - sequenceFields.addAll( + List sequenceFields = Arrays.stream( k.substring( FIELDS_PREFIX.length() + 1, @@ -319,7 +319,8 @@ private Factory(Options options, RowType rowType, List primaryKeys) { - 1) .split(FIELDS_SEPARATOR)) .map(fieldName -> validateFieldName(fieldName, fieldNames)) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + allSequenceFields.addAll(sequenceFields); Supplier userDefinedSeqComparator = () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true); @@ -350,7 +351,7 @@ private Factory(Options options, RowType rowType, List primaryKeys) { } this.fieldAggregators = createFieldAggregators( - rowType, primaryKeys, sequenceFields, new CoreOptions(options)); + rowType, primaryKeys, allSequenceFields, new CoreOptions(options)); if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) { throw new IllegalArgumentException( "Must use sequence group for aggregation functions."); @@ -519,7 +520,7 @@ private String validateFieldName(String fieldName, List fieldNames) { private Map> createFieldAggregators( RowType rowType, List primaryKeys, - List sequenceFields, + List allSequenceFields, CoreOptions options) { List fieldNames = rowType.getFieldNames(); @@ -545,7 +546,7 @@ private Map> createFieldAggregators( isPrimaryKey, options, fieldName)); - } else if (defaultAggFunc != null && !sequenceFields.contains(fieldName)) { + } else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) { // no agg for sequence fields fieldAggregators.put( i,