From 14f535d0b9d2291da722bbbe67545c60129b1136 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Sun, 31 Mar 2024 00:07:55 +0800 Subject: [PATCH 1/7] [fix](schema change) reduce memory usage of alter multi-column statements --- .../org/apache/doris/alter/RollupJobV2.java | 4 +- .../apache/doris/alter/SchemaChangeJobV2.java | 5 ++- .../apache/doris/task/AlterReplicaTask.java | 40 ++++++++++++------- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index d6c8ea694c521c..6aec6493452cbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -390,7 +390,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - Map> tcloumnsPool = Maps.newHashMap(); + Map objectPool = Maps.newHashMap(); try { long expiration = (createTimeMs + timeoutMs) / 1000; Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); @@ -471,7 +471,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), - tcloumnsPool, whereClause, expiration); + objectPool, whereClause, expiration); rollupBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index fccc517f7a9ab9..da989966d1c6a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -420,7 +420,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - Map> tcloumnsPool = Maps.newHashMap(); + Map objectPool = Maps.newHashMap(); try { long expiration = (createTimeMs + timeoutMs) / 1000; Map indexColumnMap = Maps.newHashMap(); @@ -484,12 +484,13 @@ protected void runWaitingTxnJob() throws AlterCancelException { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool, + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool, null, expiration); schemaChangeBatchTask.addTask(rollupTask); } } } + } // end for partitions } finally { tbl.readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index 1c57fc5d0965d5..d9a98e96b756fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -53,7 +53,7 @@ public class AlterReplicaTask extends AgentTask { private Map defineExprs; private Expr whereClause; private DescriptorTable descTable; - private Map> tcloumnsPool; + private Map objectPool; private List baseSchemaColumns; private long expiration; @@ -65,7 +65,7 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map defineExprs, - DescriptorTable descTable, List baseSchemaColumns, Map> tcloumnsPool, + DescriptorTable descTable, List baseSchemaColumns, Map objectPool, Expr whereClause, long expiration) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); @@ -83,7 +83,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI this.whereClause = whereClause; this.descTable = descTable; this.baseSchemaColumns = baseSchemaColumns; - this.tcloumnsPool = tcloumnsPool; + this.objectPool = objectPool; this.expiration = expiration; } @@ -132,12 +132,21 @@ public TAlterTabletReqV2 toThrift() { break; } if (defineExprs != null) { - for (Map.Entry entry : defineExprs.entrySet()) { - List slots = Lists.newArrayList(); - entry.getValue().collect(SlotRef.class, slots); - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); - mvParam.setMvExpr(entry.getValue().treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(defineExprs); + if (value == null) { + List params = new ArrayList(); + for (Map.Entry entry : defineExprs.entrySet()) { + List slots = Lists.newArrayList(); + entry.getValue().collect(SlotRef.class, slots); + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); + mvParam.setMvExpr(entry.getValue().treeToThrift()); + params.add(mvParam); + } + objectPool.put(defineExprs, params); + req.setMaterializedViewParams(params); + } else { + List params = (List) value; + req.setMaterializedViewParams(params); } } if (whereClause != null) { @@ -148,15 +157,18 @@ public TAlterTabletReqV2 toThrift() { req.setDescTbl(descTable.toThrift()); if (baseSchemaColumns != null) { - List columns = tcloumnsPool.get(baseSchemaColumns); - if (columns == null) { - columns = new ArrayList(); + Object value = objectPool.get(baseSchemaColumns); + if (value == null) { + List columns = new ArrayList(); for (Column column : baseSchemaColumns) { columns.add(column.toThrift()); } - tcloumnsPool.put(baseSchemaColumns, columns); + objectPool.put(baseSchemaColumns, columns); + req.setColumns(columns); + } else { + List columns = (List) value; + req.setColumns(columns); } - req.setColumns(columns); } return req; } From 5ab8378766710bf48abf633aab4d391120ac54cf Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Mon, 8 Apr 2024 00:04:37 +0800 Subject: [PATCH 2/7] fix --- fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java | 1 - .../src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java | 1 - 2 files changed, 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 991ad53e0e45ab..bb93bcbb55baaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -62,7 +62,6 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 039a611afe4031..43d969000e1159 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -54,7 +54,6 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; From b2870a10158191b34e0bf06092b4aac55175a7b4 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Mon, 22 Apr 2024 14:42:45 +0800 Subject: [PATCH 3/7] fix --- .../src/main/java/org/apache/doris/alter/RollupJobV2.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index bb93bcbb55baaa..5f69deb1414caa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -402,14 +402,13 @@ protected void runWaitingTxnJob() throws AlterCancelException { // the rollup task will transform the data before visible version(included). long visibleVersion = partition.getVisibleVersion(); + Map defineExprs = Maps.newHashMap(); MaterializedIndex rollupIndex = entry.getValue(); Map tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId); for (Tablet rollupTablet : rollupIndex.getTablets()) { long rollupTabletId = rollupTablet.getId(); long baseTabletId = tabletIdMap.get(rollupTabletId); - Map defineExprs = Maps.newHashMap(); - DescriptorTable descTable = new DescriptorTable(); TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); Map descMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); From ea713c61483ce014973fd15795d47cba48671f3e Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Mon, 22 Apr 2024 22:09:24 +0800 Subject: [PATCH 4/7] fix --- .../src/main/java/org/apache/doris/alter/RollupJobV2.java | 4 +++- .../main/java/org/apache/doris/task/AlterReplicaTask.java | 8 +++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 5f69deb1414caa..4ca7ca078955b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -85,6 +85,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -389,7 +390,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - Map objectPool = Maps.newHashMap(); + //Map objectPool = Maps.newHashMap(); + Map objectPool = new ConcurrentHashMap(); String vaultId = tbl.getStorageVaultId(); try { long expiration = (createTimeMs + timeoutMs) / 1000; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index f23ceb98b81fcb..17b112327ce959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -30,10 +30,13 @@ import org.apache.doris.thrift.TTaskType; import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; /* * This task is used for alter table process, such as rollup and schema change @@ -42,6 +45,9 @@ * The new replica can be a rollup replica, or a shadow replica of schema change. */ public class AlterReplicaTask extends AgentTask { + + private static final Logger LOG = LogManager.getLogger(AlterReplicaTask.class); + private long baseTabletId; private long newReplicaId; private int baseSchemaHash; @@ -137,7 +143,7 @@ public TAlterTabletReqV2 toThrift() { if (defineExprs != null) { Object value = objectPool.get(defineExprs); if (value == null) { - List params = new ArrayList(); + List params = new CopyOnWriteArrayList(); for (Map.Entry entry : defineExprs.entrySet()) { List slots = Lists.newArrayList(); entry.getValue().collect(SlotRef.class, slots); From 8afb6e08c564067b0ffa5eb6fba5c301c3c3bca4 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Tue, 23 Apr 2024 14:21:51 +0800 Subject: [PATCH 5/7] fix --- .../apache/doris/task/AlterReplicaTask.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index 17b112327ce959..e223f92c07ad84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -36,7 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; /* * This task is used for alter table process, such as rollup and schema change @@ -140,28 +139,35 @@ public TAlterTabletReqV2 toThrift() { default: break; } + if (defineExprs != null) { - Object value = objectPool.get(defineExprs); - if (value == null) { - List params = new CopyOnWriteArrayList(); - for (Map.Entry entry : defineExprs.entrySet()) { + for (Map.Entry entry : defineExprs.entrySet()) { + Object value = objectPool.get(entry.getKey()); + if (value == null) { List slots = Lists.newArrayList(); entry.getValue().collect(SlotRef.class, slots); TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); mvParam.setMvExpr(entry.getValue().treeToThrift()); - params.add(mvParam); + req.addToMaterializedViewParams(mvParam); + objectPool.put(entry.getKey(), mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); } - objectPool.put(defineExprs, params); - req.setMaterializedViewParams(params); - } else { - List params = (List) value; - req.setMaterializedViewParams(params); } } + if (whereClause != null) { - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); - mvParam.setMvExpr(whereClause.treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(Column.WHERE_SIGN); + if (value == null) { + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); + mvParam.setMvExpr(whereClause.treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(Column.WHERE_SIGN, mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } req.setDescTbl(descTable.toThrift()); From 36c2057449271c3a8aaab651a0f6da51e39893b8 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Tue, 23 Apr 2024 20:01:52 +0800 Subject: [PATCH 6/7] fix --- .../src/main/java/org/apache/doris/alter/RollupJobV2.java | 1 - .../main/java/org/apache/doris/alter/SchemaChangeJobV2.java | 2 +- .../src/main/java/org/apache/doris/task/AlterReplicaTask.java | 4 ---- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 58305f0f623663..172805657ea8aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -388,7 +388,6 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - //Map objectPool = Maps.newHashMap(); Map objectPool = new ConcurrentHashMap(); String vaultId = tbl.getStorageVaultId(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 88943b2f95859c..b0010cf5b68638 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -417,7 +417,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { } tbl.readLock(); - Map objectPool = Maps.newHashMap(); + Map objectPool = new ConcurrentHashMap(); String vaultId = tbl.getStorageVaultId(); try { long expiration = (createTimeMs + timeoutMs) / 1000; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index e223f92c07ad84..c95cc2670768c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -30,8 +30,6 @@ import org.apache.doris.thrift.TTaskType; import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -45,8 +43,6 @@ */ public class AlterReplicaTask extends AgentTask { - private static final Logger LOG = LogManager.getLogger(AlterReplicaTask.class); - private long baseTabletId; private long newReplicaId; private int baseSchemaHash; From bd8d2f47832c08874ab79ef75aa04773bac6a613 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Tue, 23 Apr 2024 20:52:58 +0800 Subject: [PATCH 7/7] fix --- .../src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index b0010cf5b68638..277a34115413b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -75,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors;