Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
}
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
Plan originPlan = planner.getCascadesContext().getRewritePlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
// and the top sort can also be removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,6 @@ private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
StatementBase parsedStmt = statements.get(0);
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.plan(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -118,48 +119,57 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
setParsedPlan(parsedPlan);
PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().reset().start();
Plan resultPlan = null;
try {
boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions());
resultPlan = plan(parsedPlan, requireProperties, explainLevel, showPlanProcess);
planWithLock(parsedPlan, requireProperties, explainLevel, showPlanProcess, plan -> {
setOptimizedPlan(plan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) plan;
translate(physicalPlan);
});
} finally {
statementContext.getStopwatch().stop();
}
setOptimizedPlan(resultPlan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) resultPlan;
translate(physicalPlan);
}

@VisibleForTesting
public void plan(StatementBase queryStmt) {
public void planWithLock(StatementBase queryStmt) {
try {
plan(queryStmt, statementContext.getConnectContext().getSessionVariable().toThrift());
} catch (Exception e) {
throw new NereidsException(e.getMessage(), e);
}
}

public PhysicalPlan plan(LogicalPlan plan, PhysicalProperties outputProperties) {
return (PhysicalPlan) plan(plan, outputProperties, ExplainLevel.NONE, false);
public PhysicalPlan planWithLock(LogicalPlan plan, PhysicalProperties outputProperties) {
return (PhysicalPlan) planWithLock(plan, outputProperties, ExplainLevel.NONE, false);
}

public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel) {
return plan(plan, requireProperties, explainLevel, false);
public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel) {
return planWithLock(plan, requireProperties, explainLevel, false);
}

public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
ExplainLevel explainLevel, boolean showPlanProcess) {
Consumer<Plan> noCallback = p -> {};
return planWithLock(plan, requireProperties, explainLevel, showPlanProcess, noCallback);
}

/**
* Do analyze and optimize for query plan.
*
* @param plan wait for plan
* @param requireProperties request physical properties constraints
* @param showPlanProcess is record plan process to CascadesContext
* @param lockCallback this callback function will invoke the table lock
* @return plan generated by this planner
* @throws AnalysisException throw exception if failed in ant stage
*/
public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties,
ExplainLevel explainLevel, boolean showPlanProcess) {
public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
ExplainLevel explainLevel, boolean showPlanProcess,
Consumer<Plan> lockCallback) {
try {
if (plan instanceof LogicalSqlCache) {
rewrittenPlan = analyzedPlan = plan;
Expand All @@ -186,75 +196,83 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties,
initCascadesContext(plan, requireProperties);

try (Lock lock = new Lock(plan, cascadesContext)) {
// resolve column, table and function
// analyze this query
analyze(showAnalyzeProcess(explainLevel, showPlanProcess));
// minidump of input must be serialized first, this process ensure minidump string not null
try {
MinidumpUtils.serializeInputsToDumpFile(plan, cascadesContext.getTables());
} catch (IOException e) {
throw new RuntimeException(e);
}
Plan resultPlan = planWithoutLock(plan, explainLevel, showPlanProcess, requireProperties);
lockCallback.accept(resultPlan);
return resultPlan;
}
} finally {
statementContext.releasePlannerResources();
}
}

if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setQueryAnalysisFinishTime();
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
}
private Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel,
boolean showPlanProcess, PhysicalProperties requireProperties) {
// resolve column, table and function
// analyze this query
analyze(showAnalyzeProcess(explainLevel, showPlanProcess));
// minidump of input must be serialized first, this process ensure minidump string not null
try {
MinidumpUtils.serializeInputsToDumpFile(plan, cascadesContext.getTables());
} catch (IOException e) {
throw new RuntimeException(e);
}

if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setQueryAnalysisFinishTime();
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
}

// rule-based optimize
rewrite(showRewriteProcess(explainLevel, showPlanProcess));
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}

if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
}
// rule-based optimize
rewrite(showRewriteProcess(explainLevel, showPlanProcess));
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
}

// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
}
optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}

int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
}

physicalPlan = postProcess(physicalPlan);
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
MinidumpUtils.serializeOutputToDumpFile(physicalPlan);
NereidsTracer.output(statementContext.getConnectContext());
int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);

