From b3675a332ad7cb3569f00d9833152ea19f1a58dc Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 17 Apr 2024 10:52:11 +0800 Subject: [PATCH] [fix](move-memtable) ignore single replica load when move memtable (#32845) Co-authored-by: Xin Liao --- .../doris/analysis/NativeInsertStmt.java | 7 +++-- .../doris/load/loadv2/LoadLoadingTask.java | 2 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 10 +++++-- .../doris/planner/StreamLoadPlanner.java | 28 +++++++++++-------- .../org/apache/doris/qe/SessionVariable.java | 8 ++++++ 5 files changed, 37 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 524b7a90a427b6..3f948aefa75085 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -1038,13 +1038,16 @@ private DataSink createDataSink() throws AnalysisException { } if (targetTable instanceof OlapTable) { OlapTableSink sink; + final boolean enableSingleReplicaLoad = + analyzer.getContext().getSessionVariable().isEnableMemtableOnSinkNode() + ? false : analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(); if (isGroupCommitStreamLoadSql) { sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple, - targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(), + targetPartitionIds, enableSingleReplicaLoad, ConnectContext.get().getSessionVariable().getGroupCommit(), 0); } else { sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, - analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); + enableSingleReplicaLoad); } dataSink = sink; sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 015190bb559dc0..d60c17233d7a08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -118,7 +118,7 @@ public void init(TUniqueId loadId, List> fileStatusList, this.loadId = loadId; planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, - this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink); + this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 7d0503f39c6680..c3215ae0e689a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -73,6 +73,7 @@ public class LoadingTaskPlanner { private final int sendBatchParallelism; private final boolean useNewLoadScanNode; private final boolean singleTabletLoadPerSink; + private final boolean enableMemtableOnSinkNode; private UserIdentity userInfo; // Something useful // ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase() @@ -89,7 +90,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table BrokerDesc brokerDesc, List brokerFileGroups, boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo, - boolean singleTabletLoadPerSink) { + boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) { this.loadJobId = loadJobId; this.txnId = txnId; this.dbId = dbId; @@ -103,8 +104,9 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table this.loadParallelism = loadParallelism; this.sendBatchParallelism = sendBatchParallelism; this.useNewLoadScanNode = useNewLoadScanNode; - this.singleTabletLoadPerSink = singleTabletLoadPerSink; this.userInfo = userInfo; + this.singleTabletLoadPerSink = singleTabletLoadPerSink; + this.enableMemtableOnSinkNode = enableMemtableOnSinkNode; if (Env.getCurrentEnv().getAccessManager() .checkDbPriv(userInfo, InternalCatalog.INTERNAL_CATALOG_NAME, Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(), @@ -206,8 +208,10 @@ public void plan(TUniqueId loadId, List> fileStatusesLis // 2. Olap table sink List partitionIds = getAllPartitionIds(); + final boolean enableSingleReplicaLoad = this.enableMemtableOnSinkNode + ? false : Config.enable_single_replica_load; OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds, - Config.enable_single_replica_load); + enableSingleReplicaLoad); olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index e8d0c52e3687e4..3b012b173ab4fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -261,15 +261,20 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde timeout *= 2; } + final boolean enableMemtableOnSinkNode = + destTable.getTableProperty().getUseSchemaLightChange() + ? taskInfo.isMemtableOnSinkNode() : false; + final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode + ? false : Config.enable_single_replica_load; // create dest sink List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink; if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) { olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds, - Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(), + enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(), taskInfo.getMaxFilterRatio()); } else { - olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); + olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad); } olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); @@ -326,10 +331,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde queryOptions.setBeExecVersion(Config.be_exec_version); queryOptions.setIsReportSuccess(taskInfo.getEnableProfile()); queryOptions.setEnableProfile(taskInfo.getEnableProfile()); - boolean isEnableMemtableOnSinkNode = - destTable.getTableProperty().getUseSchemaLightChange() - ? taskInfo.isMemtableOnSinkNode() : false; - queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); + queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode); params.setQueryOptions(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now())); @@ -493,15 +495,20 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns timeout *= 2; } + final boolean enableMemtableOnSinkNode = + destTable.getTableProperty().getUseSchemaLightChange() + ? taskInfo.isMemtableOnSinkNode() : false; + final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode + ? false : Config.enable_single_replica_load; // create dest sink List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink; if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) { olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds, - Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(), + enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(), taskInfo.getMaxFilterRatio()); } else { - olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); + olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad); } olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); @@ -560,10 +567,7 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns queryOptions.setBeExecVersion(Config.be_exec_version); queryOptions.setIsReportSuccess(taskInfo.getEnableProfile()); queryOptions.setEnableProfile(taskInfo.getEnableProfile()); - boolean isEnableMemtableOnSinkNode = - destTable.getTableProperty().getUseSchemaLightChange() - ? taskInfo.isMemtableOnSinkNode() : false; - queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); + queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode); pipParams.setQueryOptions(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index cd2c8d4750367f..cccff906dd7723 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -3186,6 +3186,14 @@ public void setEnableSingleReplicaInsert(boolean enableSingleReplicaInsert) { this.enableSingleReplicaInsert = enableSingleReplicaInsert; } + public boolean isEnableMemtableOnSinkNode() { + return enableMemtableOnSinkNode; + } + + public void setEnableMemtableOnSinkNode(boolean enableMemtableOnSinkNode) { + this.enableMemtableOnSinkNode = enableMemtableOnSinkNode; + } + public boolean isEnableRuntimeFilterPrune() { return enableRuntimeFilterPrune; }