Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,9 @@ public long fetchRowCount() {
return getRowCountForIndex(baseIndexId, false);
}

/**
* @return -1 if there are some tablets whose row count is not reported to FE
*/
public long getRowCountForIndex(long indexId, boolean strict) {
long rowCount = 0;
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
Expand All @@ -48,6 +49,7 @@
import org.apache.doris.nereids.processor.pre.PlanPreprocessors;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext;
import org.apache.doris.nereids.stats.StatsCalculator;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
Expand All @@ -56,6 +58,7 @@
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
Expand Down Expand Up @@ -256,6 +259,17 @@ private Plan planWithoutLock(
}
}

// if we cannot get table row count, skip join reorder
// except:
// 1. user set leading hint
// 2. ut test. In ut test, FeConstants.enableInternalSchemaDb is false or FeConstants.runningUnitTest is true
if (FeConstants.enableInternalSchemaDb && !FeConstants.runningUnitTest
&& !cascadesContext.isLeadingDisableJoinReorder()) {
List<LogicalOlapScan> scans = cascadesContext.getRewritePlan()
.collectToList(LogicalOlapScan.class::isInstance);
StatsCalculator.disableJoinReorderIfTableRowCountNotAvailable(scans, cascadesContext);
}

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {

private CascadesContext cascadesContext;

private StatsCalculator(CascadesContext context) {
this.groupExpression = null;
this.cascadesContext = context;
}

private StatsCalculator(GroupExpression groupExpression, boolean forbidUnknownColStats,
Map<String, ColumnStatistic> columnStatisticMap, boolean isPlayNereidsDump,
Map<CTEId, Statistics> cteIdToStats, CascadesContext context) {
Expand All @@ -205,6 +210,27 @@ public Map<String, ColumnStatistic> getTotalColumnStatisticMap() {
return totalColumnStatisticMap;
}

/**
* disable join reorder if any table row count is not available.
*/
public static void disableJoinReorderIfTableRowCountNotAvailable(
List<LogicalOlapScan> scans, CascadesContext context) {
StatsCalculator calculator = new StatsCalculator(context);
for (LogicalOlapScan scan : scans) {
double rowCount = calculator.getOlapTableRowCount(scan);
if (rowCount == -1 && ConnectContext.get() != null) {
try {
ConnectContext.get().getSessionVariable().disableNereidsJoinReorderOnce();
LOG.info("disable join reorder since row count not available: "
+ scan.getTable().getNameWithFullQualifiers());
} catch (Exception e) {
LOG.info("disableNereidsJoinReorderOnce failed");
}
return;
}
}
}

/**
* estimate stats
*/
Expand All @@ -217,15 +243,6 @@ public static StatsCalculator estimate(GroupExpression groupExpression, boolean
return statsCalculator;
}

public static StatsCalculator estimate(GroupExpression groupExpression, boolean forbidUnknownColStats,
Map<String, ColumnStatistic> columnStatisticMap, boolean isPlayNereidsDump, CascadesContext context) {
return StatsCalculator.estimate(groupExpression,
forbidUnknownColStats,
columnStatisticMap,
isPlayNereidsDump,
new HashMap<>(), context);
}

// For unit test only
public static void estimate(GroupExpression groupExpression, CascadesContext context) {
StatsCalculator statsCalculator = new StatsCalculator(groupExpression, false,
Expand Down Expand Up @@ -364,6 +381,9 @@ private void checkIfUnknownStatsUsedAsKey(StatisticsBuilder builder) {
}
}

/**
* if the table is not analyzed and BE does not report row count, return -1
*/
private double getOlapTableRowCount(OlapScan olapScan) {
OlapTable olapTable = olapScan.getTable();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------filter((main.k1 = 1))
----------PhysicalOlapScan[test]
--PhysicalResultSink
----hashJoin[INNER_JOIN] hashCondition=((m1.k1 = m2.k1)) otherCondition=() build RFs:RF0 k1->[k1]
----hashJoin[INNER_JOIN] hashCondition=((m1.k1 = m2.k1)) otherCondition=()
------filter((temp.k1 = 1))
--------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF0
--------PhysicalCteConsumer ( cteId=CTEId#0 )
------filter((m2.k1 = 1))
--------PhysicalCteConsumer ( cteId=CTEId#0 )

Expand All @@ -21,9 +21,9 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------PhysicalQuickSort[LOCAL_SORT]
----------PhysicalOlapScan[test]
--PhysicalResultSink
----hashJoin[INNER_JOIN] hashCondition=((m1.k1 = m2.k1)) otherCondition=() build RFs:RF0 k1->[k1]
----hashJoin[INNER_JOIN] hashCondition=((m1.k1 = m2.k1)) otherCondition=()
------filter((temp.k1 = 1))
--------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF0
--------PhysicalCteConsumer ( cteId=CTEId#0 )
------filter((m2.k1 = 1))
--------PhysicalCteConsumer ( cteId=CTEId#0 )

Expand Down
94 changes: 47 additions & 47 deletions regression-test/data/nereids_p0/hint/multi_leading.out
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN colocated] hashCondition=((cte.c1 = t1.c1)) otherCondition=()
----------filter((t1.c1 > 300))
------------PhysicalOlapScan[t1]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((cte.c1 = t2.c2)) otherCondition=()
------------filter((cte.c1 > 300))
--------------PhysicalOlapScan[t1]
------------filter((t2.c2 > 300))
--------------PhysicalOlapScan[t2]
----------filter((t1.c1 > 300))
------------PhysicalOlapScan[t1]

Hint log:
Used: leading(t1 t2 ) leading(t1 cte )
UnUsed:
Used:
UnUsed: leading(t1 t2) leading(t1 cte)
SyntaxError:

-- !sql1_4 --
Expand All @@ -43,17 +43,17 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN colocated] hashCondition=((cte.c1 = t1.c1)) otherCondition=()
----------filter((t1.c1 > 300))
------------PhysicalOlapScan[t1]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((cte.c1 = t2.c2)) otherCondition=()
------------filter((cte.c1 > 300))
--------------PhysicalOlapScan[t1]
------------filter((t2.c2 > 300))
--------------PhysicalOlapScan[t2]
----------filter((t1.c1 > 300))
------------PhysicalOlapScan[t1]

Hint log:
Used: leading(t1 t2 ) leading(t1 cte )
UnUsed:
Used:
UnUsed: leading(t1 t2) leading(t1 cte)
SyntaxError:

-- !sql1_res_1 --
Expand All @@ -74,14 +74,14 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
----------PhysicalOlapScan[t3]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.c1 = t2.c2)) otherCondition=()
------------PhysicalOlapScan[t1]
------------PhysicalOlapScan[t2]
----------PhysicalOlapScan[t3]

Hint log:
Used: leading(t3 alias1 )
UnUsed:
Used:
UnUsed: leading(t3 alias1)
SyntaxError:

-- !sql2_3 --
Expand All @@ -91,13 +91,13 @@ PhysicalResultSink
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
----------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t2.c2)) otherCondition=()
------------PhysicalOlapScan[t2]
------------PhysicalOlapScan[t1]
------------PhysicalOlapScan[t2]
----------PhysicalOlapScan[t3]

Hint log:
Used: leading(t2 t1 )
UnUsed:
Used:
UnUsed: leading(t2 t1)
SyntaxError:

-- !sql2_4 --
Expand All @@ -106,14 +106,14 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
----------PhysicalOlapScan[t3]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t2.c2)) otherCondition=()
------------PhysicalOlapScan[t2]
------------PhysicalOlapScan[t1]
------------PhysicalOlapScan[t2]
----------PhysicalOlapScan[t3]

