diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index c127bd4b8d0c0c..19d8dc13d75f70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -81,12 +81,14 @@ import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -247,6 +249,7 @@ protected void runPendingJob() throws AlterCancelException { try { BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); + Map objectPool = new HashMap(); for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { long partitionId = entry.getKey(); Partition partition = tbl.getPartition(partitionId); @@ -291,7 +294,7 @@ protected void runPendingJob() throws AlterCancelException { tbl.getTimeSeriesCompactionLevelThreshold(), tbl.storeRowColumn(), tbl.isDynamicSchema(), - binlogConfig); + binlogConfig, objectPool); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); @@ -401,6 +404,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); + Map objectPool = new ConcurrentHashMap(); try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { @@ -479,7 +483,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), - whereClause); + whereClause, objectPool); rollupBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index d8c12066df6250..d59a6d6bf9eb3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -72,10 +72,12 @@ import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -238,6 +240,7 @@ protected void runPendingJob() throws AlterCancelException { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); + Map objectPool = new HashMap(); for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); if (partition == null) { @@ -286,7 +289,7 @@ protected void runPendingJob() throws AlterCancelException { tbl.getTimeSeriesCompactionLevelThreshold(), tbl.storeRowColumn(), tbl.isDynamicSchema(), - binlogConfig); + binlogConfig, objectPool); createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId) .get(shadowTabletId), originSchemaHash); @@ -410,7 +413,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - + Map objectPool = new ConcurrentHashMap(); try { Map indexColumnMap = Maps.newHashMap(); for (Map.Entry> entry : indexSchemaMap.entrySet()) { @@ -468,7 +471,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null); + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null, + objectPool); schemaChangeBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index 42c69bba6e7e89..e3da9c70a543eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -54,6 +54,8 @@ public class DescriptorTable { private final HashMap outToIntermediateSlots = new HashMap<>(); + private TDescriptorTable thriftDescTable = null; // serialized version of this + public DescriptorTable() { } @@ -182,6 +184,9 @@ public void materializeIntermediateSlots() { } public TDescriptorTable toThrift() { + if (thriftDescTable != null) { + return thriftDescTable; + } TDescriptorTable result = new TDescriptorTable(); Map referencedTbls = Maps.newHashMap(); for (TupleDescriptor tupleD : tupleDescs.values()) { @@ -208,6 +213,7 @@ public TDescriptorTable toThrift() { for (TableIf tbl : referencedTbls.values()) { result.addToTableDescriptors(tbl.toThrift()); } + thriftDescTable = result; return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index bf55f539e69d28..5812679daaf15f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -97,6 +97,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -1035,6 +1036,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc } finally { localTbl.readUnlock(); } + Map objectPool = new HashMap(); for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) { MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId()); for (Tablet restoreTablet : restoredIdx.getTablets()) { @@ -1067,7 +1069,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc localTbl.getTimeSeriesCompactionLevelThreshold(), localTbl.storeRowColumn(), localTbl.isDynamicSchema(), - binlogConfig); + binlogConfig, objectPool); task.setInRestoreMode(true); batchTask.addTask(task); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 6e167010ad7982..1b2090378d7725 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1866,6 +1866,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long short totalReplicaNum = replicaAlloc.getTotalReplicaNum(); TStorageMedium realStorageMedium = null; + Map objectPool = new HashMap(); for (Map.Entry entry : indexMap.entrySet()) { long indexId = entry.getKey(); MaterializedIndex index = entry.getValue(); @@ -1909,7 +1910,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long compactionPolicy, timeSeriesCompactionGoalSizeMbytes, timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds, timeSeriesCompactionEmptyRowsetsThreshold, timeSeriesCompactionLevelThreshold, - storeRowColumn, isDynamicSchema, binlogConfig); + storeRowColumn, isDynamicSchema, binlogConfig, objectPool); task.setStorageFormat(storageFormat); batchTask.addTask(task); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 1941857743472a..13215503f6643a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -722,6 +722,7 @@ private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta long backendReportVersion) { AgentBatchTask createReplicaBatchTask = new AgentBatchTask(); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + Map objectPool = new HashMap(); for (Long dbId : tabletDeleteFromMeta.keySet()) { Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { @@ -827,7 +828,7 @@ private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), olapTable.getTimeSeriesCompactionLevelThreshold(), olapTable.storeRowColumn(), olapTable.isDynamicSchema(), - binlogConfig); + binlogConfig, objectPool); createReplicaTask.setIsRecoverTask(true); createReplicaBatchTask.addTask(createReplicaTask); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index 178550dd229017..2fa7f110bf53d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -54,6 +54,7 @@ public class AlterReplicaTask extends AgentTask { private Expr whereClause; private DescriptorTable descTable; private List baseSchemaColumns; + private Map objectPool; /** * AlterReplicaTask constructor. @@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map defineExprs, - DescriptorTable descTable, List baseSchemaColumns, Expr whereClause) { + DescriptorTable descTable, List baseSchemaColumns, Expr whereClause, + Map objectPool) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); this.baseTabletId = baseTabletId; @@ -79,6 +81,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI this.whereClause = whereClause; this.descTable = descTable; this.baseSchemaColumns = baseSchemaColumns; + this.objectPool = objectPool; } public long getBaseTabletId() { @@ -125,27 +128,47 @@ public TAlterTabletReqV2 toThrift() { } if (defineExprs != null) { for (Map.Entry entry : defineExprs.entrySet()) { - List slots = Lists.newArrayList(); - entry.getValue().collect(SlotRef.class, slots); - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); - mvParam.setOriginColumnName(slots.get(0).getColumnName()); - mvParam.setMvExpr(entry.getValue().treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(entry.getKey()); + if (value == null) { + List slots = Lists.newArrayList(); + entry.getValue().collect(SlotRef.class, slots); + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); + mvParam.setOriginColumnName(slots.get(0).getColumnName()); + mvParam.setMvExpr(entry.getValue().treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(entry.getKey(), mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } } if (whereClause != null) { - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); - mvParam.setMvExpr(whereClause.treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(Column.WHERE_SIGN); + if (value == null) { + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); + mvParam.setMvExpr(whereClause.treeToThrift()); + req.addToMaterializedViewParams(mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } req.setDescTbl(descTable.toThrift()); if (baseSchemaColumns != null) { - List columns = new ArrayList(); - for (Column column : baseSchemaColumns) { - columns.add(column.toThrift()); + Object value = objectPool.get(baseSchemaColumns); + if (value == null) { + List columns = new ArrayList(); + for (Column column : baseSchemaColumns) { + columns.add(column.toThrift()); + } + req.setColumns(columns); + objectPool.put(baseSchemaColumns, columns); + } else { + List columns = (List) value; + req.setColumns(columns); } - req.setColumns(columns); } return req; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 7388303f439afe..7ea5c951ca6615 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -121,6 +122,8 @@ public class CreateReplicaTask extends AgentTask { private BinlogConfig binlogConfig; + private Map objectPool; + public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, long replicaId, short shortKeyColumnCount, int schemaHash, long version, KeysType keysType, TStorageType storageType, @@ -143,7 +146,8 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition long timeSeriesCompactionLevelThreshold, boolean storeRowColumn, boolean isDynamicSchema, - BinlogConfig binlogConfig) { + BinlogConfig binlogConfig, + Map objectPool) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); this.replicaId = replicaId; @@ -188,6 +192,7 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition this.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold; this.storeRowColumn = storeRowColumn; this.binlogConfig = binlogConfig; + this.objectPool = objectPool; } public void setIsRecoverTask(boolean isRecoverTask) { @@ -248,21 +253,32 @@ public TCreateTabletReq toThrift() { int deleteSign = -1; int sequenceCol = -1; int versionCol = -1; - List tColumns = new ArrayList(); + List tColumns = null; + Object tCols = objectPool.get(columns); + if (tCols != null) { + tColumns = (List) tCols; + } else { + tColumns = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + TColumn tColumn = column.toThrift(); + // is bloom filter column + if (bfColumns != null && bfColumns.contains(column.getName())) { + tColumn.setIsBloomFilterColumn(true); + } + // when doing schema change, some modified column has a prefix in name. + // this prefix is only used in FE, not visible to BE, so we should remove this prefix. + if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { + tColumn.setColumnName( + column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length())); + } + tColumn.setVisible(column.isVisible()); + tColumns.add(tColumn); + } + objectPool.put(columns, tColumns); + } for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); - TColumn tColumn = column.toThrift(); - // is bloom filter column - if (bfColumns != null && bfColumns.contains(column.getName())) { - tColumn.setIsBloomFilterColumn(true); - } - // when doing schema change, some modified column has a prefix in name. - // this prefix is only used in FE, not visible to BE, so we should remove this prefix. - if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { - tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length())); - } - tColumn.setVisible(column.isVisible()); - tColumns.add(tColumn); if (column.isDeleteSignColumn()) { deleteSign = i; } @@ -279,9 +295,15 @@ public TCreateTabletReq toThrift() { tSchema.setVersionColIdx(versionCol); if (CollectionUtils.isNotEmpty(indexes)) { - List tIndexes = new ArrayList<>(); - for (Index index : indexes) { - tIndexes.add(index.toThrift()); + List tIndexes = null; + Object value = objectPool.get(indexes); + if (value != null) { + tIndexes = (List) value; + } else { + tIndexes = new ArrayList<>(); + for (Index index : indexes) { + tIndexes.add(index.toThrift()); + } } tSchema.setIndexes(tIndexes); storageFormat = TStorageFormat.V2; diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 8e89d7c72cbc9d..e9e2312d5ae6a8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -102,12 +102,12 @@ public void setUp() throws AnalysisException { range2 = Range.closedOpen(pk2, pk3); // create tasks - + Map objectPool = new HashMap(); // create createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null, - TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, false, null); + TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, false, null, objectPool); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);