diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 60c02b6b5252b5..172805657ea8aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -62,7 +62,6 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -86,6 +85,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -388,7 +388,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - Map> tcloumnsPool = Maps.newHashMap(); + Map objectPool = new ConcurrentHashMap(); String vaultId = tbl.getStorageVaultId(); try { long expiration = (createTimeMs + timeoutMs) / 1000; @@ -401,14 +401,13 @@ protected void runWaitingTxnJob() throws AlterCancelException { // the rollup task will transform the data before visible version(included). long visibleVersion = partition.getVisibleVersion(); + Map defineExprs = Maps.newHashMap(); MaterializedIndex rollupIndex = entry.getValue(); Map tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId); for (Tablet rollupTablet : rollupIndex.getTablets()) { long rollupTabletId = rollupTablet.getId(); long baseTabletId = tabletIdMap.get(rollupTabletId); - Map defineExprs = Maps.newHashMap(); - DescriptorTable descTable = new DescriptorTable(); TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); Map descMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); @@ -470,7 +469,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), - tcloumnsPool, whereClause, expiration, vaultId); + objectPool, whereClause, expiration, vaultId); rollupBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index f00177ec53746f..277a34115413b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -54,7 +54,6 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -76,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -418,7 +418,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - Map> tcloumnsPool = Maps.newHashMap(); + Map objectPool = new ConcurrentHashMap(); String vaultId = tbl.getStorageVaultId(); try { long expiration = (createTimeMs + timeoutMs) / 1000; @@ -478,12 +478,13 @@ protected void runWaitingTxnJob() throws AlterCancelException { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool, + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool, null, expiration, vaultId); schemaChangeBatchTask.addTask(rollupTask); } } } + } // end for partitions } finally { tbl.readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index ebf505e454a68a..c95cc2670768c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -42,6 +42,7 @@ * The new replica can be a rollup replica, or a shadow replica of schema change. */ public class AlterReplicaTask extends AgentTask { + private long baseTabletId; private long newReplicaId; private int baseSchemaHash; @@ -53,7 +54,7 @@ public class AlterReplicaTask extends AgentTask { private Map defineExprs; private Expr whereClause; private DescriptorTable descTable; - private Map> tcloumnsPool; + private Map objectPool; private List baseSchemaColumns; private long expiration; @@ -67,7 +68,7 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map defineExprs, - DescriptorTable descTable, List baseSchemaColumns, Map> tcloumnsPool, + DescriptorTable descTable, List baseSchemaColumns, Map objectPool, Expr whereClause, long expiration, String vaultId) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); @@ -85,7 +86,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI this.whereClause = whereClause; this.descTable = descTable; this.baseSchemaColumns = baseSchemaColumns; - this.tcloumnsPool = tcloumnsPool; + this.objectPool = objectPool; this.expiration = expiration; this.vaultId = vaultId; } @@ -134,32 +135,51 @@ public TAlterTabletReqV2 toThrift() { default: break; } + if (defineExprs != null) { for (Map.Entry entry : defineExprs.entrySet()) { - List slots = Lists.newArrayList(); - entry.getValue().collect(SlotRef.class, slots); - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); - mvParam.setMvExpr(entry.getValue().treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(entry.getKey()); + if (value == null) { + List slots = Lists.newArrayList(); + entry.getValue().collect(SlotRef.class, slots); + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); + mvParam.setMvExpr(entry.getValue().treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(entry.getKey(), mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } } + if (whereClause != null) { - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); - mvParam.setMvExpr(whereClause.treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(Column.WHERE_SIGN); + if (value == null) { + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); + mvParam.setMvExpr(whereClause.treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(Column.WHERE_SIGN, mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } req.setDescTbl(descTable.toThrift()); if (baseSchemaColumns != null) { - List columns = tcloumnsPool.get(baseSchemaColumns); - if (columns == null) { - columns = new ArrayList(); + Object value = objectPool.get(baseSchemaColumns); + if (value == null) { + List columns = new ArrayList(); for (Column column : baseSchemaColumns) { columns.add(column.toThrift()); } - tcloumnsPool.put(baseSchemaColumns, columns); + objectPool.put(baseSchemaColumns, columns); + req.setColumns(columns); + } else { + List columns = (List) value; + req.setColumns(columns); } - req.setColumns(columns); } req.setStorageVaultId(this.vaultId); return req;