return physicalPlan;
}
} finally {
statementContext.releasePlannerResources();
physicalPlan = postProcess(physicalPlan);
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
MinidumpUtils.serializeOutputToDumpFile(physicalPlan);
NereidsTracer.output(statementContext.getConnectContext());

return physicalPlan;
}

private LogicalPlan preprocess(LogicalPlan logicalPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public static JSONObject executeSql(String sql) {
}
NereidsPlanner nereidsPlanner = new NereidsPlanner(
new StatementContext(ConnectContext.get(), new OriginStatement(sql, 0)));
nereidsPlanner.plan(LogicalPlanAdapter.of(parsed));
nereidsPlanner.planWithLock(LogicalPlanAdapter.of(parsed));
return ((AbstractPlan) nereidsPlanner.getOptimizedPlan()).toJson();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {

private Pair<ImmutableList<String>, TableIf> extractColumnsAndTable(ConnectContext ctx, LogicalPlan plan) {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
Plan analyzedPlan = planner.plan(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Plan analyzedPlan = planner.planWithLock(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Set<LogicalCatalogRelation> logicalCatalogRelationSet = analyzedPlan
.collect(LogicalCatalogRelation.class::isInstance);
if (logicalCatalogRelationSet.size() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
Plan plan = planner.plan(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
Plan plan = planner.planWithLock(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
if (ctasCols == null) {
// we should analyze the plan firstly to get the columns' name.
ctasCols = plan.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {

private TableIf extractTable(ConnectContext ctx, LogicalPlan plan) {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
Plan analyzedPlan = planner.plan(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Plan analyzedPlan = planner.planWithLock(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Set<LogicalCatalogRelation> logicalCatalogRelationSet = analyzedPlan
.collect(LogicalCatalogRelation.class::isInstance);
if (logicalCatalogRelationSet.size() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void init(ConnectContext ctx) throws UserException {
/**validate*/
public void validate(ConnectContext ctx) throws UserException {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
planner.planWithLock(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
Set<String> colSets = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (Column col : finalCols) {
if (!colSets.add(col.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
Plan plan = planner.plan(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
if (plan.anyMatch(node -> node instanceof OneRowRelation)) {
throw new AnalysisException("at least contain one table");
}
Expand Down Expand Up @@ -280,7 +280,7 @@ private void getRelation(NereidsPlanner planner) {
}
Plan plan;
try {
plan = planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
plan = planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void validate(ConnectContext ctx) throws UserException {
PrivPredicate.CREATE.getPrivs().toString(), viewName.getTbl());
}
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
planner.planWithLock(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
Set<String> colSets = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (Column col : finalCols) {
if (!colSets.add(col.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectCont
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
try {
Plan mvRewrittenPlan =
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, mvRewrittenPlan, cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ private PhysicalPlan getPhysicalPlan(String insertSql, PhysicalProperties physic
}
Assertions.assertTrue(exPlan instanceof UnboundLogicalSink);
NereidsPlanner planner = new NereidsPlanner(statementContext);
return planner.plan((UnboundLogicalSink<?>) exPlan, physicalProperties);
return planner.planWithLock((UnboundLogicalSink<?>) exPlan, physicalProperties);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testGroupByAndHavingUseAliasFirstThrowException() {
}

private void runPlanner(String sql) {
new NereidsPlanner(MemoTestUtils.createStatementContext(connectContext, sql)).plan(
new NereidsPlanner(MemoTestUtils.createStatementContext(connectContext, sql)).planWithLock(
new NereidsParser().parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testFallbackToOriginalPlanner() throws Exception {
sv.setEnableNereidsPlanner(true);
sv.enableFallbackToOriginalPlanner = false;
Assertions.assertThrows(AnalysisException.class, () -> new NereidsPlanner(statementContext)
.plan(new NereidsParser().parseSingle(sql), PhysicalProperties.ANY));
.planWithLock(new NereidsParser().parseSingle(sql), PhysicalProperties.ANY));

// manually recover sv
sv.setEnableNereidsPlanner(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public List<Rule> getExplorationRules() {
for (String sql : testSqls) {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
PhysicalPlan plan = new NereidsPlanner(statementContext).plan(
PhysicalPlan plan = new NereidsPlanner(statementContext).planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Loading