Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.StatsDerive.DeriveContext;
import org.apache.doris.nereids.stats.StatsCalculator;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;

/**
* Due to the limitation on the data size in the memo, when optimizing large SQL queries, once this
* limitation is triggered, some subtrees of the plan tree may not undergo optimization. Therefore,
Expand Down Expand Up @@ -62,25 +67,29 @@ private Plan swapJoinChildrenIfNeed(LogicalJoin<? extends Plan, ? extends Plan>
// if we swap left semi/anti to right semi/anti, we lost the opportunity to optimize join order
return null;
}
JoinType swapType = join.getJoinType().swap();
if (swapType == null) {
return null;
}
AbstractPlan left = (AbstractPlan) join.left();
AbstractPlan right = (AbstractPlan) join.right();
if (left.getStats() == null) {
left.accept(derive, new DeriveContext());
}
if (right.getStats() == null) {
right.accept(derive, new DeriveContext());
}
List<CatalogRelation> scans = join.collectToList(CatalogRelation.class::isInstance);
Optional<String> disableReason = StatsCalculator.disableJoinReorderIfStatsInvalid(scans, null);
if (!disableReason.isPresent()) {
JoinType swapType = join.getJoinType().swap();
if (swapType == null) {
return null;
}
AbstractPlan left = (AbstractPlan) join.left();
AbstractPlan right = (AbstractPlan) join.right();
if (left.getStats() == null) {
left.accept(derive, new DeriveContext());
}
if (right.getStats() == null) {
right.accept(derive, new DeriveContext());
}

// requires "left.getStats().getRowCount() > 0" to avoid dead loop when negative row count is estimated.
if (left.getStats().getRowCount() < right.getStats().getRowCount() * SWAP_THRESHOLD
&& left.getStats().getRowCount() > 0) {
join = join.withTypeChildren(swapType, right, left,
join.getJoinReorderContext());
return join;
// requires "left.getStats().getRowCount() > 0" to avoid dead loop when negative row count is estimated.
if (left.getStats().getRowCount() < right.getStats().getRowCount() * SWAP_THRESHOLD
&& left.getStats().getRowCount() > 0) {
join = join.withTypeChildren(swapType, right, left,
join.getJoinReorderContext());
return join;
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ public static Optional<String> disableJoinReorderIfStatsInvalid(List<CatalogRela
LOG.debug("disable join reorder since row count not available: "
+ scan.getTable().getNameWithFullQualifiers());
}
try {
context.getConnectContext().getSessionVariable()
.setVarOnce(SessionVariable.DISABLE_JOIN_REORDER, "true");
} catch (Exception e) {
LOG.error("disable NereidsJoinReorderOnce failed", e);
}
return Optional.of("table[" + scan.getTable().getName() + "] row count is invalid");
}
if (scan instanceof OlapScan) {
Expand All @@ -284,7 +290,7 @@ public static Optional<String> disableJoinReorderIfStatsInvalid(List<CatalogRela
LOG.debug("disable join reorder since col stats invalid: " + reason.get());
}
} catch (Exception e) {
LOG.error("disable NereidsJoinReorderOnce failed");
LOG.error("disable NereidsJoinReorderOnce failed", e);
}
return reason;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !shape --
PhysicalResultSink
--hashJoin[INNER_JOIN colocated] hashCondition=((t1.k = t2.k)) otherCondition=()
----PhysicalOlapScan[t2]
----PhysicalOlapScan[t1]

-- !shape --
PhysicalResultSink
--hashJoin[INNER_JOIN colocated] hashCondition=((t1.k = t2.k)) otherCondition=()
----PhysicalOlapScan[t1]
----PhysicalOlapScan[t2]

-- !shape --
PhysicalResultSink
--hashJoin[INNER_JOIN colocated] hashCondition=((t1.k = t2.k)) otherCondition=()
----PhysicalOlapScan[t2]
----PhysicalOlapScan[t1]

Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ PhysicalResultSink
----hashAgg[DISTINCT_LOCAL]
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
----------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.a = t2.a)) otherCondition=()
----------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.a = t2.a)) otherCondition=()
------------PhysicalOlapScan[test_skew_hint]
------------PhysicalOlapScan[test_skew_hint]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ PhysicalResultSink
-- !join_1_shape --
PhysicalResultSink
--NestedLoopJoin[CROSS_JOIN]
----PhysicalProject[t2.x, t2.y, t2.z]
------filter(((cast(x as BIGINT) + cast(y as BIGINT)) = 1))
--------PhysicalOlapScan[t2]
----PhysicalProject[t1.a, t1.b, t1.c]
------filter((t1.a = 1) and (t1.b = 1))
--------PhysicalOlapScan[t1]
----PhysicalProject[t2.x, t2.y, t2.z]
------filter(((cast(x as BIGINT) + cast(y as BIGINT)) = 1))
--------PhysicalOlapScan[t2]

-- !join_1_result --

Expand Down Expand Up @@ -190,12 +190,12 @@ PhysicalResultSink
PhysicalResultSink
--hashJoin[INNER_JOIN] hashCondition=((t2.x = t3.a)) otherCondition=() build RFs:RF0 a->[x]
----NestedLoopJoin[CROSS_JOIN]
------PhysicalProject[t2.x]
--------filter(((cast(x as BIGINT) * 10) = 10))
----------PhysicalOlapScan[t2] apply RFs: RF0
------PhysicalProject[t1.a]
--------filter((t1.a = 10))
----------PhysicalOlapScan[t1]
------PhysicalProject[t2.x]
--------filter(((cast(x as BIGINT) * 10) = 10))
----------PhysicalOlapScan[t2] apply RFs: RF0
----PhysicalProject[t3.a]
------filter(((cast(a as BIGINT) * 10) = 10) and (t3.__DORIS_DELETE_SIGN__ = 0))
--------PhysicalOlapScan[t3]
Expand Down Expand Up @@ -263,29 +263,31 @@ PhysicalResultSink
-- !subquery_7_shape --
PhysicalResultSink
--NestedLoopJoin[CROSS_JOIN]
----PhysicalProject[t2.x]
------filter(((cast(x as BIGINT) * 10) = 10))
--------PhysicalOlapScan[t2]
----PhysicalProject[t.a]
------filter((t1.a = 10))
--------PhysicalOlapScan[t1]
----PhysicalProject[t2.x]
------filter(((cast(x as BIGINT) * 10) = 10))
--------PhysicalOlapScan[t2]

-- !subquery_7_result --
10 1

-- !subquery_8_shape --
PhysicalResultSink
--NestedLoopJoin[CROSS_JOIN]
----PhysicalUnion
------PhysicalProject[(cast(a as BIGINT) * 10) AS `k`, t1.b]
--------filter(((cast(a as BIGINT) * 10) = 10))
----------PhysicalOlapScan[t1]
------PhysicalProject[cast(x as BIGINT) AS `k`, y AS `b`]
--------filter((t2.x = 10))
----------PhysicalOlapScan[t2]
----PhysicalProject[t3.a]
------filter((t3.__DORIS_DELETE_SIGN__ = 0) and (t3.a = 2))
--------PhysicalOlapScan[t3]
--PhysicalQuickSort[MERGE_SORT, orderKeys=(k asc null first, b asc null first)]
----PhysicalQuickSort[LOCAL_SORT, orderKeys=(k asc null first, b asc null first)]
------NestedLoopJoin[CROSS_JOIN]
--------PhysicalUnion
----------PhysicalProject[(cast(a as BIGINT) * 10) AS `k`, t1.b]
------------filter(((cast(a as BIGINT) * 10) = 10))
--------------PhysicalOlapScan[t1]
----------PhysicalProject[cast(x as BIGINT) AS `k`, y AS `b`]
------------filter((t2.x = 10))
--------------PhysicalOlapScan[t2]
--------PhysicalProject[t3.a]
----------filter((t3.__DORIS_DELETE_SIGN__ = 0) and (t3.a = 2))
------------PhysicalOlapScan[t3]

-- !subquery_8_result --
10 2 2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: runs EXPLAIN SHAPE PLAN on the same two-table inner join three times,
// changing the injected column statistics of t1.k between runs.
suite("invalid_stats_join_order") {
// Setup (single SQL batch):
//   - (re)create t1 and t2 with identical schemas, both in colocate group "group_1";
//   - insert one row into each table;
//   - inject column stats on k: t1 gets row_count=100, t2 gets row_count=10000000;
//   - turn runtime filters off.
sql """
drop table if exists t1;

CREATE TABLE IF NOT EXISTS t1 (
k int(11) NULL COMMENT "",
v varchar(50) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(k)
DISTRIBUTED BY HASH(k) BUCKETS 7
PROPERTIES ("replication_num" = "1", "colocate_with" = "group_1");

insert into t1 values (1, 'a');

alter table t1 modify column k set stats ('row_count'='100', 'ndv'='1', 'num_nulls'='0', 'min_value'='0');

drop table if exists t2;

CREATE TABLE IF NOT EXISTS t2 (
k int(11) NULL COMMENT "",
v varchar(50) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(k)
DISTRIBUTED BY HASH(k) BUCKETS 7
PROPERTIES ("replication_num" = "1", "colocate_with" = "group_1");

insert into t2 values (1, 'a');
alter table t2 modify column k set stats ('row_count'='10000000', 'ndv'='10', 'num_nulls'='0', 'min_value'='0');

set runtime_filter_mode=off;
"""

// Case 1: plan shape with the stats injected during setup
// (t1.k: row_count=100, ndv=1; t2.k: row_count=10000000, ndv=10).
qt_shape """
explain shape plan
select * from t1 join t2 on t1.k = t2.k;
"""

// Overwrite the injected stats of t1.k with a negative row count; the other values are unchanged.
sql """
alter table t1 modify column k set stats ('row_count'='-1', 'ndv'='1', 'num_nulls'='0', 'min_value'='0');
"""

// Case 2: join reorder is expected to be disabled because t1's injected row count is -1.
qt_shape """
explain shape plan
select * from t1 join t2 on t1.k = t2.k;
"""

// Put row_count back to 100 and raise ndv from 1 to 11.
sql """
alter table t1 modify column k set stats ('row_count'='100', 'ndv'='11', 'num_nulls'='0', 'min_value'='0');
"""

// Case 3: intended to exercise the "ndv > rowCount * 10" condition.
// NOTE(review): with the values injected above, 11 is not greater than 100 * 10, so the
// rowCount in that condition is presumably not the injected row_count (possibly the table's
// real row count; only one row was inserted) -- confirm against the stats validation code.
qt_shape """
explain shape plan
select * from t1 join t2 on t1.k = t2.k;
"""

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

suite("test_agg_skew_hint") {
sql "set runtime_filter_mode=OFF"
sql "set disable_join_reorder=true;"
sql "drop table if exists test_skew_hint"
sql "create table test_skew_hint (a int, b int, c int) distributed by hash(a) properties('replication_num'='1');"
sql "insert into test_skew_hint(a,b,c) values(1,2,3),(1,2,4),(1,3,4),(2,3,5),(2,4,5),(3,4,5),(3,5,6),(3,6,7),(3,7,8),(3,8,9),(3,10,11);"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ suite('constant_propagation') {
SET detail_shape_nodes='PhysicalProject,PhysicalHashAggregate,PhysicalQuickSort';
SET ignore_shape_nodes='PhysicalDistribute';
SET runtime_filter_type=2;
set disable_join_reorder=true;
"""

sql 'drop table if exists t1'
Expand Down Expand Up @@ -245,7 +246,7 @@ suite('constant_propagation') {

explain_and_result 'subquery_8', '''
select t.k, t.b, t3.a
from (select a * 10 as k, b from t1 union all select x as k, y as b from t2) t join t3 on t.k = t3.a * 5 where t3.a = 2
from (select a * 10 as k, b from t1 union all select x as k, y as b from t2) t join t3 on t.k = t3.a * 5 where t3.a = 2 order by k, b, a
'''

explain_and_result 'subquery_9', '''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ suite("mv_ssb_test") {
sql "set runtime_filter_mode=OFF"
sql "SET enable_nereids_timeout = false"
sql "SET BATCH_SIZE = 4064"

sql """
alter table customer modify column c_mktsegment SET STATS ('ndv'='5', 'num_nulls'='0', 'row_count'='3000');
alter table lineorder modify column lo_revenue SET STATS ('ndv'='453898', 'num_nulls'='0', 'row_count'='30600572');
alter table part modify column p_mfgr SET STATS ('ndv'='5', 'num_nulls'='0', 'row_count'='20000.0');
alter table date modify column d_dayofweek SET STATS ('ndv'='7', 'num_nulls'='0', 'row_count'='255');
alter table supplier modify column s_phone SET STATS ('ndv'='200', 'num_nulls'='0', 'row_count'='200');
"""
def mv1_1 = """
SELECT SUM(lo_extendedprice*lo_discount) AS
REVENUE
Expand Down
Loading