From c254abb3d18eadc4aecfb23ce08ed1107edfc212 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 21 Jan 2025 23:33:47 +0800 Subject: [PATCH 1/2] [core] Fix that sequence fields are mistakenly aggregated by default aggregator in AggregateMergeFunction --- .../compact/PartialUpdateMergeFunction.java | 50 ++++++++++--------- .../aggregate/AggregateMergeFunction.java | 40 ++++++++++----- .../factory/FieldAggregatorFactory.java | 37 +++++++------- .../aggregate/FieldAggregatorTest.java | 6 +-- .../paimon/flink/PreAggregationITCase.java | 18 +++++++ 5 files changed, 92 insertions(+), 59 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 357461f74d70..a28ac52df41f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -54,6 +55,7 @@ import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE; import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP; import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update @@ -352,10 +354,6 @@ private Factory(Options options, RowType rowType, List 
primaryKeys) { this.fieldAggregators = createFieldAggregators( rowType, primaryKeys, allSequenceFields, new CoreOptions(options)); - if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) { - throw new IllegalArgumentException( - "Must use sequence group for aggregation functions."); - } removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); @@ -526,41 +524,47 @@ private Map> createFieldAggregators( List fieldNames = rowType.getFieldNames(); List fieldTypes = rowType.getFieldTypes(); Map> fieldAggregators = new HashMap<>(); - String defaultAggFunc = options.fieldsDefaultFunc(); for (int i = 0; i < fieldNames.size(); i++) { String fieldName = fieldNames.get(i); DataType fieldType = fieldTypes.get(i); - // aggregate by primary keys, so they do not aggregate - boolean isPrimaryKey = primaryKeys.contains(fieldName); - String strAggFunc = options.fieldAggFunc(fieldName); - boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName); - if (strAggFunc != null) { + if (allSequenceFields.contains(fieldName)) { + // no agg for sequence fields + continue; + } + + if (primaryKeys.contains(fieldName)) { + // aggregate by primary keys, so they do not aggregate fieldAggregators.put( i, () -> FieldAggregatorFactory.create( fieldType, - strAggFunc, - ignoreRetract, - isPrimaryKey, - options, - fieldName)); - } else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) { - // no agg for sequence fields + fieldName, + FieldPrimaryKeyAggFactory.NAME, + options)); + continue; + } + + String aggFuncName = getAggFuncName(options, fieldName); + if (aggFuncName != null) { + checkArgument( + !fieldSeqComparators.isEmpty(), + "Must use sequence group for aggregation functions."); fieldAggregators.put( i, () -> FieldAggregatorFactory.create( - fieldType, - defaultAggFunc, - ignoreRetract, - isPrimaryKey, - options, - fieldName)); + fieldType, fieldName, aggFuncName, options)); } } return fieldAggregators; } + + @Nullable + private String 
getAggFuncName(CoreOptions options, String fieldName) { + String aggFunc = options.fieldAggFunc(fieldName); + return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc; + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java index bad77ba91da5..ca380d2778e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java @@ -25,6 +25,8 @@ import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; @@ -132,27 +134,39 @@ public MergeFunction create(@Nullable int[][] projection) { } FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()]; - String defaultAggFunc = options.fieldsDefaultFunc(); + List sequenceFields = options.sequenceField(); for (int i = 0; i < fieldNames.size(); i++) { String fieldName = fieldNames.get(i); DataType fieldType = fieldTypes.get(i); - // aggregate by primary keys, so they do not aggregate - boolean isPrimaryKey = primaryKeys.contains(fieldName); - String strAggFunc = options.fieldAggFunc(fieldName); - strAggFunc = strAggFunc == null ? 
defaultAggFunc : strAggFunc; - boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName); + String aggFuncName = getAggFuncName(fieldName, sequenceFields); fieldAggregators[i] = - FieldAggregatorFactory.create( - fieldType, - strAggFunc, - ignoreRetract, - isPrimaryKey, - options, - fieldName); + FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options); } return new AggregateMergeFunction(createFieldGetters(fieldTypes), fieldAggregators); } + + private String getAggFuncName(String fieldName, List sequenceFields) { + if (sequenceFields.contains(fieldName)) { + // no agg for sequence fields, use last_non_null_value to do cover + return FieldLastNonNullValueAggFactory.NAME; + } + + if (primaryKeys.contains(fieldName)) { + // aggregate by primary keys, so they do not aggregate + return FieldPrimaryKeyAggFactory.NAME; + } + + String aggFuncName = options.fieldAggFunc(fieldName); + if (aggFuncName == null) { + aggFuncName = options.fieldsDefaultFunc(); + } + if (aggFuncName == null) { + // final default agg func + aggFuncName = FieldLastNonNullValueAggFactory.NAME; + } + return aggFuncName; + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java index d2ce0e4760ad..a5d570dae345 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java @@ -27,6 +27,8 @@ import javax.annotation.Nullable; +import java.util.List; + /** Factory for {@link FieldAggregator}. 
*/ public interface FieldAggregatorFactory extends Factory { @@ -35,37 +37,34 @@ public interface FieldAggregatorFactory extends Factory { String identifier(); static FieldAggregator create( - DataType fieldType, - @Nullable String strAgg, - boolean ignoreRetract, - boolean isPrimaryKey, - CoreOptions options, - String field) { - FieldAggregator fieldAggregator; - if (isPrimaryKey) { - strAgg = FieldPrimaryKeyAggFactory.NAME; - } else if (strAgg == null) { - strAgg = FieldLastNonNullValueAggFactory.NAME; - } - + DataType fieldType, String fieldName, String aggFuncName, CoreOptions options) { FieldAggregatorFactory fieldAggregatorFactory = FactoryUtil.discoverFactory( FieldAggregator.class.getClassLoader(), FieldAggregatorFactory.class, - strAgg); + aggFuncName); if (fieldAggregatorFactory == null) { throw new RuntimeException( String.format( "Use unsupported aggregation: %s or spell aggregate function incorrectly!", - strAgg)); + aggFuncName)); } - fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field); + FieldAggregator fieldAggregator = + fieldAggregatorFactory.create(fieldType, options, fieldName); + return options.fieldAggIgnoreRetract(fieldName) + ? new FieldIgnoreRetractAgg(fieldAggregator) + : fieldAggregator; + } - if (ignoreRetract) { - fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator); + @Nullable + static String getAggFuncName(String fieldName, List primaryKeys, CoreOptions options) { + if (primaryKeys.contains(fieldName)) { + // aggregate by primary keys, so they do not aggregate + return FieldPrimaryKeyAggFactory.NAME; } - return fieldAggregator; + String aggFunc = options.fieldAggFunc(fieldName); + return aggFunc == null ? 
options.fieldsDefaultFunc() : aggFunc; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index cf99a1157286..166b4479c83e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -936,10 +936,8 @@ public void testCustomAgg() throws IOException { FieldAggregatorFactory.create( DataTypes.STRING(), "custom", - false, - false, - CoreOptions.fromMap(new HashMap<>()), - "custom"); + "custom", + CoreOptions.fromMap(new HashMap<>())); Object agg = fieldAggregator.agg("test", "test"); assertThat(agg).isEqualTo("test"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index 954c1455d4cf..7b8ce3904e1f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -1195,6 +1195,24 @@ public void testMergeRead() { assertThat(batchSql("SELECT * FROM T where v = 1")) .containsExactlyInAnyOrder(Row.of(2, 1, 1)); } + + @Test + public void testSequenceFieldWithDefaultAgg() { + sql( + "CREATE TABLE seq_default_agg (" + + " pk INT PRIMARY KEY NOT ENFORCED," + + " seq INT," + + " v INT) WITH (" + + " 'merge-engine'='aggregation'," + + " 'sequence.field'='seq'," + + " 'fields.default-aggregate-function'='sum'" + + ")"); + + sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)"); + sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)"); + + assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3)); + } } /** ITCase for {@link 
FieldNestedUpdateAgg}. */ From 563e9574524afcdb8abca221296cf34760523ec7 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 21 Jan 2025 23:38:08 +0800 Subject: [PATCH 2/2] [core] Remove getAggFuncName helper and its imports from FieldAggregatorFactory --- .../aggregate/factory/FieldAggregatorFactory.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java index a5d570dae345..480160991404 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java @@ -25,10 +25,6 @@ import org.apache.paimon.mergetree.compact.aggregate.FieldIgnoreRetractAgg; import org.apache.paimon.types.DataType; -import javax.annotation.Nullable; - -import java.util.List; - /** Factory for {@link FieldAggregator}. */ public interface FieldAggregatorFactory extends Factory { @@ -56,15 +52,4 @@ static FieldAggregator create( ? new FieldIgnoreRetractAgg(fieldAggregator) : fieldAggregator; } - - @Nullable - static String getAggFuncName(String fieldName, List primaryKeys, CoreOptions options) { - if (primaryKeys.contains(fieldName)) { - // aggregate by primary keys, so they do not aggregate - return FieldPrimaryKeyAggFactory.NAME; - } - - String aggFunc = options.fieldAggFunc(fieldName); - return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc; - } }