Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setExecutor(executor);
ctx.setQueryId(queryId);
ctx.getState().setNereids(true);
command.run(ctx, executor);
if (ctx.getState().getStateType() != MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,7 @@ public PlanFragment visitPhysicalHashJoin(
null, null, null, hashJoin.isMarkJoin());
hashJoinNode.setNereidsId(hashJoin.getId());
hashJoinNode.setChildrenDistributeExprLists(distributeExprLists);
hashJoinNode.setUseSpecificProjections(false);
PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin);

if (JoinUtils.shouldColocateJoin(physicalHashJoin)) {
Expand Down Expand Up @@ -1556,8 +1557,8 @@ public PlanFragment visitPhysicalHashJoin(
TupleDescriptor outputDescriptor = context.generateTupleDesc();
outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s));

hashJoinNode.setvOutputTupleDesc(outputDescriptor);
hashJoinNode.setvSrcToOutputSMap(srcToOutput);
hashJoinNode.setOutputTupleDesc(outputDescriptor);
hashJoinNode.setProjectList(srcToOutput);
}
if (hashJoin.getStats() != null) {
hashJoinNode.setCardinality((long) hashJoin.getStats().getRowCount());
Expand Down Expand Up @@ -1591,6 +1592,7 @@ public PlanFragment visitPhysicalNestedLoopJoin(
NestedLoopJoinNode nestedLoopJoinNode = new NestedLoopJoinNode(context.nextPlanNodeId(),
leftFragmentPlanRoot, rightFragmentPlanRoot, tupleIds, JoinType.toJoinOperator(joinType),
null, null, null, nestedLoopJoin.isMarkJoin());
nestedLoopJoinNode.setUseSpecificProjections(false);
nestedLoopJoinNode.setNereidsId(nestedLoopJoin.getId());
nestedLoopJoinNode.setChildrenDistributeExprLists(distributeExprLists);
if (nestedLoopJoin.getStats() != null) {
Expand Down Expand Up @@ -1737,8 +1739,8 @@ public PlanFragment visitPhysicalNestedLoopJoin(
TupleDescriptor outputDescriptor = context.generateTupleDesc();
outputSlotReferences.forEach(s -> context.createSlotDesc(outputDescriptor, s));

nestedLoopJoinNode.setvOutputTupleDesc(outputDescriptor);
nestedLoopJoinNode.setvSrcToOutputSMap(srcToOutput);
nestedLoopJoinNode.setOutputTupleDesc(outputDescriptor);
nestedLoopJoinNode.setProjectList(srcToOutput);
}
if (nestedLoopJoin.getStats() != null) {
nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount());
Expand Down Expand Up @@ -1864,8 +1866,8 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
if (inputPlanNode instanceof JoinNodeBase) {
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
JoinNodeBase joinNode = (JoinNodeBase) inputPlanNode;
joinNode.setvOutputTupleDesc(tupleDescriptor);
joinNode.setvSrcToOutputSMap(projectionExprs);
joinNode.setOutputTupleDesc(tupleDescriptor);
joinNode.setProjectList(projectionExprs);
// prune the hashOutputSlotIds
if (joinNode instanceof HashJoinNode) {
((HashJoinNode) joinNode).getHashOutputSlotIds().clear();
Expand Down
34 changes: 17 additions & 17 deletions fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator
nullableTupleIds.addAll(outer.getTupleIds());
}
vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple);
vOutputTupleDesc = outputTuple;
this.outputTupleDesc = outputTuple;
vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList());
}

Expand Down Expand Up @@ -220,7 +220,7 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator
nullableTupleIds.addAll(outer.getTupleIds());
}
vIntermediateTupleDescList = Lists.newArrayList(intermediateTuple);
vOutputTupleDesc = outputTuple;
this.outputTupleDesc = outputTuple;
vSrcToOutputSMap = new ExprSubstitutionMap(srcToOutputList, Collections.emptyList());
}

