From 02dcfde1d7ba9f955628652f28ce15c233a9493c Mon Sep 17 00:00:00 2001 From: englefly Date: Tue, 19 Mar 2024 13:54:57 +0800 Subject: [PATCH 1/2] unify join node output project --- .../doris/job/extensions/mtmv/MTMVTask.java | 1 + .../translator/PhysicalPlanTranslator.java | 14 ++-- .../apache/doris/planner/HashJoinNode.java | 34 ++++---- .../apache/doris/planner/JoinNodeBase.java | 78 +++++++++---------- .../doris/planner/NestedLoopJoinNode.java | 30 ++++--- .../org/apache/doris/planner/PlanNode.java | 16 ++-- .../table_function/explode_json_array.groovy | 32 ++++---- 7 files changed, 102 insertions(+), 103 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index d7722e090d0240..499d1571d39c33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -222,6 +222,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionIds, executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); ctx.setExecutor(executor); ctx.setQueryId(queryId); + ctx.getState().setNereids(true); command.run(ctx, executor); if (ctx.getState().getStateType() != MysqlStateType.OK) { throw new JobException(ctx.getState().getErrorMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 4151f7d957a69d..d2f16f82dc2e39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1329,6 +1329,7 @@ public PlanFragment visitPhysicalHashJoin( null, null, null, hashJoin.isMarkJoin()); 
hashJoinNode.setNereidsId(hashJoin.getId()); hashJoinNode.setChildrenDistributeExprLists(distributeExprLists); + hashJoinNode.setUseSpecificProjections(false); PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin); if (JoinUtils.shouldColocateJoin(physicalHashJoin)) { @@ -1556,8 +1557,8 @@ public PlanFragment visitPhysicalHashJoin( TupleDescriptor outputDescriptor = context.generateTupleDesc(); outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s)); - hashJoinNode.setvOutputTupleDesc(outputDescriptor); - hashJoinNode.setvSrcToOutputSMap(srcToOutput); + hashJoinNode.setOutputTupleDesc(outputDescriptor); + hashJoinNode.setProjectList(srcToOutput); } if (hashJoin.getStats() != null) { hashJoinNode.setCardinality((long) hashJoin.getStats().getRowCount()); @@ -1591,6 +1592,7 @@ public PlanFragment visitPhysicalNestedLoopJoin( NestedLoopJoinNode nestedLoopJoinNode = new NestedLoopJoinNode(context.nextPlanNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot, tupleIds, JoinType.toJoinOperator(joinType), null, null, null, nestedLoopJoin.isMarkJoin()); + nestedLoopJoinNode.setUseSpecificProjections(true); nestedLoopJoinNode.setNereidsId(nestedLoopJoin.getId()); nestedLoopJoinNode.setChildrenDistributeExprLists(distributeExprLists); if (nestedLoopJoin.getStats() != null) { @@ -1737,8 +1739,8 @@ public PlanFragment visitPhysicalNestedLoopJoin( TupleDescriptor outputDescriptor = context.generateTupleDesc(); outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s)); - nestedLoopJoinNode.setvOutputTupleDesc(outputDescriptor); - nestedLoopJoinNode.setvSrcToOutputSMap(srcToOutput); + nestedLoopJoinNode.setOutputTupleDesc(outputDescriptor); + nestedLoopJoinNode.setProjectList(srcToOutput); } if (nestedLoopJoin.getStats() != null) { nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount()); @@ -1864,8 +1866,8 @@ public PlanFragment 
visitPhysicalProject(PhysicalProject project if (inputPlanNode instanceof JoinNodeBase) { TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); JoinNodeBase joinNode = (JoinNodeBase) inputPlanNode; - joinNode.setvOutputTupleDesc(tupleDescriptor); - joinNode.setvSrcToOutputSMap(projectionExprs); + joinNode.setOutputTupleDesc(tupleDescriptor); + joinNode.setProjectList(projectionExprs); // prune the hashOutputSlotIds if (joinNode instanceof HashJoinNode) { ((HashJoinNode) joinNode).getHashOutputSlotIds().clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index d8cc4a77a0ab07..c3cbf2afce15ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -171,7 +171,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator nullableTupleIds.addAll(outer.getTupleIds()); } vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple); - vOutputTupleDesc = outputTuple; + this.outputTupleDesc = outputTuple; vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList()); } @@ -220,7 +220,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator nullableTupleIds.addAll(outer.getTupleIds()); } vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple); - vOutputTupleDesc = outputTuple; + this.outputTupleDesc = outputTuple; vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList()); } @@ -800,26 +800,24 @@ protected void toThrift(TPlanNode msg) { msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt()); } } - if (vSrcToOutputSMap != null) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - // TODO: Enable it after we support new optimizers - // if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { - // 
msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); - // } else - msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + if (useSpecificProjections) { + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null && outputTupleDesc != null) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + } + } + if (outputTupleDesc != null) { + msg.hash_join_node.setVoutputTupleId(outputTupleDesc.getId().asInt()); } } - if (vOutputTupleDesc != null) { - msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt()); - // TODO Enable it after we support new optimizers - // msg.setOutputTupleId(vOutputTupleDesc.getId().asInt()); - } + if (vIntermediateTupleDescList != null) { for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) { msg.hash_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt()); } } msg.hash_join_node.setDistType(isColocate ? 
TJoinDistributionType.COLOCATE : distrMode.toThrift()); + msg.hash_join_node.setUseSpecificProjections(useSpecificProjections); } @Override @@ -862,9 +860,11 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve output.append(getRuntimeFilterExplainString(true)); } output.append(detailPrefix).append(String.format("cardinality=%,d", cardinality)).append("\n"); - // todo unify in plan node - if (vOutputTupleDesc != null) { - output.append(detailPrefix).append("vec output tuple id: ").append(vOutputTupleDesc.getId()).append("\n"); + if (outputTupleDesc != null) { + output.append(detailPrefix).append("vec output tuple id: ").append(outputTupleDesc.getId()).append("\n"); + } + if (outputTupleDesc != null) { + output.append(detailPrefix).append("output tuple id: ").append(outputTupleDesc.getId()).append("\n"); } if (vIntermediateTupleDescList != null) { output.append(detailPrefix).append("vIntermediate tuple ids: "); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java index b635cfda59d992..fbe12b3d6a0405 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java @@ -44,7 +44,6 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -57,10 +56,14 @@ public abstract class JoinNodeBase extends PlanNode { protected final TableRef innerRef; protected final JoinOperator joinOp; protected final boolean isMark; - protected TupleDescriptor vOutputTupleDesc; protected ExprSubstitutionMap vSrcToOutputSMap; protected List vIntermediateTupleDescList; + // in thrift, every planNode denotes its output tupleId and projections by output_tuple_id and projections + // but for legacy reasons, JoinNode has its specific representations: voutput_tuple_id 
and src_expr_list + // if useSpecificProjections is true, set the output to its specific attributes. + protected boolean useSpecificProjections = true; + public JoinNodeBase(PlanNodeId id, String planNodeName, StatisticalType statisticalType, PlanNode outer, PlanNode inner, TableRef innerRef) { super(id, planNodeName, statisticalType); @@ -115,7 +118,7 @@ protected boolean isMaterializedByChild(SlotDescriptor slotDesc, ExprSubstitutio protected void computeOutputTuple(Analyzer analyzer) throws UserException { // 1. create new tuple - vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); + outputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); boolean copyLeft = false; boolean copyRight = false; boolean leftNullable = false; @@ -169,7 +172,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { getChild(0) instanceof JoinNodeBase && analyzer.isOuterJoined(leftTupleDesc.getId()); for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) { SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, leftSlotDesc); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, leftSlotDesc); if (leftNullable) { outputSlotDesc.setIsNullable(true); leftNullableNumber++; @@ -189,7 +192,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { getChild(1) instanceof JoinNodeBase && analyzer.isOuterJoined(rightTupleDesc.getId()); for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) { SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, rightSlotDesc); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, rightSlotDesc); if (rightNullable) { outputSlotDesc.setIsNullable(true); rightNullableNumber++; @@ -206,7 +209,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { if (isMarkJoin() && analyzer.needPopUpMarkTuple(innerRef)) { SlotDescriptor markSlot = 
analyzer.getMarkTuple(innerRef).getSlots().get(0); SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, markSlot); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, markSlot); srcTblRefToOutputTupleSmap.put(new SlotRef(markSlot), new SlotRef(outputSlotDesc)); } @@ -224,7 +227,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { } } - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); // 3. add tupleisnull in null-side Preconditions.checkState(srcTblRefToOutputTupleSmap.getLhs().size() == vSrcToOutputSMap.getLhs().size()); // Condition1: the left child is null-side @@ -265,8 +268,8 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) { outputSlotIds = Lists.newArrayList(); List outputTupleDescList = Lists.newArrayList(); - if (vOutputTupleDesc != null) { - outputTupleDescList.add(vOutputTupleDesc); + if (outputTupleDesc != null) { + outputTupleDescList.add(outputTupleDesc); } else { for (TupleId tupleId : tupleIds) { outputTupleDescList.add(analyzer.getTupleDesc(tupleId)); @@ -296,13 +299,13 @@ public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) @Override public void projectOutputTuple() { - if (vOutputTupleDesc == null) { + if (outputTupleDesc == null) { return; } - if (vOutputTupleDesc.getSlots().size() == outputSlotIds.size()) { + if (outputTupleDesc.getSlots().size() == outputSlotIds.size()) { return; } - Iterator iterator = vOutputTupleDesc.getSlots().iterator(); + Iterator iterator = outputTupleDesc.getSlots().iterator(); while (iterator.hasNext()) { SlotDescriptor slotDescriptor = iterator.next(); boolean keep = false; @@ -318,7 +321,7 @@ public void projectOutputTuple() { vSrcToOutputSMap.removeByRhsExpr(slotRef); } } - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); } protected abstract 
Pair needToCopyRightAndLeft(); @@ -404,14 +407,14 @@ protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisExcept // 5. replace tuple is null expr TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), originTidsToIntermediateTidMap); - Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == vOutputTupleDesc.getSlots().size()); + Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == outputTupleDesc.getSlots().size()); List exprs = vSrcToOutputSMap.getLhs(); - ArrayList slots = vOutputTupleDesc.getSlots(); + ArrayList slots = outputTupleDesc.getSlots(); for (int i = 0; i < slots.size(); i++) { slots.get(i).setIsNullable(exprs.get(i).isNullable()); } vSrcToOutputSMap.reCalculateNullableInfoForSlotInRhs(); - vOutputTupleDesc.computeMemLayout(); + outputTupleDesc.computeMemLayout(); } protected abstract List computeSlotIdsForJoinConjuncts(Analyzer analyzer); @@ -479,7 +482,7 @@ public void init(Analyzer analyzer) throws UserException { /** * If parent wants to get join node tupleids, * it will call this function instead of read properties directly. - * The reason is that the tuple id of vOutputTupleDesc the real output tuple id for join node. + * The reason is that the tuple id of outputTupleDesc is the real output tuple id for join node. *

