From 8fb3efe5ad79b7638f0de10346f5274728b6ecc8 Mon Sep 17 00:00:00 2001 From: lichi Date: Tue, 2 Dec 2025 16:55:01 +0800 Subject: [PATCH] branch-4.0: [fix](nereids)StatementContext must be created before creating stream load plan #58494 --- .../apache/doris/nereids/load/NereidsLoadUtils.java | 5 ++--- .../nereids/load/NereidsLoadingTaskPlanner.java | 9 +++++++++ .../nereids/load/NereidsStreamLoadPlanner.java | 11 +++++++++++ .../apache/doris/planner/StreamLoadPlannerTest.java | 13 +++++++++++++ 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java index bb4a480af98e1a..96a728ac1f8f85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.UserException; import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundSlot; @@ -199,8 +198,8 @@ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, Par partitionNames != null ? partitionNames.getPartitionNames() : ImmutableList.of(), isPartialUpdate, partialUpdateNewKeyPolicy, DMLCommandType.LOAD, currentRootPlan); - CascadesContext cascadesContext = CascadesContext.initContext(new StatementContext(), currentRootPlan, - PhysicalProperties.ANY); + CascadesContext cascadesContext = CascadesContext.initContext(ConnectContext.get().getStatementContext(), + currentRootPlan, PhysicalProperties.ANY); ConnectContext ctx = cascadesContext.getConnectContext(); // we force convert nullable column to non-nullable column for load // so set feDebug to false to avoid AdjustNullableRule report error diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java index 877f58f5ec30ec..4998e22ca0f5ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java @@ -28,6 +28,7 @@ import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.FileLoadScanNode; @@ -146,6 +147,14 @@ public void plan(TUniqueId loadId, List> fileStatusesLis Preconditions.checkState(!fileGroups.isEmpty() && fileGroups.size() == fileStatusesList.size()); + // make sure StatementContext is set in ConnectContext + ConnectContext connectContext = ConnectContext.get(); + if (connectContext != null && connectContext.getStatementContext() == null) { + StatementContext statementContext = new StatementContext(); + connectContext.setStatementContext(statementContext); + statementContext.setConnectContext(connectContext); + } + PartitionNames partitionNames = getPartitionNames(); long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeoutS() : timeoutS; if (txnTimeout > Integer.MAX_VALUE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java index ce77cf3a7de2bc..676bd26f03902c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java @@ -31,6 +31,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.FileLoadScanNode; @@ -38,6 +39,7 @@ import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TBrokerFileStatus; @@ -217,6 +219,15 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde throw new DdlException("Column is not SUM AggregateType. column:" + col.getName()); } } + + // make sure StatementContext is set in ConnectContext + ConnectContext connectContext = ConnectContext.get(); + if (connectContext != null && connectContext.getStatementContext() == null) { + StatementContext statementContext = new StatementContext(); + connectContext.setStatementContext(statementContext); + statementContext.setConnectContext(connectContext); + } + // 1. create file group NereidsDataDescription dataDescription = new NereidsDataDescription(destTable.getName(), taskInfo); dataDescription.analyzeWithoutCheckPriv(db.getFullName()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java index 7097f37724c018..098e79b3511bb8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java @@ -17,8 +17,12 @@ package org.apache.doris.planner; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.load.NereidsLoadUtils; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.junit.Assert; import org.junit.Test; @@ -32,4 +36,13 @@ public void testParseStmt() throws Exception { List expressions = NereidsLoadUtils.parseExpressionSeq(sql); Assert.assertEquals(4, expressions.size()); } + + @Test + public void testExprIdGenerator() { + IdGenerator exprIdGenerator1 = StatementScopeIdGenerator.getExprIdGenerator(); + CascadesContext context = CascadesContext.initTempContext(); + IdGenerator exprIdGenerator2 = context.getStatementContext().getExprIdGenerator(); + // we get different IdGenerator instance + Assert.assertTrue(exprIdGenerator1 != exprIdGenerator2); + } }