Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 118 additions & 107 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -123,43 +124,43 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
setParsedPlan(parsedPlan);
PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().reset().start();
Plan resultPlan = null;
try {
resultPlan = plan(parsedPlan, requireProperties, explainLevel);
planWithLock(parsedPlan, requireProperties, explainLevel, plan -> {
setOptimizedPlan(plan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) plan;
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext);
PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(planTranslatorContext,
statementContext.getConnectContext().getStatsErrorEstimator());
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsTranslateTime();
}
if (cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) {
CounterEvent.clearCounter();
}
if (cascadesContext.getConnectContext().getSessionVariable().isPlayNereidsDump()) {
return;
}
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);

scanNodeList.addAll(planTranslatorContext.getScanNodes());
descTable = planTranslatorContext.getDescTable();
fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());
for (int seq = 0; seq < fragments.size(); seq++) {
fragments.get(seq).setFragmentSequenceNum(seq);
}
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());
ArrayList<String> columnLabelList = physicalPlan.getOutput().stream().map(NamedExpression::getName)
.collect(Collectors.toCollection(ArrayList::new));
logicalPlanAdapter.setColLabels(columnLabelList);
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
});
} finally {
statementContext.getStopwatch().stop();
}
setOptimizedPlan(resultPlan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) resultPlan;
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext);
PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(planTranslatorContext,
statementContext.getConnectContext().getStatsErrorEstimator());
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsTranslateTime();
}
if (cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) {
CounterEvent.clearCounter();
}
if (cascadesContext.getConnectContext().getSessionVariable().isPlayNereidsDump()) {
return;
}
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);

scanNodeList.addAll(planTranslatorContext.getScanNodes());
descTable = planTranslatorContext.getDescTable();
fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());
for (int seq = 0; seq < fragments.size(); seq++) {
fragments.get(seq).setFragmentSequenceNum(seq);
}
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());
ArrayList<String> columnLabelList = physicalPlan.getOutput().stream().map(NamedExpression::getName)
.collect(Collectors.toCollection(ArrayList::new));
logicalPlanAdapter.setColLabels(columnLabelList);
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
}

@VisibleForTesting
Expand All @@ -171,19 +172,22 @@ public void plan(StatementBase queryStmt) {
}
}

