Reviewer note: this patch was reconstructed from a copy whose newlines and indentation had been
collapsed. Line breaks follow the hunk line counts; indentation is conventional and assumed, not
recovered. Generic type arguments (e.g. on List / Optional / PhysicalTopN) appear to have been
stripped by the extraction and are NOT restored here, so context lines may not match the real
tree - TODO confirm against the upstream commit before applying. Index hashes are those of the
original patch and do not reflect the review changes (added Javadoc in TopNScanOpt.java, `final`
on PhysicalTopN.TOPN_RUNTIME_FILTER).

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index c99a376b592913..e15384c49e3435 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -875,6 +875,13 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN topN, PlanTra
             SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
             sortNode.setOffset(topN.getOffset());
             sortNode.setLimit(topN.getLimit());
+            if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
+                sortNode.setUseTopnOpt(true);
+                PlanNode child = sortNode.getChild(0);
+                Preconditions.checkArgument(child instanceof OlapScanNode,
+                        "topN opt expect OlapScanNode, but we get " + child);
+                ((OlapScanNode) child).setUseTopnOpt(true);
+            }
             addPlanRoot(currentFragment, sortNode, topN);
         } else {
             // For mergeSort, we need to push sortInfo to exchangeNode
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index 88441b759b535b..b7f5c5f844784a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -173,10 +173,8 @@ public SlotDescriptor createSlotDesc(TupleDescriptor tupleDesc, SlotReference sl
             @Nullable TableIf table) {
         SlotDescriptor slotDescriptor = this.addSlotDesc(tupleDesc);
         // Only the SlotDesc that in the tuple generated for scan node would have corresponding column.
-        if (table != null) {
-            Optional column = slotReference.getColumn();
-            column.ifPresent(slotDescriptor::setColumn);
-        }
+        Optional column = slotReference.getColumn();
+        column.ifPresent(slotDescriptor::setColumn);
         slotDescriptor.setType(slotReference.getDataType().toCatalogDataType());
         slotDescriptor.setIsMaterialized(true);
         SlotRef slotRef;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index 507d572d7059c2..98b392a66c68d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -68,6 +68,7 @@ public List getProcessors() {
             }
         }
         builder.add(new Validator());
+        builder.add(new TopNScanOpt());
         return builder.build();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
new file mode 100644
index 00000000000000..721ba3773fb00a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.SortPhase;
+import org.apache.doris.nereids.trees.plans.algebra.Filter;
+import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.qe.ConnectContext;
+
+/**
+ * Post-processor that marks a {@link PhysicalTopN} for the TopN scan optimization by setting the
+ * {@link PhysicalTopN#TOPN_RUNTIME_FILTER} mutable state on it.
+ *
+ * The mark is set only when all of the following hold:
+ * - the sort phase is {@link SortPhase#LOCAL_SORT};
+ * - the threshold read from the session is not -1 and the limit does not exceed it;
+ * - there is at least one order key and the first one is a {@link SlotReference} whose type is
+ *   not string-like, float or double;
+ * - below any chain of {@link Project} / {@link Filter} nodes there is a {@link PhysicalOlapScan}
+ *   whose table reports isDupKeysOrMergeOnWrite().
+ * refer to:
+ * https://github.com/apache/doris/pull/15558
+ * https://github.com/apache/doris/pull/15663
+ */
+
+public class TopNScanOpt extends PlanPostProcessor {
+    /**
+     * Visits the child subtree, then sets the mark on this node if every precondition holds.
+     *
+     * @param topN the TopN node being post-processed
+     * @param ctx the cascades context, passed through to the child visit
+     * @return the same {@code topN} instance (it is never replaced)
+     */
+    @Override
+    public PhysicalTopN visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
+        topN.child().accept(this, ctx);
+        Plan child = topN.child();
+        if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
+            return topN;
+        }
+        long threshold = getTopNOptLimitThreshold();
+        if (threshold == -1 || topN.getLimit() > threshold) {
+            return topN;
+        }
+        if (topN.getOrderKeys().isEmpty()) {
+            return topN;
+        }
+        Expression firstKey = topN.getOrderKeys().get(0).getExpr();
+        if (!(firstKey instanceof SlotReference)) {
+            return topN;
+        }
+        if (firstKey.getDataType().isStringLikeType()
+                || firstKey.getDataType().isFloatType()
+                || firstKey.getDataType().isDoubleType()) {
+            return topN;
+        }
+        // look through Project / Filter nodes for the scan underneath
+        while (child != null && (child instanceof Project || child instanceof Filter)) {
+            child = child.child(0);
+        }
+        if (child instanceof PhysicalOlapScan) {
+            PhysicalOlapScan scan = (PhysicalOlapScan) child;
+            if (scan.getTable().isDupKeysOrMergeOnWrite()) {
+                topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
+            }
+        }
+        return topN;
+    }
+
+    /**
+     * Reads topnOptLimitThreshold from the current session.
+     *
+     * @return the threshold, or -1 when no ConnectContext or session variable is available
+     */
+    private long getTopNOptLimitThreshold() {
+        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
+            return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
+        }
+        return -1;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
index d7f208d36d2c69..c989b53e24c7a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
@@ -33,6 +33,7 @@
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 
@@ -94,6 +95,17 @@ public List buildRules() {
                                 sort.child(0));
                         return topN;
                     }).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
