Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -808,9 +808,13 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
PlanFragment inputFragment = topN.child(0).accept(this, context);
PlanFragment currentFragment = inputFragment;

//1. generate new fragment for sort when the child is exchangeNode
if (inputFragment.getPlanRoot() instanceof ExchangeNode) {
Preconditions.checkArgument(!topN.getSortPhase().isLocal());
//1. Generate new fragment for sort when the child is exchangeNode, If the child is
// mergingExchange, it means we have always generated a new fragment when processing mergeSort
if (inputFragment.getPlanRoot() instanceof ExchangeNode
&& !((ExchangeNode) inputFragment.getPlanRoot()).isMergingExchange()) {
// Except LocalTopN->MergeTopN, we don't allow localTopN's child is Exchange Node
Preconditions.checkArgument(!topN.getSortPhase().isLocal(),
"local sort requires any property but child is" + inputFragment.getPlanRoot());
DataPartition outputPartition = DataPartition.UNPARTITIONED;
ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
inputFragment.setOutputPartition(outputPartition);
Expand All @@ -822,20 +826,28 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra

// 2. According to the type of sort, generate physical plan
if (!topN.getSortPhase().isMerge()) {
// For localSort or Gather->Sort, we just need to add sortNode
// For localSort or Gather->Sort, we just need to add TopNNode
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
currentFragment.addPlanRoot(sortNode);
} else {
// For mergeSort, we need to push sortInfo to exchangeNode
if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
// if there is no exchange node for mergeSort
// e.g., localSort -> mergeSort
// e.g., mergeTopN -> localTopN
// It means the local has satisfied the Gather property. We can just ignore mergeSort
currentFragment.getPlanRoot().setOffset(topN.getOffset());
currentFragment.getPlanRoot().setLimit(topN.getLimit());
return currentFragment;
}
Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof SortNode);
Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof SortNode,
"mergeSort' child must be sortNode");
SortNode sortNode = (SortNode) inputFragment.getPlanRoot();
((ExchangeNode) currentFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
ExchangeNode exchangeNode = (ExchangeNode) currentFragment.getPlanRoot();
exchangeNode.setMergeInfo(sortNode.getSortInfo());
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
}
return currentFragment;
}
Expand Down Expand Up @@ -1388,38 +1400,12 @@ public PlanFragment visitPhysicalLimit(PhysicalLimit<? extends Plan> physicalLim
if (inputFragment == null) {
return inputFragment;
}

// For case globalLimit(l, o) -> LocalLimit(l+o, 0), that is the LocalLimit has already gathered
// The globalLimit can overwrite the limit and offset, so it's still correct
PlanNode child = inputFragment.getPlanRoot();

// physical plan: limit --> sort
// after translate, it could be:
// 1. limit->sort => set (limit and offset) on sort
// 2. limit->exchange->sort => set (limit and offset) on exchange, set sort.limit = limit+offset
if (child instanceof SortNode) {
SortNode sort = (SortNode) child;
sort.setLimit(physicalLimit.getLimit());
sort.setOffset(physicalLimit.getOffset());
return inputFragment;
}
if (child instanceof ExchangeNode) {
ExchangeNode exchangeNode = (ExchangeNode) child;
exchangeNode.setLimit(physicalLimit.getLimit());
// we do not check if this is a merging exchange here,
// since this guaranteed by translating logic plan to physical plan
exchangeNode.setOffset(physicalLimit.getOffset());
if (exchangeNode.getChild(0) instanceof SortNode) {
SortNode sort = (SortNode) exchangeNode.getChild(0);
sort.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset());
sort.setOffset(0);
}
return inputFragment;
}
// for other PlanNode, just set limit as limit+offset
child.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset());
PlanFragment planFragment = exchangeToMergeFragment(inputFragment, context);
planFragment.getPlanRoot().setLimit(physicalLimit.getLimit());
planFragment.getPlanRoot().setOffSetDirectly(physicalLimit.getOffset());
return planFragment;
child.setLimit(physicalLimit.getLimit());
child.setOffset(physicalLimit.getOffset());
return inputFragment;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit;
import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
import org.apache.doris.nereids.rules.rewrite.logical.SplitLimit;

import java.util.List;

Expand Down Expand Up @@ -192,6 +193,7 @@ public class NereidsRewriter extends BatchRewriteJob {
new EliminateAggregate(),
new MergeSetOperations(),
new PushdownLimit(),
new SplitLimit(),
new BuildAggForUnion()
)),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
Expand Down Expand Up @@ -1346,7 +1347,7 @@ private LogicalPlan withLimit(LogicalPlan input, Optional<LimitClauseContext> li
if (offsetToken != null) {
offset = Long.parseLong(offsetToken.getText());
}
return new LogicalLimit<>(limit, offset, input);
return new LogicalLimit<>(limit, offset, LimitPhase.ORIGIN, input);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
Expand All @@ -39,10 +40,8 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;

Expand Down Expand Up @@ -105,13 +104,8 @@ public PhysicalProperties visitPhysicalHashAggregate(
}

@Override
public PhysicalProperties visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
return new PhysicalProperties(DistributionSpecGather.INSTANCE, new OrderSpec(topN.getOrderKeys()));
}

@Override
public PhysicalProperties visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanContext context) {
public PhysicalProperties visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort,
PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
if (sort.getSortPhase().isLocal()) {
return new PhysicalProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
Expand Down Expand Up @@ -94,7 +95,7 @@ public Void visit(Plan plan, PlanContext context) {
}

@Override
public Void visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanContext context) {
public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, PlanContext context) {
if (!sort.getSortPhase().isLocal()) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
} else {
Expand All @@ -103,6 +104,16 @@ public Void visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanC
return null;
}

@Override
public Void visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, PlanContext context) {
if (limit.isGlobal()) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
} else {
addRequestPropertyToChildren(PhysicalProperties.ANY);
}
return null;
}

