Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
requiredByProjectSlotIdSet, context);
} else {
if (project.child() instanceof PhysicalDeferMaterializeTopN) {
inputFragment.setOutputExprs(projectionExprs);
inputFragment.setOutputExprs(allProjectionExprs);
} else {
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
inputPlanNode.setProjectList(projectionExprs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -69,6 +71,38 @@ public List<Rule> buildRules() {
).then(r -> deferMaterialize(r, r.child(),
Optional.empty(), Optional.empty(), r.child().child()))
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(
logicalTopN(
logicalProject(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> {
for (OrderKey orderKey : t.getOrderKeys()) {
if (!orderKey.getExpr().isColumnFromTable()) {
return false;
}
if (!(orderKey.getExpr() instanceof SlotReference)) {
return false;
}
SlotReference slotRef = (SlotReference) orderKey.getExpr();
// do not support alias in project now
if (!t.child().getProjects().contains(slotRef)) {
return false;
}
}
return true;
})
).then(r -> {
LogicalProject<LogicalOlapScan> project = r.child().child();
return deferMaterialize(r, r.child(), Optional.of(project),
Optional.empty(), project.child());
})
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(
logicalTopN(
Expand Down Expand Up @@ -123,6 +157,65 @@ public List<Rule> buildRules() {
Optional.of(filter), filter.child());
})
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalProject(
logicalTopN(
logicalProject(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())

)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> {
for (OrderKey orderKey : t.getOrderKeys()) {
if (!orderKey.getExpr().isColumnFromTable()) {
return false;
}
if (!(orderKey.getExpr() instanceof SlotReference)) {
return false;
}
SlotReference slotRef = (SlotReference) orderKey.getExpr();
// do not support alias in project now
if (!t.child().getProjects().contains(slotRef)) {
return false;
}
}
return true;
})
).when(project -> project.canMergeProjections(project.child().child()))).then(r -> {
LogicalProject<?> upperProject = r.child();
LogicalProject<LogicalOlapScan> bottomProject = r.child().child().child();
List<NamedExpression> projections = upperProject.mergeProjections(bottomProject);
LogicalProject<?> project = upperProject.withProjects(projections);
return deferMaterialize(r, r.child().child(), Optional.of(project),
Optional.empty(), bottomProject.child());
})
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalProject(
logicalTopN(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())

).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> {
for (OrderKey orderKey : t.getOrderKeys()) {
if (!orderKey.getExpr().isColumnFromTable()) {
return false;
}
if (!(orderKey.getExpr() instanceof SlotReference)) {
return false;
}
}
return true;
})
)).then(r -> deferMaterialize(r, r.child().child(), Optional.of(r.child()),
Optional.empty(), r.child().child().child()))
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalProject(
logicalTopN(
Expand Down Expand Up @@ -207,7 +300,13 @@ private Plan deferMaterialize(LogicalResultSink<? extends Plan> logicalResultSin
root = new LogicalDeferMaterializeTopN<>((LogicalTopN<? extends Plan>) logicalTopN.withChildren(root),
deferredMaterializedExprIds, columnId);
if (logicalProject.isPresent()) {
root = logicalProject.get().withChildren(root);
// generate projections with the order exactly same as result output's
Map<Slot, NamedExpression> projectsMap = Maps.newHashMap();
logicalProject.get().getProjects().forEach(p -> projectsMap.put(p.toSlot(), p));
List<NamedExpression> outputProjects = logicalResultSink.getOutput().stream()
.map(projectsMap::get)
.collect(ImmutableList.toImmutableList());
root = logicalProject.get().withProjectsAndChild(outputProjects, root);
}
root = logicalResultSink.withChildren(root);
return new LogicalDeferMaterializeResultSink<>((LogicalResultSink<? extends Plan>) root,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected void runBeforeAll() throws Exception {
createTable("create table test.test_unique_order_by2(a int not null, b int not null, c int, d int) "
+ "unique key(a,b) distributed by hash(a) properties('replication_num'='1');");
connectContext.setDatabase("test");
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION,DEFER_MATERIALIZE_TOP_N_RESULT");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("lazy_materialize_topn") {
sql """
set enable_two_phase_read_opt = true
"""

sql """
drop table if exists lazy_materialize_topn;
"""

sql """
CREATE TABLE `lazy_materialize_topn` (
`c1` int NULL,
`c2` int NULL,
`c3` int NULL,
`c4` array<int> NULL
)
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"light_schema_change" = "true"
);
"""

sql """
insert into lazy_materialize_topn values (1, 1, 1, [1]), (2, 2, 2, [2]), (3, 3, 3, [3]);
"""

sql """
sync
"""

List sqls = [
// TopN(Scan)
"""select * from lazy_materialize_topn order by c1 limit 10""",
// TopN(Project(Scan))
"""select c1, c2 from lazy_materialize_topn order by c1 limit 10""",
// Project(TopN(Scan))
"""select c1, c2, c3, c4 from lazy_materialize_topn order by c1 limit 10""",
// Project(TopN(Project(Scan)))
"""select c1 + 1, c2 + 1 from (select c1, c2 from lazy_materialize_topn order by c1 limit 10) t""",
// TopN(Filter(Scan))
"""select * from lazy_materialize_topn where c2 < 5 order by c1 limit 10;""",
// TopN(Project(Filter(Scan)))
"""select c1, c2, c3 from lazy_materialize_topn where c2 < 5 order by c1 limit 10;""",
// Project(TopN(Project(Filter(Scan))))
"""select c1 + 1, c2 + 1, c3 + 1 from ( select c1, c2, c3 from lazy_materialize_topn where c2 < 5 order by c1 limit 10) t""",
// project set is diff with output list
"""select c1, c1, c2 from (select c1, c2 from lazy_materialize_topn where c3 < 1 order by c2 limit 1)t;"""
]

for (sqlStr in sqls) {
explain {
sql """${sqlStr}"""
contains """OPT TWO PHASE"""
}
sql """${sqlStr}"""
}
}