Hint log:
Used: leading(t2 t1 ) leading(t3 alias1 )
UnUsed:
Used:
UnUsed: leading(t2 t1) leading(t3 alias1)
SyntaxError:

-- !sql2_res_1 --
Expand All @@ -135,17 +135,17 @@ PhysicalResultSink
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = cte.c11)) otherCondition=()
----------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
------------PhysicalOlapScan[t3]
------------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.c1 = t2.c2)) otherCondition=()
--------------PhysicalOlapScan[t1]
--------------PhysicalOlapScan[t2]
------------PhysicalOlapScan[t3]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((cte.c1 = t2.c2)) otherCondition=()
------------PhysicalOlapScan[t2]
------------PhysicalOlapScan[t1]
------------PhysicalOlapScan[t2]

Hint log:
Used: leading(t2 t1 ) leading(t3 alias1 cte )
UnUsed:
Used:
UnUsed: leading(t2 t1) leading(t3 alias1 cte)
SyntaxError:

-- !sql3_3 --
Expand All @@ -156,16 +156,16 @@ PhysicalResultSink
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = cte.c11)) otherCondition=()
----------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
------------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t2.c2)) otherCondition=()
--------------PhysicalOlapScan[t2]
--------------PhysicalOlapScan[t1]
--------------PhysicalOlapScan[t2]
------------PhysicalOlapScan[t3]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((cte.c1 = t2.c2)) otherCondition=()
------------PhysicalOlapScan[t1]
------------PhysicalOlapScan[t2]