+                //limit->proj->sort ==> proj->topN
+                logicalLimit(logicalProject(logicalSort()))
+                        .then(limit -> {
+                            LogicalProject project = limit.child();
+                            LogicalSort sort = limit.child().child();
+                            LogicalTopN topN = new LogicalTopN(sort.getOrderKeys(),
+                                    limit.getLimit(),
+                                    limit.getOffset(),
+                                    sort.child(0));
+                            return project.withChildren(Lists.newArrayList(topN));
+                        }).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
                 logicalLimit(logicalOneRowRelation())
                         .then(limit -> limit.getLimit() > 0 && limit.getOffset() == 0
                                 ? limit.child() : new LogicalEmptyRelation(limit.child().getOutput()))
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index 8f07fd20df2673..a6ac3d389b82bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -54,13 +54,8 @@ public class PhysicalOlapScan extends PhysicalRelation implements OlapScan {
     public PhysicalOlapScan(ObjectId id, OlapTable olapTable, List qualifier, long selectedIndexId,
             List selectedTabletIds, List selectedPartitionIds, DistributionSpec distributionSpec,
             PreAggStatus preAggStatus, Optional groupExpression, LogicalProperties logicalProperties) {
-        super(id, PlanType.PHYSICAL_OLAP_SCAN, qualifier, groupExpression, logicalProperties);
-        this.olapTable = olapTable;
-        this.selectedIndexId = selectedIndexId;
-        this.selectedTabletIds = ImmutableList.copyOf(selectedTabletIds);
-        this.selectedPartitionIds = ImmutableList.copyOf(selectedPartitionIds);
-        this.distributionSpec = distributionSpec;
-        this.preAggStatus = preAggStatus;
+        this(id, olapTable, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, distributionSpec,
+                preAggStatus, groupExpression, logicalProperties, null, null);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 526857355d8329..e22b75078c6947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -39,7 +39,7 @@
  * Physical top-N plan.
  */
 public class PhysicalTopN extends AbstractPhysicalSort implements TopN {
-
+    public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
     private final long limit;
     private final long offset;
 
@@ -54,10 +54,8 @@ public PhysicalTopN(List orderKeys, long limit, long offset,
     public PhysicalTopN(List orderKeys, long limit, long offset,
             SortPhase phase, Optional groupExpression, LogicalProperties logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, child);
-        Objects.requireNonNull(orderKeys, "orderKeys should not be null in PhysicalTopN.");
-        this.limit = limit;
-        this.offset = offset;
+        this(orderKeys, limit, offset, phase, groupExpression, logicalProperties,
+                null, null, child);
     }
 
     /**
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
new file mode 100644
index 00000000000000..6cbbcbf07110c9
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.postprocess;
+
+import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
+import org.apache.doris.nereids.processor.post.PlanPostProcessors;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.nereids.util.PlanChecker;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TopNRuntimeFilterTest extends SSBTestBase {
+    @Override
+    public void runBeforeAll() throws Exception {
+        super.runBeforeAll();
+    }
+
+    @Test
+    public void testUseTopNRf() {
+        String sql = "select * from customer order by c_custkey limit 5";
+        PlanChecker checker = PlanChecker.from(connectContext).analyze(sql)
+                .rewrite()
+                .implement();
+        PhysicalPlan plan = checker.getPhysicalPlan();
+        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+        Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
+        PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
+        Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+    }
+
+    // topn rf do not apply on string-like and float column
+    @Test
+    public void testNotUseTopNRf() {
+        String sql = "select * from customer order by c_name limit 5";
+        PlanChecker checker = PlanChecker.from(connectContext).analyze(sql)
+                .rewrite()
+                .implement();
+        PhysicalPlan plan = checker.getPhysicalPlan();
+        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+        Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
+        PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
+        Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+    }
+}