diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index 8e9599f774bd12..32749fd8a774f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -247,10 +247,13 @@ private List createEtlIndexes(OlapTable table) throws LoadException { long indexId = entry.getKey(); int schemaHash = table.getSchemaHashByIndexId(indexId); + boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS) + && table.getTableProperty().getEnableUniqueKeyMergeOnWrite(); + // columns List etlColumns = Lists.newArrayList(); for (Column column : entry.getValue()) { - etlColumns.add(createEtlColumn(column)); + etlColumns.add(createEtlColumn(column, changeAggType)); } // check distribution type @@ -290,7 +293,7 @@ private List createEtlIndexes(OlapTable table) throws LoadException { return etlIndexes; } - private EtlColumn createEtlColumn(Column column) { + private EtlColumn createEtlColumn(Column column, boolean changeAggType) { // column name String name = column.getName().toLowerCase(Locale.ROOT); // column type @@ -304,7 +307,11 @@ private EtlColumn createEtlColumn(Column column) { // aggregation type String aggregationType = null; if (column.getAggregationType() != null) { - aggregationType = column.getAggregationType().toString(); + if (changeAggType && !column.isKey()) { + aggregationType = AggregateType.REPLACE.toSql(); + } else { + aggregationType = column.getAggregationType().toString(); + } } // default value