Hint log:
Used: leading(t2 t1 )
UnUsed:
Used:
UnUsed: leading(t2 t1)
SyntaxError:

-- !sql3_4 --
Expand All @@ -175,17 +175,17 @@ PhysicalResultSink
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = cte.c11)) otherCondition=()
----------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
------------PhysicalOlapScan[t3]
------------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t2.c2)) otherCondition=()
--------------PhysicalOlapScan[t2]
--------------PhysicalOlapScan[t1]
--------------PhysicalOlapScan[t2]
------------PhysicalOlapScan[t3]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((cte.c1 = t2.c2)) otherCondition=()
------------PhysicalOlapScan[t2]
------------PhysicalOlapScan[t1]
------------PhysicalOlapScan[t2]

Hint log:
Used: leading(t2 t1 ) leading(t2 t1 ) leading(t3 alias1 cte )
UnUsed:
Used:
UnUsed: leading(t2 t1) leading(t2 t1) leading(t3 alias1 cte)
SyntaxError:

-- !sql3_res_1 --
Expand All @@ -206,16 +206,16 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
----------PhysicalOlapScan[t3]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.c1 = alias2.c2)) otherCondition=()
------------PhysicalOlapScan[t1]
------------hashJoin[INNER_JOIN broadcast] hashCondition=((t2.c2 = t4.c4)) otherCondition=()
--------------PhysicalOlapScan[t2]
--------------PhysicalOlapScan[t4]
----------PhysicalOlapScan[t3]

Hint log:
Used: leading(t3 alias1 )
UnUsed:
Used:
UnUsed: leading(t3 alias1)
SyntaxError:

-- !sql4_2 --
Expand All @@ -225,15 +225,15 @@ PhysicalResultSink
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
----------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = alias2.c2)) otherCondition=()
------------PhysicalOlapScan[t1]
------------hashJoin[INNER_JOIN broadcast] hashCondition=((t2.c2 = t4.c4)) otherCondition=()
--------------PhysicalOlapScan[t2]
--------------PhysicalOlapScan[t4]
------------PhysicalOlapScan[t1]
----------PhysicalOlapScan[t3]

Hint log:
Used: leading(alias2 t1 )
UnUsed:
Used:
UnUsed: leading(alias2 t1)
SyntaxError:

-- !sql4_3 --
Expand All @@ -245,13 +245,13 @@ PhysicalResultSink
----------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.c1 = alias2.c2)) otherCondition=()
------------PhysicalOlapScan[t1]
------------hashJoin[INNER_JOIN broadcast] hashCondition=((t2.c2 = t4.c4)) otherCondition=()
--------------PhysicalOlapScan[t4]
--------------PhysicalOlapScan[t2]
--------------PhysicalOlapScan[t4]
----------PhysicalOlapScan[t3]

Hint log:
Used: leading(t4 t2 )
UnUsed:
Used:
UnUsed: leading(t4 t2)
SyntaxError:

-- !sql4_4 --
Expand All @@ -260,16 +260,16 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = t3.c3)) otherCondition=()
----------PhysicalOlapScan[t3]
----------hashJoin[INNER_JOIN broadcast] hashCondition=((alias1.c1 = alias2.c2)) otherCondition=()
------------PhysicalOlapScan[t1]
------------hashJoin[INNER_JOIN broadcast] hashCondition=((t2.c2 = t4.c4)) otherCondition=()
--------------PhysicalOlapScan[t2]
--------------PhysicalOlapScan[t4]
------------PhysicalOlapScan[t1]
----------PhysicalOlapScan[t3]

Hint log:
Used: leading(alias2 t1 ) leading(t3 alias1 )
UnUsed:
Used:
UnUsed: leading(alias2 t1) leading(t3 alias1)
SyntaxError:

-- !sql4_res_0 --
Expand Down Expand Up @@ -311,12 +311,12 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecGather]
------------hashAgg[LOCAL]
--------------hashJoin[INNER_JOIN shuffle] hashCondition=((t1.c1 = cte.c11)) otherCondition=()
----------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.c1 = cte.c11)) otherCondition=()
----------------PhysicalOlapScan[t1]
----------------PhysicalCteConsumer ( cteId=CTEId#0 )

Hint log:
Used: leading(cte t1 )
UnUsed:
Used:
UnUsed: leading(cte t1)
SyntaxError:

Loading