Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1398,13 +1398,20 @@ public PlanFragment visitPhysicalLimit(PhysicalLimit<? extends Plan> physicalLim

// Union contains oneRowRelation
if (inputFragment == null) {
return inputFragment;
return null;
}
// For case globalLimit(l, o) -> LocalLimit(l+o, 0), that is the LocalLimit has already gathered
// The globalLimit can overwrite the limit and offset, so it's still correct

PlanNode child = inputFragment.getPlanRoot();
child.setLimit(physicalLimit.getLimit());

// This case means GlobalLimit's child isn't gatherNode, which suggests the child is UNPARTITIONED
// When there is valid offset, exchangeNode should be added because other node don't support offset
if (physicalLimit.isGlobal() && physicalLimit.hasValidOffset()
&& !(child instanceof ExchangeNode)) {
inputFragment = createParentFragment(inputFragment, DataPartition.UNPARTITIONED, context);
child = inputFragment.getPlanRoot();
}
child.setOffset(physicalLimit.getOffset());
child.setLimit(physicalLimit.getLimit());
return inputFragment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public List<Rule> buildRules() {
return topN;
}).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
logicalLimit(logicalOneRowRelation())
.then(limit -> limit.getLimit() > 0
.then(limit -> limit.getLimit() > 0 && limit.getOffset() == 0
? limit.child() : new LogicalEmptyRelation(limit.child().getOutput()))
.toRule(RuleType.PUSH_LIMIT_THROUGH_ONE_ROW_RELATION),
logicalLimit(logicalEmptyRelation())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ public class ExchangeNode extends PlanNode {
// exchange node. Null if this exchange does not merge sorted streams
private SortInfo mergeInfo;

// Offset after which the exchange begins returning rows. Currently valid
// only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
private long offset;

/**
* Create ExchangeNode that consumes output of inputNode.
* An ExchangeNode doesn't have an input node as a child, which is why we
Expand Down Expand Up @@ -145,25 +141,6 @@ public void setMergeInfo(SortInfo info) {
: MERGING_EXCHANGE_NODE;
}

/**
* This function is used to translate PhysicalLimit.
* Ignore the offset if this is not a merging exchange node.
* @param offset
*/
public void setOffset(long offset) {
if (isMergingExchange()) {
this.offset = offset;
}
}

/**
* Used by new optimizer only.
*/
@Override
public void setOffSetDirectly(long offset) {
this.offset = offset;
}

@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
Expand Down
39 changes: 39 additions & 0 deletions regression-test/suites/nereids_syntax_p0/test_limit.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_limit") {
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'


sql """
drop table if exists test1
"""
sql """
CREATE TABLE IF NOT EXISTS test1(
id int
)
DISTRIBUTED BY HASH(id) properties("replication_num" = "1");
"""

sql """ insert into test1 values(1) """
sql """ insert into test1 values(1) """
test {
sql "select * from test1 limit 2 offset 1"
result([[1]])
}
}