@Override
public Void visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, PlanContext context) {
JoinHint hint = hashJoin.getHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,15 @@ public enum RuleType {
INNER_TO_CROSS_JOIN(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),

// split limit
SPLIT_LIMIT(RuleTypeClass.REWRITE),
// limit push down
PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_UNION(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_ONE_ROW_RELATION(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_EMPTY_RELATION(RuleTypeClass.REWRITE),

PUSH_LIMIT_INTO_SORT(RuleTypeClass.REWRITE),
// adjust nullable
ADJUST_NULLABLE_ON_AGGREGATE(RuleTypeClass.REWRITE),
ADJUST_NULLABLE_ON_ASSERT_NUM_ROWS(RuleTypeClass.REWRITE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public Rule build() {
return logicalLimit().then(limit -> new PhysicalLimit<>(
limit.getLimit(),
limit.getOffset(),
limit.getPhase(),
limit.getLogicalProperties(),
limit.child())
).toRule(RuleType.LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,16 @@ public Rule build() {
.toRule(RuleType.LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE);
}

/**
* before: logicalTopN(off, limit)
* after:
* gatherTopN(limit, off, require gather)
* mergeTopN(limit, off, require gather) -> localTopN(off+limit, 0, require any)
*/
private List<PhysicalTopN<? extends Plan>> twoPhaseSort(LogicalTopN logicalTopN) {
PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
logicalTopN.getOffset(), SortPhase.LOCAL_SORT, logicalTopN.getLogicalProperties(), logicalTopN.child(0)
);
PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(),
logicalTopN.getLimit() + logicalTopN.getOffset(), 0, SortPhase.LOCAL_SORT,
logicalTopN.getLogicalProperties(), logicalTopN.child(0));
PhysicalTopN twoPhaseSort = new PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
logicalTopN.getOffset(), SortPhase.MERGE_SORT, logicalTopN.getLogicalProperties(), localSort);
PhysicalTopN onePhaseSort = new PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
Expand Down Expand Up @@ -117,7 +118,7 @@ private Plan unCorrelatedToJoin(LogicalApply unapply) {
}

private Plan unCorrelatedNotExist(LogicalApply unapply) {
LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) unapply.right());
LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, (LogicalPlan) unapply.right());
Alias alias = new Alias(new Count(), "count(*)");
LogicalAggregate newAgg = new LogicalAggregate<>(new ArrayList<>(),
ImmutableList.of(alias), newLimit);
Expand All @@ -128,7 +129,7 @@ private Plan unCorrelatedNotExist(LogicalApply unapply) {
}

private Plan unCorrelatedExist(LogicalApply unapply) {
LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) unapply.right());
LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, (LogicalPlan) unapply.right());
return new LogicalJoin<>(JoinType.CROSS_JOIN, (LogicalPlan) unapply.left(), newLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@
public class MergeLimits extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalLimit(logicalLimit()).then(upperLimit -> {
LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
return new LogicalLimit<>(
Math.min(upperLimit.getLimit(), Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
bottomLimit.getOffset() + upperLimit.getOffset(),
bottomLimit.child()
);
}).toRule(RuleType.MERGE_LIMITS);
return logicalLimit(logicalLimit())
.when(upperLimit -> upperLimit.getPhase().equals(upperLimit.child().getPhase()))
.then(upperLimit -> {
LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
return new LogicalLimit<>(
Math.min(upperLimit.getLimit(),
Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
bottomLimit.getOffset() + upperLimit.getOffset(),
bottomLimit.getPhase(), bottomLimit.child()
);
}).toRule(RuleType.MERGE_LIMITS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -82,6 +84,16 @@ public List<Rule> buildRules() {
return limit.withChildren(union.withChildren(newUnionChildren));
})
.toRule(RuleType.PUSH_LIMIT_THROUGH_UNION),
// limit -> sort ==> topN
logicalLimit(logicalSort())
.then(limit -> {
LogicalSort sort = limit.child();
LogicalTopN topN = new LogicalTopN(sort.getOrderKeys(),
limit.getLimit(),
limit.getOffset(),
sort.child(0));
return topN;
}).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
logicalLimit(logicalOneRowRelation())
.then(limit -> limit.getLimit() > 0
? limit.child() : new LogicalEmptyRelation(limit.child().getOutput()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public Rule build() {
return logicalProject(logicalLimit(any())).thenApply(ctx -> {
LogicalProject<LogicalLimit<Plan>> logicalProject = ctx.root;
LogicalLimit<Plan> logicalLimit = logicalProject.child();
return new LogicalLimit<>(logicalLimit.getLimit(),
logicalLimit.getOffset(), new LogicalProject<>(logicalProject.getProjects(),
return new LogicalLimit<>(logicalLimit.getLimit(), logicalLimit.getOffset(),
logicalLimit.getPhase(), new LogicalProject<>(logicalProject.getProjects(),
logicalLimit.child()));
}).toRule(RuleType.PUSHDOWN_PROJECT_THROUGH_LIMIT);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.rewrite.logical;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;

/**
* Split limit into two phase
* before:
* Limit(origin) limit, offset
* after:
* Limit(global) limit, offset
* |
* Limit(local) limit + offset, 0
*/
public class SplitLimit extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalLimit().when(limit -> !limit.isSplit())
.then(limit -> {
long l = limit.getLimit();
long o = limit.getOffset();
return new LogicalLimit<>(l, o,
LimitPhase.GLOBAL, new LogicalLimit<>(l + o, 0, LimitPhase.LOCAL, limit.child())
);
}).toRule(RuleType.SPLIT_LIMIT);
}
}

Loading