* If you read the properties of @tupleids directly instead of this function, * it reads the input id of the current node. @@ -487,16 +490,16 @@ public void init(Analyzer analyzer) throws UserException { @Override public ArrayList getTupleIds() { Preconditions.checkState(tupleIds != null); - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } return tupleIds; } @Override public ArrayList getOutputTblRefIds() { - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } switch (joinOp) { case LEFT_SEMI_JOIN: @@ -513,8 +516,8 @@ public ArrayList getOutputTblRefIds() { @Override public List getOutputTupleIds() { - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } switch (joinOp) { case LEFT_SEMI_JOIN: @@ -544,13 +547,6 @@ public int getNumInstances() { return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances()); } - /** - * Used by nereids. - */ - public void setvOutputTupleDesc(TupleDescriptor vOutputTupleDesc) { - this.vOutputTupleDesc = vOutputTupleDesc; - } - /** * Used by nereids. */ @@ -558,13 +554,6 @@ public void setvIntermediateTupleDescList(List vIntermediateTup this.vIntermediateTupleDescList = vIntermediateTupleDescList; } - /** - * Used by nereids. 
- */ - public void setvSrcToOutputSMap(List lhs) { - this.vSrcToOutputSMap = new ExprSubstitutionMap(lhs, Collections.emptyList()); - } - public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { outputSmap = smap; ExprSubstitutionMap tmpSmap = new ExprSubstitutionMap(Lists.newArrayList(vSrcToOutputSMap.getRhs()), @@ -572,12 +561,12 @@ public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { List newRhs = Lists.newArrayList(); boolean bSmapChanged = false; for (Expr rhsExpr : smap.getRhs()) { - if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(vOutputTupleDesc.getId())) { + if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(outputTupleDesc.getId())) { newRhs.add(rhsExpr); } else { // we need do project in the join node // add a new slot for projection result and add the project expr to vSrcToOutputSMap - SlotDescriptor slotDesc = analyzer.addSlotDescriptor(vOutputTupleDesc); + SlotDescriptor slotDesc = analyzer.addSlotDescriptor(outputTupleDesc); slotDesc.initFromExpr(rhsExpr); slotDesc.setIsMaterialized(true); // the project expr is from smap, which use the slots of hash join node's output tuple @@ -594,7 +583,16 @@ public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { if (bSmapChanged) { outputSmap.updateRhsExprs(newRhs); - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); } } + + public void setUseSpecificProjections(boolean useSpecificProjections) { + this.useSpecificProjections = useSpecificProjections; + } + + + public boolean isUseSpecificProjections() { + return useSpecificProjections; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 9feac7e79ff7b1..983cbfd5884c69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -127,7 +127,7 
@@ public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, List Date: Tue, 19 Mar 2024 18:06:17 +0800 Subject: [PATCH 2/2] fix-nest --- .../doris/nereids/glue/translator/PhysicalPlanTranslator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index d2f16f82dc2e39..0ec312883c6b29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1592,7 +1592,7 @@ public PlanFragment visitPhysicalNestedLoopJoin( NestedLoopJoinNode nestedLoopJoinNode = new NestedLoopJoinNode(context.nextPlanNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot, tupleIds, JoinType.toJoinOperator(joinType), null, null, null, nestedLoopJoin.isMarkJoin()); - nestedLoopJoinNode.setUseSpecificProjections(true); + nestedLoopJoinNode.setUseSpecificProjections(false); nestedLoopJoinNode.setNereidsId(nestedLoopJoin.getId()); nestedLoopJoinNode.setChildrenDistributeExprLists(distributeExprLists); if (nestedLoopJoin.getStats() != null) {