From d44305661346d8d77d34030380ee32c6a26892db Mon Sep 17 00:00:00 2001 From: englefly Date: Tue, 12 Mar 2024 11:57:05 +0800 Subject: [PATCH 01/11] unify-join-proj --- .../translator/PhysicalPlanTranslator.java | 12 ++-- .../apache/doris/planner/HashJoinNode.java | 31 ++++----- .../apache/doris/planner/JoinNodeBase.java | 64 +++++++------------ .../doris/planner/NestedLoopJoinNode.java | 29 ++++----- .../suites/nereids_tpch_p0/tpch/cse.groovy | 38 +++++++++++ .../table_function/explode_json_array.groovy | 32 +++++----- 6 files changed, 111 insertions(+), 95 deletions(-) create mode 100644 regression-test/suites/nereids_tpch_p0/tpch/cse.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index e4b7eaed92450b..4753a34438989b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1508,8 +1508,8 @@ public PlanFragment visitPhysicalHashJoin( TupleDescriptor outputDescriptor = context.generateTupleDesc(); outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s)); - hashJoinNode.setvOutputTupleDesc(outputDescriptor); - hashJoinNode.setvSrcToOutputSMap(srcToOutput); + hashJoinNode.setOutputTupleDesc(outputDescriptor); + hashJoinNode.setProjectList(srcToOutput); } if (hashJoin.getStats() != null) { hashJoinNode.setCardinality((long) hashJoin.getStats().getRowCount()); @@ -1689,8 +1689,8 @@ public PlanFragment visitPhysicalNestedLoopJoin( TupleDescriptor outputDescriptor = context.generateTupleDesc(); outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s)); - nestedLoopJoinNode.setvOutputTupleDesc(outputDescriptor); - nestedLoopJoinNode.setvSrcToOutputSMap(srcToOutput); + nestedLoopJoinNode.setOutputTupleDesc(outputDescriptor); + nestedLoopJoinNode.setProjectList(srcToOutput); } if (nestedLoopJoin.getStats() != null) { nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount()); @@ -1816,8 +1816,8 @@ public PlanFragment visitPhysicalProject(PhysicalProject project if (inputPlanNode instanceof JoinNodeBase) { TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); JoinNodeBase joinNode = (JoinNodeBase) inputPlanNode; - joinNode.setvOutputTupleDesc(tupleDescriptor); - joinNode.setvSrcToOutputSMap(projectionExprs); + joinNode.setOutputTupleDesc(tupleDescriptor); + joinNode.setProjectList(projectionExprs); // prune the hashOutputSlotIds if (joinNode instanceof HashJoinNode) { ((HashJoinNode) joinNode).getHashOutputSlotIds().clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index d8cc4a77a0ab07..8b28ed419d50c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -41,6 +41,7 @@ import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TEqJoinCondition; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.THashJoinNode; import org.apache.doris.thrift.TJoinDistributionType; import org.apache.doris.thrift.TPlanNode; @@ -171,7 +172,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator nullableTupleIds.addAll(outer.getTupleIds()); } vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple); - vOutputTupleDesc = outputTuple; + this.outputTupleDesc = outputTuple; vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList()); } @@ -220,7 +221,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator nullableTupleIds.addAll(outer.getTupleIds()); } vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple); - vOutputTupleDesc = outputTuple; + this.outputTupleDesc = outputTuple; vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList()); } @@ -800,20 +801,18 @@ protected void toThrift(TPlanNode msg) { msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt()); } } - if (vSrcToOutputSMap != null) { + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null) { for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - // TODO: Enable it after we support new optimizers - // if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { - // msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); - // } else - msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + TExpr tExpr = vSrcToOutputSMap.getLhs().get(i).treeToThrift(); + msg.hash_join_node.addToSrcExprList(tExpr); + // make legacy planner compatible + msg.addToProjections(tExpr); } } - if (vOutputTupleDesc != null) { - msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt()); - // TODO Enable it after we support new optimizers - // msg.setOutputTupleId(vOutputTupleDesc.getId().asInt()); + if (outputTupleDesc != null) { + msg.hash_join_node.setVoutputTupleId(outputTupleDesc.getId().asInt()); } + if (vIntermediateTupleDescList != null) { for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) { msg.hash_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt()); @@ -862,9 +861,11 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve output.append(getRuntimeFilterExplainString(true)); } output.append(detailPrefix).append(String.format("cardinality=%,d", cardinality)).append("\n"); - // todo unify in plan node - if (vOutputTupleDesc != null) { - output.append(detailPrefix).append("vec output tuple id: ").append(vOutputTupleDesc.getId()).append("\n"); + if (outputTupleDesc != null) { + output.append(detailPrefix).append("vec output tuple id: ").append(outputTupleDesc.getId()).append("\n"); + } + if (outputTupleDesc != null) { + output.append(detailPrefix).append("output tuple id: ").append(outputTupleDesc.getId()).append("\n"); } if (vIntermediateTupleDescList != null) { output.append(detailPrefix).append("vIntermediate tuple ids: "); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java index b635cfda59d992..65f7bca36524b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java @@ -44,7 +44,6 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -57,7 +56,6 @@ public abstract class JoinNodeBase extends PlanNode { protected final TableRef innerRef; protected final JoinOperator joinOp; protected final boolean isMark; - protected TupleDescriptor vOutputTupleDesc; protected ExprSubstitutionMap vSrcToOutputSMap; protected List vIntermediateTupleDescList; @@ -115,7 +113,7 @@ protected boolean isMaterializedByChild(SlotDescriptor slotDesc, ExprSubstitutio protected void computeOutputTuple(Analyzer analyzer) throws UserException { // 1. create new tuple - vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); + outputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); boolean copyLeft = false; boolean copyRight = false; boolean leftNullable = false; @@ -169,7 +167,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { getChild(0) instanceof JoinNodeBase && analyzer.isOuterJoined(leftTupleDesc.getId()); for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) { SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, leftSlotDesc); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, leftSlotDesc); if (leftNullable) { outputSlotDesc.setIsNullable(true); leftNullableNumber++; @@ -189,7 +187,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { getChild(1) instanceof JoinNodeBase && analyzer.isOuterJoined(rightTupleDesc.getId()); for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) { SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, rightSlotDesc); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, rightSlotDesc); if (rightNullable) { outputSlotDesc.setIsNullable(true); rightNullableNumber++; @@ -206,7 +204,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { if (isMarkJoin() && analyzer.needPopUpMarkTuple(innerRef)) { SlotDescriptor markSlot = analyzer.getMarkTuple(innerRef).getSlots().get(0); SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, markSlot); + analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, markSlot); srcTblRefToOutputTupleSmap.put(new SlotRef(markSlot), new SlotRef(outputSlotDesc)); } @@ -224,7 +222,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { } } - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); // 3. add tupleisnull in null-side Preconditions.checkState(srcTblRefToOutputTupleSmap.getLhs().size() == vSrcToOutputSMap.getLhs().size()); // Condition1: the left child is null-side @@ -265,8 +263,8 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException { public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) { outputSlotIds = Lists.newArrayList(); List outputTupleDescList = Lists.newArrayList(); - if (vOutputTupleDesc != null) { - outputTupleDescList.add(vOutputTupleDesc); + if (outputTupleDesc != null) { + outputTupleDescList.add(outputTupleDesc); } else { for (TupleId tupleId : tupleIds) { outputTupleDescList.add(analyzer.getTupleDesc(tupleId)); @@ -296,13 +294,13 @@ public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) @Override public void projectOutputTuple() { - if (vOutputTupleDesc == null) { + if (outputTupleDesc == null) { return; } - if (vOutputTupleDesc.getSlots().size() == outputSlotIds.size()) { + if (outputTupleDesc.getSlots().size() == outputSlotIds.size()) { return; } - Iterator iterator = vOutputTupleDesc.getSlots().iterator(); + Iterator iterator = outputTupleDesc.getSlots().iterator(); while (iterator.hasNext()) { SlotDescriptor slotDescriptor = iterator.next(); boolean keep = false; @@ -318,7 +316,7 @@ public void projectOutputTuple() { vSrcToOutputSMap.removeByRhsExpr(slotRef); } } - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); } protected abstract Pair needToCopyRightAndLeft(); @@ -404,14 +402,14 @@ protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisExcept // 5. replace tuple is null expr TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), originTidsToIntermediateTidMap); - Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == vOutputTupleDesc.getSlots().size()); + Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == outputTupleDesc.getSlots().size()); List exprs = vSrcToOutputSMap.getLhs(); - ArrayList slots = vOutputTupleDesc.getSlots(); + ArrayList slots = outputTupleDesc.getSlots(); for (int i = 0; i < slots.size(); i++) { slots.get(i).setIsNullable(exprs.get(i).isNullable()); } vSrcToOutputSMap.reCalculateNullableInfoForSlotInRhs(); - vOutputTupleDesc.computeMemLayout(); + outputTupleDesc.computeMemLayout(); } protected abstract List computeSlotIdsForJoinConjuncts(Analyzer analyzer); @@ -479,7 +477,7 @@ public void init(Analyzer analyzer) throws UserException { /** * If parent wants to get join node tupleids, * it will call this function instead of read properties directly. - * The reason is that the tuple id of vOutputTupleDesc the real output tuple id for join node. + * The reason is that the tuple id of outputTupleDesc the real output tuple id for join node. *

* If you read the properties of @tupleids directly instead of this function, * it reads the input id of the current node. @@ -487,16 +485,16 @@ public void init(Analyzer analyzer) throws UserException { @Override public ArrayList getTupleIds() { Preconditions.checkState(tupleIds != null); - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } return tupleIds; } @Override public ArrayList getOutputTblRefIds() { - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } switch (joinOp) { case LEFT_SEMI_JOIN: @@ -513,8 +511,8 @@ public ArrayList getOutputTblRefIds() { @Override public List getOutputTupleIds() { - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); + if (outputTupleDesc != null) { + return Lists.newArrayList(outputTupleDesc.getId()); } switch (joinOp) { case LEFT_SEMI_JOIN: @@ -544,13 +542,6 @@ public int getNumInstances() { return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances()); } - /** - * Used by nereids. - */ - public void setvOutputTupleDesc(TupleDescriptor vOutputTupleDesc) { - this.vOutputTupleDesc = vOutputTupleDesc; - } - /** * Used by nereids. */ @@ -558,13 +549,6 @@ public void setvIntermediateTupleDescList(List vIntermediateTup this.vIntermediateTupleDescList = vIntermediateTupleDescList; } - /** - * Used by nereids. - */ - public void setvSrcToOutputSMap(List lhs) { - this.vSrcToOutputSMap = new ExprSubstitutionMap(lhs, Collections.emptyList()); - } - public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { outputSmap = smap; ExprSubstitutionMap tmpSmap = new ExprSubstitutionMap(Lists.newArrayList(vSrcToOutputSMap.getRhs()), @@ -572,12 +556,12 @@ public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { List newRhs = Lists.newArrayList(); boolean bSmapChanged = false; for (Expr rhsExpr : smap.getRhs()) { - if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(vOutputTupleDesc.getId())) { + if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(outputTupleDesc.getId())) { newRhs.add(rhsExpr); } else { // we need do project in the join node // add a new slot for projection result and add the project expr to vSrcToOutputSMap - SlotDescriptor slotDesc = analyzer.addSlotDescriptor(vOutputTupleDesc); + SlotDescriptor slotDesc = analyzer.addSlotDescriptor(outputTupleDesc); slotDesc.initFromExpr(rhsExpr); slotDesc.setIsMaterialized(true); // the project expr is from smap, which use the slots of hash join node's output tuple @@ -594,7 +578,7 @@ public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) { if (bSmapChanged) { outputSmap.updateRhsExprs(newRhs); - vOutputTupleDesc.computeStatAndMemLayout(); + outputTupleDesc.computeStatAndMemLayout(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 9feac7e79ff7b1..58f21246354583 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -28,6 +28,7 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TNestedLoopJoinNode; @@ -127,7 +128,7 @@ public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, List Date: Fri, 15 Mar 2024 17:48:18 +0800 Subject: [PATCH 02/11] unify join --- .../org/apache/doris/planner/HashJoinNode.java | 18 +++++++++--------- .../doris/planner/NestedLoopJoinNode.java | 7 ++++--- .../org/apache/doris/planner/PlanNode.java | 18 ++++++++++++------ 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 8b28ed419d50c1..d8167d437f0ce3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -38,6 +38,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TEqJoinCondition; import org.apache.doris.thrift.TExplainLevel; @@ -801,16 +802,15 @@ protected void toThrift(TPlanNode msg) { msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt()); } } - if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - TExpr tExpr = vSrcToOutputSMap.getLhs().get(i).treeToThrift(); - msg.hash_join_node.addToSrcExprList(tExpr); - // make legacy planner compatible - msg.addToProjections(tExpr); + if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + } + } + if (outputTupleDesc != null) { + msg.hash_join_node.setVoutputTupleId(outputTupleDesc.getId().asInt()); } - } - if (outputTupleDesc != null) { - msg.hash_join_node.setVoutputTupleId(outputTupleDesc.getId().asInt()); } if (vIntermediateTupleDescList != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 58f21246354583..29d579392b3d0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -188,14 +188,15 @@ protected void toThrift(TPlanNode msg) { msg.nested_loop_join_node.setIsMark(isMarkJoin()); - if (!ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { - // legacy planner use vSrcExprList - // while nereids use PlanNode.projections + if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { if (vSrcToOutputSMap != null) { for (int i = 0; i < vSrcToOutputSMap.size(); i++) { msg.nested_loop_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); } } + if (outputTupleDesc != null) { + msg.nested_loop_join_node.setVoutputTupleId(outputTupleDesc.getId().asInt()); + } } if (vIntermediateTupleDescList != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index ee8965c18a5ed6..000df6c70f5fdf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -37,6 +37,7 @@ import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.TreeNode; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.PlanStats; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsDeriveResult; @@ -648,13 +649,18 @@ private void treeToThriftHelper(TPlan container) { } toThrift(msg); container.addToNodes(msg); - if (projectList != null) { - for (Expr expr : projectList) { - msg.addToProjections(expr.treeToThrift()); + + // legacy planner set outputTuple and projections inside join node + if (ConnectContext.get() != null && ConnectContext.get().getState().isNereids() + || !(this instanceof JoinNodeBase)) { + if (outputTupleDesc != null) { + msg.setOutputTupleId(outputTupleDesc.getId().asInt()); + } + if (projectList != null) { + for (Expr expr : projectList) { + msg.addToProjections(expr.treeToThrift()); + } } - } - if (outputTupleDesc != null) { - msg.setOutputTupleId(outputTupleDesc.getId().asInt()); } if (this instanceof ExchangeNode) { msg.num_children = 0; From 984da00a34899a4739371b377cebcfeadab31f90 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 12 Mar 2024 21:46:10 +0800 Subject: [PATCH 03/11] upd --- be/src/exec/exec_node.cpp | 26 +++++++++++----- be/src/exec/exec_node.h | 2 ++ be/src/pipeline/exec/join_probe_operator.cpp | 20 ++++++++++--- be/src/pipeline/exec/join_probe_operator.h | 7 ++++- .../exec/nested_loop_join_probe_operator.cpp | 4 ++- .../exec/nested_loop_join_probe_operator.h | 2 +- be/src/pipeline/pipeline_x/operator.cpp | 30 +++++++++++++------ be/src/pipeline/pipeline_x/operator.h | 2 ++ be/src/vec/exec/join/vjoin_node_base.cpp | 20 ++++++++++--- be/src/vec/exec/join/vjoin_node_base.h | 7 ++++- .../vec/exec/join/vnested_loop_join_node.cpp | 1 + be/src/vec/exec/join/vnested_loop_join_node.h | 2 +- 12 files changed, 94 insertions(+), 29 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 368e94562a264f..ed032d0976700e 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -514,6 +514,24 @@ std::string ExecNode::get_name() { Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_projection_timer); + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); @@ -535,13 +553,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 1dd8979f5b36af..05e32528e9e58d 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -325,6 +325,8 @@ class ExecNode { std::shared_ptr _query_statistics = nullptr; + bool _keep_origin = false; + private: static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, const std::vector& tnodes, diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 03b20fdb4d4d35..814d7c0fbf628e 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -82,6 +82,14 @@ template Status JoinProbeLocalState::_build_output_block( vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) { auto& p = Base::_parent->template cast(); + if (!Base::_projections.empty()) { + *output_block = *origin_block; + if (p._is_outer_join) { + DCHECK(output_block->columns() >= 2); + output_block->erase_tail(output_block->columns() - 2); + } + return Status::OK(); + } SCOPED_TIMER(_build_output_block_timer); auto is_mem_reuse = output_block->mem_reuse(); vectorized::MutableBlock mutable_block = @@ -197,14 +205,18 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } } else if (tnode.__isset.nested_loop_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!Base::_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } } else { // Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN. DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN); diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 9bb716ff36dfe3..be68e547de0f68 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -70,7 +70,12 @@ class JoinProbeOperatorX : public StatefulOperatorX { Status init(const TPlanNode& tnode, RuntimeState* state) override; Status open(doris::RuntimeState* state) override; - [[nodiscard]] const RowDescriptor& row_desc() const override { return *_output_row_desc; } + [[nodiscard]] const RowDescriptor& row_desc() const override { + if (Base::_output_row_descriptor) { + return *Base::_output_row_descriptor; + } + return *_output_row_desc; + } [[nodiscard]] const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 9272418ca0ad60..35d5bc6085ee41 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -436,7 +436,9 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con : JoinProbeOperatorX(pool, tnode, operator_id, descs), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), - _old_version_flag(!tnode.__isset.nested_loop_join_node) {} + _old_version_flag(!tnode.__isset.nested_loop_join_node) { + _keep_origin = _is_output_left_side_only; +} Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinProbeOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 770289f397f2ba..7a8be87d922b90 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -228,7 +228,7 @@ class NestedLoopJoinProbeOperatorX final const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } bool need_more_input_data(RuntimeState* state) const override; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 0c890b83041cc8..b4b66b1e7ff590 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -171,9 +171,28 @@ void PipelineXLocalStateBase::clear_origin_block() { Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const { - auto local_state = state->get_local_state(operator_id()); + auto* local_state = state->get_local_state(operator_id()); SCOPED_TIMER(local_state->exec_time_counter()); SCOPED_TIMER(local_state->_projection_timer); + + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (_keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } + } else { + if (_keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } + } + }; + using namespace vectorized; vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, @@ -188,14 +207,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id)); auto column_ptr = origin_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); - //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + insert_column_datas(mutable_columns[i], column_ptr, rows); } DCHECK(mutable_block.rows() == rows); output_block->set_columns(std::move(mutable_columns)); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 06ba93a36f3832..18395229f9117b 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -326,6 +326,8 @@ class OperatorXBase : public OperatorBase { std::string _op_name; bool _ignore_data_distribution = false; int _parallel_tasks = 0; + + bool _keep_origin = false; }; template diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 6fab6b8b91f759..79ca34e8326d9b 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -95,14 +95,18 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des } if (tnode.__isset.hash_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset( + new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size()))); } else if (tnode.__isset.nested_loop_join_node) { - _output_row_desc.reset( - new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + if (!_output_row_descriptor) { + _output_row_desc.reset(new RowDescriptor( + descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false})); + } _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, std::vector(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size()))); @@ -166,6 +170,14 @@ void VJoinNodeBase::_construct_mutable_join_block() { Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block, bool keep_origin) { SCOPED_TIMER(_build_output_block_timer); + if (!_projections.empty()) { + *output_block = *origin_block; + if (_is_outer_join) { + DCHECK(output_block->columns() >= 2); + output_block->erase_tail(output_block->columns() - 2); + } + return Status::OK(); + } auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = is_mem_reuse diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 0e6ac3c9837db8..449d58b6fb14da 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -63,7 +63,12 @@ class VJoinNodeBase : public ExecNode { Status open(RuntimeState* state) override; - const RowDescriptor& row_desc() const override { return *_output_row_desc; } + const RowDescriptor& row_desc() const override { + if (_output_row_descriptor) { + return *_output_row_descriptor; + } + return *_output_row_desc; + } const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 3548680bf4998e..77eef4d4da5a00 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -102,6 +102,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) { _is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only; + _keep_origin = _is_output_left_side_only; } if (tnode.nested_loop_join_node.__isset.join_conjuncts && diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index fd31b651bd3a13..18bc901222f3fb 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -92,7 +92,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { const RowDescriptor& row_desc() const override { return _old_version_flag ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) - : *_output_row_desc; + : (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc); } std::shared_ptr get_left_block() { return _left_block; } From 857203fc771112744989ba449db4927fd9ed27a8 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Fri, 15 Mar 2024 18:27:21 +0800 Subject: [PATCH 04/11] format --- .../src/main/java/org/apache/doris/planner/HashJoinNode.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index d8167d437f0ce3..e1385231254c23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -42,7 +42,6 @@ import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TEqJoinCondition; import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.THashJoinNode; import org.apache.doris.thrift.TJoinDistributionType; import org.apache.doris.thrift.TPlanNode; From 61935371cf84a34eca1f6598814e38019b1a5b58 Mon Sep 17 00:00:00 2001 From: englefly Date: Sat, 16 Mar 2024 23:23:42 +0800 Subject: [PATCH 05/11] vSrcToOutputSMap is empty in some cases --- .../apache/doris/planner/HashJoinNode.java | 19 ++++++++++++++++--- .../doris/planner/NestedLoopJoinNode.java | 19 ++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index e1385231254c23..4dfebb4daa3edd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -802,9 +802,22 @@ protected void toThrift(TPlanNode msg) { } } if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { - if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null + && outputTupleDesc != null) { + List lhs = vSrcToOutputSMap.getLhs(); + if (outputTupleDesc.getSlots().size() == outputTupleDesc.getSlots().size()) { + boolean match = true; + for (int i=0 ; i< vSrcToOutputSMap.size(); i++) { + if (!outputTupleDesc.getSlots().get(i).getType().equals(lhs.get(i).getType())) { + match = false; + break; + } + } + if (match) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + } + } } } if (outputTupleDesc != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 29d579392b3d0b..ccaa97e44ee8a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -189,9 +189,22 @@ protected void toThrift(TPlanNode msg) { msg.nested_loop_join_node.setIsMark(isMarkJoin()); if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { - if (vSrcToOutputSMap != null) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - msg.nested_loop_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null + && outputTupleDesc != null) { + List lhs = vSrcToOutputSMap.getLhs(); + if (outputTupleDesc.getSlots().size() == outputTupleDesc.getSlots().size()) { + boolean match = true; + for (int i=0 ; i< vSrcToOutputSMap.size(); i++) { + if (!outputTupleDesc.getSlots().get(i).getType().equals(lhs.get(i).getType())) { + match = false; + break; + } + } + if (match) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.nested_loop_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + } + } } } if (outputTupleDesc != null) { From 17970c04b1ef4a91199bbe13309ecd78782d117e Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Sat, 16 Mar 2024 23:29:00 +0800 Subject: [PATCH 06/11] add debug log --- be/src/pipeline/exec/join_probe_operator.cpp | 42 +++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 814d7c0fbf628e..1f5d56359b815a 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -17,6 +17,11 @@ #include "join_probe_operator.h" +#include + +#include "common/exception.h" +#include "common/logging.h" +#include "common/status.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/operator.h" @@ -104,7 +109,7 @@ Status JoinProbeLocalState::_build_output_block( auto insert_column_datas = [keep_origin](auto& to, vectorized::ColumnPtr& from, size_t rows) { if (to->is_nullable() && !from->is_nullable()) { if (keep_origin || !from->is_exclusive()) { - auto& null_column = reinterpret_cast(*to); + auto& null_column = assert_cast(*to); null_column.get_nested_column().insert_range_from(*from, 0, rows); null_column.get_null_map_column().get_data().resize_fill(rows, 0); } else { @@ -118,6 +123,35 @@ Status JoinProbeLocalState::_build_output_block( } } }; + + LOG_WARNING("yxc test empty copy"); + + auto print = [](vectorized::VExprContextSPtrs& ctxs) { + std::string s = "[ "; + for (auto& ctx : ctxs) { + s += ctx->root()->debug_string() + "\t\t\t"; + } + return s + " ]"; + }; + + auto print_desc = [](std::unique_ptr& desc) { + std::string ret; + if (!desc) { + ret = "is null"; + } else { + ret = desc->debug_string(); + } + return ret; + }; + + LOG_WARNING("yxc test ") + .tag("Base::_projections", print(Base::_projections)) + .tag("_output_expr_ctxs", print(_output_expr_ctxs)) + .tag("output_block", mutable_block.dump_names() + " " + mutable_block.dump_types()) + .tag("orgin block ", origin_block->dump_structure()) + .tag("_intermediate_row_desc", print_desc(p._intermediate_row_desc)) + .tag("plan node", print_desc(p._output_row_descriptor)) + .tag("join node", print_desc(p._output_row_desc)); if (rows != 0) { auto& mutable_columns = mutable_block.mutable_columns(); if (_output_expr_ctxs.empty()) { @@ -208,7 +242,13 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T if (!Base::_output_row_descriptor) { _output_row_desc.reset( new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); + } else { + if (tnode.hash_join_node.__isset.voutput_tuple_id) { + throw Exception {ErrorCode::INTERNAL_ERROR, + "yxc test error .__isset.voutput_tuple_id"}; + } } + } else if (tnode.__isset.nested_loop_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list, From bf9db3a2f2e9a2b223a167e80ac223edc58013a7 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Sat, 16 Mar 2024 23:36:26 +0800 Subject: [PATCH 07/11] format --- .../apache/doris/planner/HashJoinNode.java | 103 ++++++++++-------- .../doris/planner/NestedLoopJoinNode.java | 34 +++--- 2 files changed, 78 insertions(+), 59 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 4dfebb4daa3edd..b6db0a2fcda707 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -80,10 +80,10 @@ public class HashJoinNode extends JoinNodeBase { private List markJoinConjuncts; private DistributionMode distrMode; - private boolean isColocate = false; //the flag for colocate join + private boolean isColocate = false; // the flag for colocate join private String colocateReason = ""; // if can not do colocate join, set reason here - private Set hashOutputSlotIds = Sets.newHashSet(); //init for nereids + private Set hashOutputSlotIds = Sets.newHashSet(); // init for nereids private Map hashOutputExprSlotIdMap = Maps.newHashMap(); @@ -251,12 +251,15 @@ public void setColocate(boolean colocate, String reason) { } /** - * Calculate the slots output after going through the hash table in the hash join node. - * The most essential difference between 'hashOutputSlots' and 'outputSlots' is that + * Calculate the slots output after going through the hash table in the hash + * join node. + * The most essential difference between 'hashOutputSlots' and 'outputSlots' is + * that * it's output needs to contain other conjunct and conjunct columns. * hash output slots = output slots + conjunct slots + other conjunct slots * For example: - * select b.k1 from test.t1 a right join test.t1 b on a.k1=b.k1 and b.k2>1 where a.k2>1; + * select b.k1 from test.t1 a right join test.t1 b on a.k1=b.k1 and b.k2>1 where + * a.k2>1; * output slots: b.k1 * other conjuncts: a.k2>1 * conjuncts: b.k2>1 @@ -328,18 +331,16 @@ public void init(Analyzer analyzer) throws UserException { ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap(); List newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false); - eqJoinConjuncts = - newEqJoinConjuncts.stream().map(entity -> { - BinaryPredicate predicate = (BinaryPredicate) entity; - if (predicate.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) { - Preconditions.checkArgument(predicate.getChildren().size() == 2); - if (!predicate.getChild(0).isNullable() || !predicate.getChild(1).isNullable()) { - predicate.setOp(BinaryPredicate.Operator.EQ); - } - } - return predicate; - } - ).collect(Collectors.toList()); + eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> { + BinaryPredicate predicate = (BinaryPredicate) entity; + if (predicate.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) { + Preconditions.checkArgument(predicate.getChildren().size() == 2); + if (!predicate.getChild(0).isNullable() || !predicate.getChild(1).isNullable()) { + predicate.setOp(BinaryPredicate.Operator.EQ); + } + } + return predicate; + }).collect(Collectors.toList()); otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false); computeOutputTuple(analyzer); @@ -384,7 +385,8 @@ private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs, SlotDes this.rhs = rhs; } - // Convenience functions. They return double to avoid excessive casts in callers. + // Convenience functions. They return double to avoid excessive casts in + // callers. public double lhsNdv() { // return the estimated number of rows in this partition (-1 if unknown) return Math.min(lhs.getStats().getNumDistinctValues(), lhsNumRows()); @@ -415,8 +417,10 @@ public TupleId rhsTid() { } /** - * Returns a new EqJoinConjunctScanSlots for the given equi-join conjunct or null if - * the given conjunct is not of the form = or if the underlying + * Returns a new EqJoinConjunctScanSlots for the given equi-join conjunct or + * null if + * the given conjunct is not of the form = or if the + * underlying * table/column of at least one side is missing stats. */ public static EqJoinConjunctScanSlots create(Expr eqJoinConjunct) { @@ -478,7 +482,8 @@ private long getJoinCardinality() { return lhsCard; } - // Collect join conjuncts that are eligible to participate in cardinality estimation. + // Collect join conjuncts that are eligible to participate in cardinality + // estimation. List eqJoinConjunctSlots = new ArrayList<>(); for (Expr eqJoinConjunct : eqJoinConjuncts) { EqJoinConjunctScanSlots slots = EqJoinConjunctScanSlots.create(eqJoinConjunct); @@ -496,10 +501,13 @@ private long getJoinCardinality() { } /** - * Returns the estimated join cardinality of a generic N:M inner or outer join based - * on the given list of equi-join conjunct slots and the join input cardinalities. + * Returns the estimated join cardinality of a generic N:M inner or outer join + * based + * on the given list of equi-join conjunct slots and the join input + * cardinalities. * The returned result is >= 0. - * The list of join conjuncts must be non-empty and the cardinalities must be >= 0. + * The list of join conjuncts must be non-empty and the cardinalities must be >= + * 0. *

* Generic estimation: * cardinality = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d)) @@ -518,7 +526,8 @@ private long getGenericJoinCardinality(List eqJoinConju long result = -1; for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) { - // Adjust the NDVs on both sides to account for predicates. Intuitively, the NDVs + // Adjust the NDVs on both sides to account for predicates. Intuitively, the + // NDVs // should only decrease. We ignore adjustments that would lead to an increase. double lhsAdjNdv = slots.lhsNdv(); if (slots.lhsNumRows() > lhsCard) { @@ -542,7 +551,6 @@ private long getGenericJoinCardinality(List eqJoinConju return result; } - @Override public void computeStats(Analyzer analyzer) throws UserException { super.computeStats(analyzer); @@ -553,25 +561,28 @@ public void computeStats(Analyzer analyzer) throws UserException { @Override protected void computeOldCardinality() { - // For a join between child(0) and child(1), we look for join conditions "L.c = R.d" + // For a join between child(0) and child(1), we look for join conditions "L.c = + // R.d" // (with L being from child(0) and R from child(1)) and use as the cardinality // estimate the maximum of - // child(0).cardinality * R.cardinality / # distinct values for R.d - // * child(1).cardinality / R.cardinality + // child(0).cardinality * R.cardinality / # distinct values for R.d + // * child(1).cardinality / R.cardinality // across all suitable join conditions, which simplifies to - // child(0).cardinality * child(1).cardinality / # distinct values for R.d + // child(0).cardinality * child(1).cardinality / # distinct values for R.d // The reasoning is that // - each row in child(0) joins with R.cardinality/#DV_R.d rows in R // - each row in R is 'present' in child(1).cardinality / R.cardinality rows in - // child(1) + // child(1) // // This handles the very frequent case of a fact table/dimension table join - // (aka foreign key/primary key join) if the primary key is a single column, with + // (aka foreign key/primary key join) if the primary key is a single column, + // with // possible additional predicates against the dimension table. An example: - // FROM FactTbl F JOIN Customers C D ON (F.cust_id = C.id) ... WHERE C.region = 'US' + // FROM FactTbl F JOIN Customers C D ON (F.cust_id = C.id) ... WHERE C.region = + // 'US' // - if there are 5 regions, the selectivity of "C.region = 'US'" would be 0.2 - // and the output cardinality of the Customers scan would be 0.2 * # rows in - // Customers + // and the output cardinality of the Customers scan would be 0.2 * # rows in + // Customers // - # rows in Customers == # of distinct values for Customers.id // - the output cardinality of the join would be F.cardinality * 0.2 @@ -596,12 +607,12 @@ protected void computeOldCardinality() { } long numDistinct = stats.getNumDistinctValues(); // TODO rownum - //Table rhsTbl = slotDesc.getParent().getTableFamilyGroup().getBaseTable(); + // Table rhsTbl = slotDesc.getParent().getTableFamilyGroup().getBaseTable(); // if (rhsTbl != null && rhsTbl.getNumRows() != -1) { // we can't have more distinct values than rows in the table, even though // the metastore stats may think so // LOG.info( - // "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows())); + // "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows())); // numDistinct = Math.min(numDistinct, rhsTbl.getNumRows()); // } maxNumDistinct = Math.max(maxNumDistinct, numDistinct); @@ -650,7 +661,8 @@ private long getNdv(Expr expr) { /** * Returns the estimated cardinality of a semi join node. * For a left semi join between child(0) and child(1), we look for equality join - * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and use as + * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and + * use as * the cardinality estimate the minimum of * |child(0)| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c) * over all suitable join conditions. The reasoning is that: @@ -666,7 +678,8 @@ private long getNdv(Expr expr) { * in child(1) is (NDV(L.c) - NDV(R.d)) / NDV(L.c) * - otherwise, we conservatively use |L| to avoid underestimation *

- * We analogously estimate the cardinality for right semi/anti joins, and treat the + * We analogously estimate the cardinality for right semi/anti joins, and treat + * the * null-aware anti join like a regular anti join */ private long getSemiJoinCardinality() { @@ -753,7 +766,7 @@ public void getMaterializedIds(Analyzer analyzer, List ids) { } } - //nereids only + // nereids only public void addSlotIdToHashOutputSlotIds(SlotId slotId) { hashOutputSlotIds.add(slotId); } @@ -803,11 +816,11 @@ protected void toThrift(TPlanNode msg) { } if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null - && outputTupleDesc != null) { + && outputTupleDesc != null) { List lhs = vSrcToOutputSMap.getLhs(); if (outputTupleDesc.getSlots().size() == outputTupleDesc.getSlots().size()) { boolean match = true; - for (int i=0 ; i< vSrcToOutputSMap.size(); i++) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { if (!outputTupleDesc.getSlots().get(i).getType().equals(lhs.get(i).getType())) { match = false; break; @@ -841,9 +854,9 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve } else { distrModeStr = distrMode.toString(); } - StringBuilder output = - new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()).append("(") - .append(distrModeStr).append(")").append("[").append(colocateReason).append("]\n"); + StringBuilder output = new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()) + .append("(") + .append(distrModeStr).append(")").append("[").append(colocateReason).append("]\n"); if (detailLevel == TExplainLevel.BRIEF) { output.append(detailPrefix).append( String.format("cardinality=%,d", cardinality)).append("\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index ccaa97e44ee8a6..5d3cd437d60f93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -49,18 +49,24 @@ public class NestedLoopJoinNode extends JoinNodeBase { private static final Logger LOG = LogManager.getLogger(NestedLoopJoinNode.class); - // If isOutputLeftSideOnly=true, the data from the left table is returned directly without a join operation. - // This is used to optimize `in bitmap`, because bitmap will make a lot of copies when doing Nested Loop Join, + // If isOutputLeftSideOnly=true, the data from the left table is returned + // directly without a join operation. + // This is used to optimize `in bitmap`, because bitmap will make a lot of + // copies when doing Nested Loop Join, // which is very resource intensive. // `in bitmap` has two cases: // 1. select * from tbl1 where k1 in (select bitmap_col from tbl2); - // This will generate a bitmap runtime filter to filter the left table, because the bitmap is an exact filter - // and does not need to be filtered again in the NestedLoopJoinNode, so it returns the left table data directly. + // This will generate a bitmap runtime filter to filter the left table, because + // the bitmap is an exact filter + // and does not need to be filtered again in the NestedLoopJoinNode, so it + // returns the left table data directly. // 2. select * from tbl1 where 1 in (select bitmap_col from tbl2); - // This sql will be rewritten to - // "select * from tbl1 left semi join tbl2 where bitmap_contains(tbl2.bitmap_col, 1);" - // return all data in the left table to parent node when there is data on the build side, and return empty when - // there is no data on the build side. + // This sql will be rewritten to + // "select * from tbl1 left semi join tbl2 where + // bitmap_contains(tbl2.bitmap_col, 1);" + // return all data in the left table to parent node when there is data on the + // build side, and return empty when + // there is no data on the build side. private boolean isOutputLeftSideOnly = false; private List runtimeFilterExpr = Lists.newArrayList(); @@ -190,11 +196,11 @@ protected void toThrift(TPlanNode msg) { if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null - && outputTupleDesc != null) { - List lhs = vSrcToOutputSMap.getLhs(); + && outputTupleDesc != null) { + List lhs = vSrcToOutputSMap.getLhs(); if (outputTupleDesc.getSlots().size() == outputTupleDesc.getSlots().size()) { boolean match = true; - for (int i=0 ; i< vSrcToOutputSMap.size(); i++) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { if (!outputTupleDesc.getSlots().get(i).getType().equals(lhs.get(i).getType())) { match = false; break; @@ -242,9 +248,9 @@ private void computeCrossRuntimeFilterExpr() { @Override public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) { String distrModeStr = ""; - StringBuilder output = - new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()).append("(") - .append(distrModeStr).append(")\n"); + StringBuilder output = new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()) + .append("(") + .append(distrModeStr).append(")\n"); if (detailLevel == TExplainLevel.BRIEF) { output.append(detailPrefix).append( From bdc683471ea33e6ec20f49ec008fa2d76153cd95 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Mon, 18 Mar 2024 13:24:53 +0800 Subject: [PATCH 08/11] fix mv --- .../main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index d7722e090d0240..5dc49ea7f90a53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -223,6 +223,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionIds, ctx.setExecutor(executor); ctx.setQueryId(queryId); command.run(ctx, executor); + ctx.getState().setNereids(true); if (ctx.getState().getStateType() != MysqlStateType.OK) { throw new JobException(ctx.getState().getErrorMessage()); } From c4154ca52738cb5429a1882507933d2f0e4687db Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Mon, 18 Mar 2024 16:25:20 +0800 Subject: [PATCH 09/11] fix --- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 5dc49ea7f90a53..499d1571d39c33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -222,8 +222,8 @@ private void exec(ConnectContext ctx, Set refreshPartitionIds, executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); ctx.setExecutor(executor); ctx.setQueryId(queryId); - command.run(ctx, executor); ctx.getState().setNereids(true); + command.run(ctx, executor); if (ctx.getState().getStateType() != MysqlStateType.OK) { throw new JobException(ctx.getState().getErrorMessage()); } From 6f0c9bbc3af1bc2c48d545eb9e1e75b748822942 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Mon, 18 Mar 2024 20:13:43 +0800 Subject: [PATCH 10/11] tmp log --- be/src/pipeline/exec/join_probe_operator.cpp | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 1f5d56359b815a..e0a325e5557991 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -106,7 +106,7 @@ Status JoinProbeLocalState::_build_output_block( // TODO: After FE plan support same nullable of output expr and origin block and mutable column // we should replace `insert_column_datas` by `insert_range_from` - auto insert_column_datas = [keep_origin](auto& to, vectorized::ColumnPtr& from, size_t rows) { + auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { if (to->is_nullable() && !from->is_nullable()) { if (keep_origin || !from->is_exclusive()) { auto& null_column = assert_cast(*to); @@ -122,6 +122,17 @@ Status JoinProbeLocalState::_build_output_block( to = from->assume_mutable(); } } + + if (to->is_nullable()) { + auto& null_column = assert_cast(*to); + bool all_null = true; + for (int i = 0; i < null_column.size(); i++) { + if (!null_column.is_null_at(i)) { + all_null = false; + } + } + LOG_WARNING("yxc test null").tag("null node id", p._node_id).tag("all null", all_null); + } }; LOG_WARNING("yxc test empty copy"); @@ -145,6 +156,7 @@ Status JoinProbeLocalState::_build_output_block( }; LOG_WARNING("yxc test ") + .tag("node id", p._node_id) .tag("Base::_projections", print(Base::_projections)) .tag("_output_expr_ctxs", print(_output_expr_ctxs)) .tag("output_block", mutable_block.dump_names() + " " + mutable_block.dump_types()) @@ -261,6 +273,10 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T // Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN. DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN); } + + if (Base::_node_id == 13) { + LOG_WARNING("yxc test"); + } } template From c2ede1c21e06204571b4a718b29d0de016afd0b6 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Mon, 18 Mar 2024 21:26:18 +0800 Subject: [PATCH 11/11] gix --- .../apache/doris/planner/HashJoinNode.java | 19 +++---------------- .../doris/planner/NestedLoopJoinNode.java | 19 +++---------------- 2 files changed, 6 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index b6db0a2fcda707..662e0d153e9c98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -815,22 +815,9 @@ protected void toThrift(TPlanNode msg) { } } if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { - if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null - && outputTupleDesc != null) { - List lhs = vSrcToOutputSMap.getLhs(); - if (outputTupleDesc.getSlots().size() == outputTupleDesc.getSlots().size()) { - boolean match = true; - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - if (!outputTupleDesc.getSlots().get(i).getType().equals(lhs.get(i).getType())) { - match = false; - break; - } - } - if (match) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); - } - } + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); } } if (outputTupleDesc != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 5d3cd437d60f93..3fbfe31adca8fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -195,22 +195,9 @@ protected void toThrift(TPlanNode msg) { msg.nested_loop_join_node.setIsMark(isMarkJoin()); if (ConnectContext.get() != null && !ConnectContext.get().getState().isNereids()) { - if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null - && outputTupleDesc != null) { - List lhs = vSrcToOutputSMap.getLhs(); - if (outputTupleDesc.getSlots().size() == outputTupleDesc.getSlots().size()) { - boolean match = true; - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - if (!outputTupleDesc.getSlots().get(i).getType().equals(lhs.get(i).getType())) { - match = false; - break; - } - } - if (match) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - msg.nested_loop_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); - } - } + if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.nested_loop_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); } } if (outputTupleDesc != null) {