public PhysicalPlan plan(LogicalPlan plan, PhysicalProperties outputProperties) {
return (PhysicalPlan) plan(plan, outputProperties, ExplainLevel.NONE);
public PhysicalPlan planWithLock(LogicalPlan plan, PhysicalProperties outputProperties) {
Consumer<Plan> noCallback = p -> {};
return (PhysicalPlan) planWithLock(plan, outputProperties, ExplainLevel.NONE, noCallback);
}

/**
* Do analyze and optimize for query plan.
*
* @param plan wait for plan
* @param requireProperties request physical properties constraints
* @param lockCallback this callback function will invoke the table lock
* @return plan generated by this planner
* @throws AnalysisException throw exception if failed in ant stage
*/
public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel) {
public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel,
Consumer<Plan> lockCallback) {
if (explainLevel == ExplainLevel.PARSED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
parsedPlan = plan;
if (explainLevel == ExplainLevel.PARSED_PLAN) {
Expand All @@ -197,89 +201,96 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, Explain
initCascadesContext(plan, requireProperties);

try (Lock lock = new Lock(plan, cascadesContext)) {
// resolve column, table and function
Span queryAnalysisSpan =
statementContext.getConnectContext().getTracer()
.spanBuilder("query analysis").setParent(Context.current()).startSpan();
try (Scope scope = queryAnalysisSpan.makeCurrent()) {
// analyze this query
analyze();
} catch (Exception e) {
queryAnalysisSpan.recordException(e);
throw e;
} finally {
queryAnalysisSpan.end();
}
Plan resultPlan = planWithoutLock(plan, explainLevel, requireProperties);
lockCallback.accept(resultPlan);
return resultPlan;
}
}

// minidump of input must be serialized first, this process ensure minidump string not null
if (!statementContext.getConnectContext().getSessionVariable().isPlayNereidsDump()
&& statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.init();
String queryId = DebugUtil.printId(statementContext.getConnectContext().queryId());
try {
statementContext.getConnectContext().setMinidump(serializeInputsToDumpFile(plan, queryId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel, PhysicalProperties requireProperties) {
// resolve column, table and function
Span queryAnalysisSpan =
statementContext.getConnectContext().getTracer()
.spanBuilder("query analysis").setParent(Context.current()).startSpan();
try (Scope scope = queryAnalysisSpan.makeCurrent()) {
// analyze this query
analyze();
} catch (Exception e) {
queryAnalysisSpan.recordException(e);
throw e;
} finally {
queryAnalysisSpan.end();
}

if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
// minidump of input must be serialized first, this process ensure minidump string not null
if (!statementContext.getConnectContext().getSessionVariable().isPlayNereidsDump()
&& statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.init();
String queryId = DebugUtil.printId(statementContext.getConnectContext().queryId());
try {
statementContext.getConnectContext().setMinidump(serializeInputsToDumpFile(plan, queryId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
}

// rule-based optimize
rewrite();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
// rule-based optimize
rewrite();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
}

int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);
optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
}

physicalPlan = postProcess(physicalPlan);
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
serializeOutputToDumpFile(physicalPlan, statementContext.getConnectContext());
if (statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.saveMinidumpString(statementContext.getConnectContext().getMinidump(),
DebugUtil.printId(statementContext.getConnectContext().queryId()));
}
NereidsTracer.output(statementContext.getConnectContext());
int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);

return physicalPlan;
physicalPlan = postProcess(physicalPlan);
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
serializeOutputToDumpFile(physicalPlan, statementContext.getConnectContext());
if (statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.saveMinidumpString(statementContext.getConnectContext().getMinidump(),
DebugUtil.printId(statementContext.getConnectContext().queryId()));
}
NereidsTracer.output(statementContext.getConnectContext());

return physicalPlan;
}

private LogicalPlan preprocess(LogicalPlan logicalPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testGroupByAndHavingUseAliasFirstThrowException() {
}

private void runPlanner(String sql) {
new NereidsPlanner(MemoTestUtils.createStatementContext(connectContext, sql)).plan(
new NereidsPlanner(MemoTestUtils.createStatementContext(connectContext, sql)).planWithLock(
new NereidsParser().parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testFallbackToOriginalPlanner() throws Exception {
sv.setEnableNereidsPlanner(true);
sv.enableFallbackToOriginalPlanner = false;
Assertions.assertThrows(AnalysisException.class, () -> new NereidsPlanner(statementContext)
.plan(new NereidsParser().parseSingle(sql), PhysicalProperties.ANY));
.planWithLock(new NereidsParser().parseSingle(sql), PhysicalProperties.ANY));

// manually recover sv
sv.setEnableNereidsPlanner(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public List<Rule> getExplorationRules() {
for (String sql : testSqls) {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
PhysicalPlan plan = new NereidsPlanner(statementContext).plan(
PhysicalPlan plan = new NereidsPlanner(statementContext).planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testTranslateCase() throws Exception {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public List<Rule> getExplorationRules() {
try {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
PhysicalPlan plan = new NereidsPlanner(statementContext).plan(
PhysicalPlan plan = new NereidsPlanner(statementContext).planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testGeneratePhysicalPlan() {
"SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE < 10",
"SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID ORDER BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE < 10"
);
testSql.forEach(sql -> new NereidsPlanner(createStatementCtx(sql)).plan(
testSql.forEach(sql -> new NereidsPlanner(createStatementCtx(sql)).planWithLock(
new NereidsParser().parseSingle(sql),
PhysicalProperties.ANY
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testTranslateAllCase() throws Exception {
System.out.println("\n\n***** " + sql + " *****\n\n");
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
new NereidsParser().parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private PlanFragment getOutputFragment(String sql) throws Exception {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
((ExplainCommand) parser.parseSingle(sql)).getLogicalPlan(),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private PlanFragment getOutputFragment(String sql) throws Exception {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Loading