Expand Down Expand Up @@ -800,26 +800,24 @@ protected void toThrift(TPlanNode msg) {
msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt());
}
}
if (vSrcToOutputSMap != null) {
for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
// TODO: Enable it after we support new optimizers
// if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
// msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
// } else
msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
if (useSpecificProjections) {
if (vSrcToOutputSMap != null && vSrcToOutputSMap.getLhs() != null && outputTupleDesc != null) {
for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
}
}
if (outputTupleDesc != null) {
msg.hash_join_node.setVoutputTupleId(outputTupleDesc.getId().asInt());
}
}
if (vOutputTupleDesc != null) {
msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt());
// TODO Enable it after we support new optimizers
// msg.setOutputTupleId(vOutputTupleDesc.getId().asInt());
}

if (vIntermediateTupleDescList != null) {
for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) {
msg.hash_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt());
}
}
msg.hash_join_node.setDistType(isColocate ? TJoinDistributionType.COLOCATE : distrMode.toThrift());
msg.hash_join_node.setUseSpecificProjections(useSpecificProjections);
}

@Override
Expand Down Expand Up @@ -862,9 +860,11 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve
output.append(getRuntimeFilterExplainString(true));
}
output.append(detailPrefix).append(String.format("cardinality=%,d", cardinality)).append("\n");
// todo unify in plan node
if (vOutputTupleDesc != null) {
output.append(detailPrefix).append("vec output tuple id: ").append(vOutputTupleDesc.getId()).append("\n");
if (outputTupleDesc != null) {
output.append(detailPrefix).append("vec output tuple id: ").append(outputTupleDesc.getId()).append("\n");
}
if (outputTupleDesc != null) {
output.append(detailPrefix).append("output tuple id: ").append(outputTupleDesc.getId()).append("\n");
}
if (vIntermediateTupleDescList != null) {
output.append(detailPrefix).append("vIntermediate tuple ids: ");
Expand Down
78 changes: 38 additions & 40 deletions fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -57,10 +56,14 @@ public abstract class JoinNodeBase extends PlanNode {
protected final TableRef innerRef;
protected final JoinOperator joinOp;
protected final boolean isMark;
protected TupleDescriptor vOutputTupleDesc;
protected ExprSubstitutionMap vSrcToOutputSMap;
protected List<TupleDescriptor> vIntermediateTupleDescList;

// in thrift, every planNode denote its output tupelId and projections by output_tuple_id and projections
// but for legacy reasons, JoinNode has its specific representations: voutput_tuple_id and src_expr_list
// if useSpecificProjections true, set the output to its specific attributes.
protected boolean useSpecificProjections = true;

public JoinNodeBase(PlanNodeId id, String planNodeName, StatisticalType statisticalType,
PlanNode outer, PlanNode inner, TableRef innerRef) {
super(id, planNodeName, statisticalType);
Expand Down Expand Up @@ -115,7 +118,7 @@ protected boolean isMaterializedByChild(SlotDescriptor slotDesc, ExprSubstitutio

protected void computeOutputTuple(Analyzer analyzer) throws UserException {
// 1. create new tuple
vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
outputTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
boolean copyLeft = false;
boolean copyRight = false;
boolean leftNullable = false;
Expand Down Expand Up @@ -169,7 +172,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException {
getChild(0) instanceof JoinNodeBase && analyzer.isOuterJoined(leftTupleDesc.getId());
for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) {
SlotDescriptor outputSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, leftSlotDesc);
analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, leftSlotDesc);
if (leftNullable) {
outputSlotDesc.setIsNullable(true);
leftNullableNumber++;
Expand All @@ -189,7 +192,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException {
getChild(1) instanceof JoinNodeBase && analyzer.isOuterJoined(rightTupleDesc.getId());
for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) {
SlotDescriptor outputSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, rightSlotDesc);
analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, rightSlotDesc);
if (rightNullable) {
outputSlotDesc.setIsNullable(true);
rightNullableNumber++;
Expand All @@ -206,7 +209,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException {
if (isMarkJoin() && analyzer.needPopUpMarkTuple(innerRef)) {
SlotDescriptor markSlot = analyzer.getMarkTuple(innerRef).getSlots().get(0);
SlotDescriptor outputSlotDesc =
analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, markSlot);
analyzer.getDescTbl().copySlotDescriptor(outputTupleDesc, markSlot);
srcTblRefToOutputTupleSmap.put(new SlotRef(markSlot), new SlotRef(outputSlotDesc));
}

Expand All @@ -224,7 +227,7 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException {
}
}

vOutputTupleDesc.computeStatAndMemLayout();
outputTupleDesc.computeStatAndMemLayout();
// 3. add tupleisnull in null-side
Preconditions.checkState(srcTblRefToOutputTupleSmap.getLhs().size() == vSrcToOutputSMap.getLhs().size());
// Condition1: the left child is null-side
Expand Down Expand Up @@ -265,8 +268,8 @@ protected void computeOutputTuple(Analyzer analyzer) throws UserException {
public void initOutputSlotIds(Set<SlotId> requiredSlotIdSet, Analyzer analyzer) {
outputSlotIds = Lists.newArrayList();
List<TupleDescriptor> outputTupleDescList = Lists.newArrayList();
if (vOutputTupleDesc != null) {
outputTupleDescList.add(vOutputTupleDesc);
if (outputTupleDesc != null) {
outputTupleDescList.add(outputTupleDesc);
} else {
for (TupleId tupleId : tupleIds) {
outputTupleDescList.add(analyzer.getTupleDesc(tupleId));
Expand Down Expand Up @@ -296,13 +299,13 @@ public void initOutputSlotIds(Set<SlotId> requiredSlotIdSet, Analyzer analyzer)

@Override
public void projectOutputTuple() {
if (vOutputTupleDesc == null) {
if (outputTupleDesc == null) {
return;
}
if (vOutputTupleDesc.getSlots().size() == outputSlotIds.size()) {
if (outputTupleDesc.getSlots().size() == outputSlotIds.size()) {
return;
}
Iterator<SlotDescriptor> iterator = vOutputTupleDesc.getSlots().iterator();
Iterator<SlotDescriptor> iterator = outputTupleDesc.getSlots().iterator();
while (iterator.hasNext()) {
SlotDescriptor slotDescriptor = iterator.next();
boolean keep = false;
Expand All @@ -318,7 +321,7 @@ public void projectOutputTuple() {
vSrcToOutputSMap.removeByRhsExpr(slotRef);
}
}
vOutputTupleDesc.computeStatAndMemLayout();
outputTupleDesc.computeStatAndMemLayout();
}

protected abstract Pair<Boolean, Boolean> needToCopyRightAndLeft();
Expand Down Expand Up @@ -404,14 +407,14 @@ protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisExcept
// 5. replace tuple is null expr
TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), originTidsToIntermediateTidMap);

Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == vOutputTupleDesc.getSlots().size());
Preconditions.checkState(vSrcToOutputSMap.getLhs().size() == outputTupleDesc.getSlots().size());
List<Expr> exprs = vSrcToOutputSMap.getLhs();
ArrayList<SlotDescriptor> slots = vOutputTupleDesc.getSlots();
ArrayList<SlotDescriptor> slots = outputTupleDesc.getSlots();
for (int i = 0; i < slots.size(); i++) {
slots.get(i).setIsNullable(exprs.get(i).isNullable());
}
vSrcToOutputSMap.reCalculateNullableInfoForSlotInRhs();
vOutputTupleDesc.computeMemLayout();
outputTupleDesc.computeMemLayout();
}

protected abstract List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer);
Expand Down Expand Up @@ -479,24 +482,24 @@ public void init(Analyzer analyzer) throws UserException {
/**
* If parent wants to get join node tupleids,
* it will call this function instead of read properties directly.
* The reason is that the tuple id of vOutputTupleDesc the real output tuple id for join node.
* The reason is that the tuple id of outputTupleDesc the real output tuple id for join node.
* <p>
* If you read the properties of @tupleids directly instead of this function,
* it reads the input id of the current node.
*/
@Override
public ArrayList<TupleId> getTupleIds() {
Preconditions.checkState(tupleIds != null);
if (vOutputTupleDesc != null) {
return Lists.newArrayList(vOutputTupleDesc.getId());
if (outputTupleDesc != null) {
return Lists.newArrayList(outputTupleDesc.getId());
}
return tupleIds;
}

@Override
public ArrayList<TupleId> getOutputTblRefIds() {
if (vOutputTupleDesc != null) {
return Lists.newArrayList(vOutputTupleDesc.getId());
if (outputTupleDesc != null) {
return Lists.newArrayList(outputTupleDesc.getId());
}
switch (joinOp) {
case LEFT_SEMI_JOIN:
Expand All @@ -513,8 +516,8 @@ public ArrayList<TupleId> getOutputTblRefIds() {

@Override
public List<TupleId> getOutputTupleIds() {
if (vOutputTupleDesc != null) {
return Lists.newArrayList(vOutputTupleDesc.getId());
if (outputTupleDesc != null) {
return Lists.newArrayList(outputTupleDesc.getId());
}
switch (joinOp) {
case LEFT_SEMI_JOIN:
Expand Down Expand Up @@ -544,40 +547,26 @@ public int getNumInstances() {
return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances());
}

/**
* Used by nereids.
*/
public void setvOutputTupleDesc(TupleDescriptor vOutputTupleDesc) {
this.vOutputTupleDesc = vOutputTupleDesc;
}

/**
* Used by nereids.
*/
public void setvIntermediateTupleDescList(List<TupleDescriptor> vIntermediateTupleDescList) {
this.vIntermediateTupleDescList = vIntermediateTupleDescList;
}

/**
* Used by nereids.
*/
public void setvSrcToOutputSMap(List<Expr> lhs) {
this.vSrcToOutputSMap = new ExprSubstitutionMap(lhs, Collections.emptyList());
}

public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) {
outputSmap = smap;
ExprSubstitutionMap tmpSmap = new ExprSubstitutionMap(Lists.newArrayList(vSrcToOutputSMap.getRhs()),
Lists.newArrayList(vSrcToOutputSMap.getLhs()));
List<Expr> newRhs = Lists.newArrayList();
boolean bSmapChanged = false;
for (Expr rhsExpr : smap.getRhs()) {
if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(vOutputTupleDesc.getId())) {
if (rhsExpr instanceof SlotRef || !rhsExpr.isBound(outputTupleDesc.getId())) {
newRhs.add(rhsExpr);
} else {
// we need do project in the join node
// add a new slot for projection result and add the project expr to vSrcToOutputSMap
SlotDescriptor slotDesc = analyzer.addSlotDescriptor(vOutputTupleDesc);
SlotDescriptor slotDesc = analyzer.addSlotDescriptor(outputTupleDesc);
slotDesc.initFromExpr(rhsExpr);
slotDesc.setIsMaterialized(true);
// the project expr is from smap, which use the slots of hash join node's output tuple
Expand All @@ -594,7 +583,16 @@ public void setOutputSmap(ExprSubstitutionMap smap, Analyzer analyzer) {

if (bSmapChanged) {
outputSmap.updateRhsExprs(newRhs);
vOutputTupleDesc.computeStatAndMemLayout();
outputTupleDesc.computeStatAndMemLayout();
}
}

public void setUseSpecificProjections(boolean useSpecificProjections) {
this.useSpecificProjections = useSpecificProjections;
}


public boolean isUseSpecificProjections() {
return useSpecificProjections;
}
}
Loading