160 changes: 133 additions & 27 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -105,7 +105,8 @@ public class StatementContext implements Closeable {
* indicate where the table comes from.
* QUERY: in query sql directly
* INSERT_TARGET: the insert target table
* MTMV: mtmv itself and its related tables which do not belong to this sql, but may be used in rewrite by mtmv.
* MTMV: mtmv itself and its related tables which do not belong to this sql, but
* may be used in rewrite by mtmv.
*/
public enum TableFrom {
QUERY,
@@ -122,7 +123,8 @@ public enum TableFrom {
private final Map<String, Supplier<Object>> contextCacheMap = Maps.newLinkedHashMap();

private OriginStatement originStatement;
// NOTICE: we set the plan parsed by DorisParser to parsedStatement and if the plan is a command, create a
// NOTICE: we set the plan parsed by DorisParser to parsedStatement and if the
// plan is a command, create a
// LogicalPlanAdapter with the logical plan in the command.
private StatementBase parsedStatement;
private ColumnAliasGenerator columnAliasGenerator;
@@ -134,10 +136,14 @@ public enum TableFrom {

private boolean hasNondeterministic = false;

// hasUnknownColStats is true if any column stats in the tables used by this sql are unknown
// the algorithm to derive plan when column stats are unknown is implemented in cascading framework, not in dphyper.
// And hence, when column stats are unknown, even if the tables used by a sql are more than
// MAX_TABLE_COUNT_USE_CASCADES_JOIN_REORDER, join reorder should choose cascading framework.
// hasUnknownColStats is true if any column stats in the tables used by this sql are
// unknown
// the algorithm to derive plan when column stats are unknown is implemented in
// cascading framework, not in dphyper.
// And hence, when column stats are unknown, even if the tables used by a sql are
// more than
// MAX_TABLE_COUNT_USE_CASCADES_JOIN_REORDER, join reorder should choose
// cascading framework.
// Thus hasUnknownColStats has higher priority than isDpHyp
private boolean hasUnknownColStats = false;

@@ -160,11 +166,13 @@ public enum TableFrom {
private final Set<String> viewDdlSqlSet = Sets.newHashSet();
private final SqlCacheContext sqlCacheContext;

// generates the next id for prepared statement's placeholders, which is connection level
// generates the next id for prepared statement's placeholders, which is
// connection level
private final IdGenerator<PlaceholderId> placeHolderIdGenerator = PlaceholderId.createGenerator();
// relation id to placeholders for prepared statement, ordered by placeholder id
private final Map<PlaceholderId, Expression> idToPlaceholderRealExpr = new TreeMap<>();
// map placeholder id to comparison slot, which will be used to replace conjuncts directly
// map placeholder id to comparison slot, which will be used to replace conjuncts
// directly
private final Map<PlaceholderId, SlotReference> idToComparisonSlot = new TreeMap<>();

// collect all hash join conditions to compute node connectivity in join graph
@@ -173,7 +181,8 @@ public enum TableFrom {
private final List<Hint> hints = new ArrayList<>();
private boolean hintForcePreAggOn = false;

// the columns in Plan.getExpressions(), such as columns in join condition or filter condition, group by expression
// the columns in Plan.getExpressions(), such as columns in join condition or
// filter condition, group by expression
private final Set<SlotReference> keySlots = Sets.newHashSet();
private BitSet disableRules;

@@ -217,19 +226,24 @@ public enum TableFrom {
private final List<Column> insertTargetSchema = new ArrayList<>();

// for create view support in nereids
// key is the start and end position of the sql substring that needs to be replaced,
// key is the start and end position of the sql substring that needs to be
// replaced,
// and value is the new string used for replacement.
private final TreeMap<Pair<Integer, Integer>, String> indexInSqlToString
= new TreeMap<>(new Pair.PairComparator<>());
// Record table id mapping, the key is the hash code of union catalogId, databaseId, tableId
private final TreeMap<Pair<Integer, Integer>, String> indexInSqlToString = new TreeMap<>(
new Pair.PairComparator<>());
// Record table id mapping, the key is the hash code of union catalogId,
// databaseId, tableId
// the value is the auto-increment id in the cascades context
private final Map<List<String>, TableId> tableIdMapping = new LinkedHashMap<>();
// Record the materialization statistics by id which is used for cost estimation.
// Maybe return null, which means the statistics for the id should be calculated normally rather than taken
// Record the materialization statistics by id which is used for cost
// estimation.
// Maybe return null, which means the statistics for the id should be calculated
// normally rather than taken
// from this map
private final Map<RelationId, Statistics> relationIdToStatisticsMap = new LinkedHashMap<>();

// Indicates the query is short-circuited in both plan and execution phase, typically
// Indicates the query is short-circuited in both plan and execution phase,
// typically
// for high speed/concurrency point queries
private boolean isShortCircuitQuery;

@@ -251,8 +265,8 @@ public enum TableFrom {
private long materializedViewRewriteDuration = 0L;

// Record used table and its used partitions
private final Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap =
HashMultimap.create();
private final Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap = HashMultimap
.create();
private final Map<Integer, Integer> relationIdToCommonTableIdMap = new HashMap<>();

// Record mtmv and valid partitions map because this is time-consuming behavior
@@ -270,8 +284,11 @@ public enum TableFrom {
// this records the plan rewritten by mv in RBO phase
private final List<Plan> rewrittenPlansByMv = new ArrayList<>();
private boolean forceRecordTmpPlan = false;
// this records whether the rule in PreMaterializedViewRewriter.NEED_PRE_REWRITE_RULE_TYPES is applied successfully
// or not, if success and in PreRewriteStrategy.FOR_IN_ROB or PreRewriteStrategy.TRY_IN_ROB, mv
// this records whether the rule in
// PreMaterializedViewRewriter.NEED_PRE_REWRITE_RULE_TYPES is applied
// successfully
// or not, if success and in PreRewriteStrategy.FOR_IN_ROB or
// PreRewriteStrategy.TRY_IN_ROB, mv
// would be written in RBO phase
private final BitSet needPreMvRewriteRuleMasks = new BitSet(RuleType.SENTINEL.ordinal());
// if needed to rewrite in RBO phase, this would be set true
@@ -286,7 +303,8 @@ public enum TableFrom {

private Optional<Map<TableIf, Set<Expression>>> mvRefreshPredicates = Optional.empty();

// For Iceberg rewrite operations: store file scan tasks to be used by IcebergScanNode
// For Iceberg rewrite operations: store file scan tasks to be used by
// IcebergScanNode
// TODO: better solution?
private List<org.apache.iceberg.FileScanTask> icebergRewriteFileScanTasks = null;
private boolean hasNestedColumns;
Expand All @@ -312,7 +330,8 @@ public StatementContext(ConnectContext connectContext, OriginStatement originSta
exprIdGenerator = ExprId.createGenerator(initialId);
if (connectContext != null && connectContext.getSessionVariable() != null) {
if (CacheAnalyzer.canUseSqlCache(connectContext.getSessionVariable())) {
// cannot set the queryId here because the queryId for the current query is set in the subsequent steps.
// cannot set the queryId here because the queryId for the current query is set
// in the subsequent steps.
this.sqlCacheContext = new SqlCacheContext(
connectContext.getCurrentUserIdentity());
if (originStatement != null) {
@@ -342,7 +361,7 @@ public boolean isHintForcePreAggOn() {
* cache view info to avoid the view's def and sql mode being changed before locking it.
*
* @param qualifiedViewName full qualified name of the view
* @param view view need to cache info
* @param view view need to cache info
*
* @return view info, first is view's def sql, second is view's sql mode
*/
@@ -560,8 +579,8 @@ public synchronized void invalidCache(String cacheKey) {

public ColumnAliasGenerator getColumnAliasGenerator() {
return columnAliasGenerator == null
? columnAliasGenerator = new ColumnAliasGenerator()
: columnAliasGenerator;
? columnAliasGenerator = new ColumnAliasGenerator()
: columnAliasGenerator;
}

public String generateColumnName() {
@@ -608,6 +627,91 @@ public Map<CTEId, LogicalPlan> getRewrittenCteConsumer() {
return rewrittenCteConsumer;
}

/**
* Snapshot current CTE-related environment for temporary rewrite/optimization.
*/
public CteEnvironmentSnapshot cacheCteEnvironment() {
return new CteEnvironmentSnapshot(
copyMapOfSets(cteIdToConsumers),
copyMapOfSets(cteIdToOutputIds),
new HashMap<>(cteIdToProducerStats),
copyMapOfSets(consumerIdToFilters),
copyMapOfLists(cteIdToConsumerGroup),
new HashMap<>(rewrittenCteProducer),
new HashMap<>(rewrittenCteConsumer));
}

/** Restore CTE-related environment from snapshot. */
public void restoreCteEnvironment(CteEnvironmentSnapshot snapshot) {
cteIdToConsumers.clear();
cteIdToConsumers.putAll(snapshot.cteIdToConsumers);

cteIdToOutputIds.clear();
cteIdToOutputIds.putAll(snapshot.cteIdToOutputIds);

cteIdToProducerStats.clear();
cteIdToProducerStats.putAll(snapshot.cteIdToProducerStats);

consumerIdToFilters.clear();
consumerIdToFilters.putAll(snapshot.consumerIdToFilters);

cteIdToConsumerGroup.clear();
cteIdToConsumerGroup.putAll(snapshot.cteIdToConsumerGroup);

rewrittenCteProducer.clear();
rewrittenCteProducer.putAll(snapshot.rewrittenCteProducer);

rewrittenCteConsumer.clear();
rewrittenCteConsumer.putAll(snapshot.rewrittenCteConsumer);
}

private static <K, V> Map<K, Set<V>> copyMapOfSets(Map<K, Set<V>> source) {
Map<K, Set<V>> copied = new HashMap<>();
for (Map.Entry<K, Set<V>> entry : source.entrySet()) {
copied.put(entry.getKey(), new HashSet<>(entry.getValue()));
}
return copied;
}

private static <K, V> Map<K, List<V>> copyMapOfLists(Map<K, List<V>> source) {
Map<K, List<V>> copied = new HashMap<>();
for (Map.Entry<K, List<V>> entry : source.entrySet()) {
copied.put(entry.getKey(), new ArrayList<>(entry.getValue()));
}
return copied;
}

/** Holder for cached CTE-related environment. */
public static class CteEnvironmentSnapshot {
private final Map<CTEId, Set<LogicalCTEConsumer>> cteIdToConsumers;
private final Map<CTEId, Set<Slot>> cteIdToOutputIds;
private final Map<CTEId, Statistics> cteIdToProducerStats;
private final Map<RelationId, Set<Expression>> consumerIdToFilters;
private final Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> cteIdToConsumerGroup;
private final Map<CTEId, LogicalPlan> rewrittenCteProducer;
private final Map<CTEId, LogicalPlan> rewrittenCteConsumer;

/**
* cte related structures in StatementContext
*/
public CteEnvironmentSnapshot(
Map<CTEId, Set<LogicalCTEConsumer>> cteIdToConsumers,
Map<CTEId, Set<Slot>> cteIdToOutputIds,
Map<CTEId, Statistics> cteIdToProducerStats,
Map<RelationId, Set<Expression>> consumerIdToFilters,
Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> cteIdToConsumerGroup,
Map<CTEId, LogicalPlan> rewrittenCteProducer,
Map<CTEId, LogicalPlan> rewrittenCteConsumer) {
this.cteIdToConsumers = cteIdToConsumers;
this.cteIdToOutputIds = cteIdToOutputIds;
this.cteIdToProducerStats = cteIdToProducerStats;
this.consumerIdToFilters = consumerIdToFilters;
this.cteIdToConsumerGroup = cteIdToConsumerGroup;
this.rewrittenCteProducer = rewrittenCteProducer;
this.rewrittenCteConsumer = rewrittenCteConsumer;
}
}

public void addViewDdlSql(String ddlSql) {
this.viewDdlSqlSet.add(ddlSql);
}
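The snapshot/restore pair added above is consumed by the cost-based rewrite job later in this change. As a rough sketch of the intended call pattern (not part of the diff; `runTrialRewrite` is a hypothetical placeholder for any optimization step that may mutate the CTE maps held by StatementContext):

```java
// Illustrative sketch only: CascadesContext#getStatementContext and the
// cacheCteEnvironment/restoreCteEnvironment methods are the ones added above,
// while runTrialRewrite(...) is a made-up stand-in.
StatementContext stmtCtx = cascadesContext.getStatementContext();
StatementContext.CteEnvironmentSnapshot snapshot = stmtCtx.cacheCteEnvironment();
try {
    runTrialRewrite(cascadesContext); // may touch cteIdToConsumers, consumerIdToFilters, ...
} finally {
    // roll back so the next candidate is costed against the same CTE environment
    stmtCtx.restoreCteEnvironment(snapshot);
}
```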
@@ -656,6 +760,7 @@ public void addStatistics(Id id, Statistics statistics) {

/**
* get used mv hint by hint name
*
* @param useMvName hint name, can either be USE_MV or NO_USE_MV
* @return optional of useMvHint
*/
@@ -813,7 +918,7 @@ public Optional<MvccSnapshot> getSnapshot(TableIf tableIf) {
* Obtain snapshot information of mvcc
*
* @param mvccTableInfo mvccTableInfo
* @param snapshot snapshot
* @param snapshot snapshot
*/
public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot snapshot) {
snapshots.put(mvccTableInfo, snapshot);
@@ -1026,7 +1131,8 @@ public void setMvRefreshPredicates(

/**
* Set file scan tasks for Iceberg rewrite operations.
* This allows IcebergScanNode to use specific file scan tasks instead of scanning the full table.
* This allows IcebergScanNode to use specific file scan tasks instead of
* scanning the full table.
*/
public void setIcebergRewriteFileScanTasks(List<org.apache.iceberg.FileScanTask> tasks) {
this.icebergRewriteFileScanTasks = tasks;
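One detail worth noting in the helpers added above: cacheCteEnvironment deep-copies the value collections via copyMapOfSets/copyMapOfLists instead of relying on the map copy constructor alone, so later mutation of a consumer set or filter set cannot leak back into the snapshot. A small standalone sketch of the difference (plain Java, illustrative names, not part of the change):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SnapshotCopyDemo {
    public static void main(String[] args) {
        Map<String, Set<Integer>> live = new HashMap<>();
        live.put("cte1", new HashSet<>(Arrays.asList(1, 2)));

        // Shallow copy: the outer map is new, but the inner Set is shared with "live".
        Map<String, Set<Integer>> shallow = new HashMap<>(live);

        // Deep copy, in the spirit of copyMapOfSets: the inner Set is copied as well.
        Map<String, Set<Integer>> deep = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> entry : live.entrySet()) {
            deep.put(entry.getKey(), new HashSet<>(entry.getValue()));
        }

        live.get("cte1").add(3); // simulate a mutation during a trial rewrite

        System.out.println(shallow.get("cte1")); // [1, 2, 3] -> snapshot polluted
        System.out.println(deep.get("cte1"));    // [1, 2]    -> snapshot preserved
    }
}
```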
CostBasedRewriteJob.java
@@ -19,6 +19,7 @@

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.cost.Cost;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.hint.UseCboRuleHint;
@@ -28,6 +29,7 @@
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
@@ -58,7 +60,8 @@ public CostBasedRewriteJob(List<RewriteJob> rewriteJobs) {

@Override
public void execute(JobContext jobContext) {
// checkHint.first means whether it uses a hint and checkHint.second means what kind of hint it used
// checkHint.first means whether it uses a hint and checkHint.second means what
// kind of hint it used
Pair<Boolean, Hint> checkHint = checkRuleHint();
// this means it is a no_use_cbo_rule(xxx) hint
if (checkHint.first && checkHint.second == null) {
@@ -69,14 +72,18 @@ public void execute(JobContext jobContext) {
CascadesContext applyCboRuleCtx = CascadesContext.newCurrentTreeContext(currentCtx);
// execute cbo rule on one candidate
Rewriter.getCteChildrenRewriter(applyCboRuleCtx, rewriteJobs).execute();
Plan applyCboPlan = applyCboRuleCtx.getRewritePlan();
if (skipCboRuleCtx.getRewritePlan().deepEquals(applyCboRuleCtx.getRewritePlan())) {
// this means the rewrite did not change anything
return;
}

StatementContext.CteEnvironmentSnapshot cteEnvSnapshot = currentCtx.getStatementContext().cacheCteEnvironment();
// compare two candidates
Optional<Pair<Cost, GroupExpression>> skipCboRuleCost = getCost(currentCtx, skipCboRuleCtx, jobContext);
currentCtx.getStatementContext().restoreCteEnvironment(cteEnvSnapshot);
Optional<Pair<Cost, GroupExpression>> appliedCboRuleCost = getCost(currentCtx, applyCboRuleCtx, jobContext);
currentCtx.getStatementContext().restoreCteEnvironment(cteEnvSnapshot);
// If either of them failed to optimize, just return
if (!skipCboRuleCost.isPresent() || !appliedCboRuleCost.isPresent()) {
LOG.warn("Cbo rewrite execute failed on sql: {}, jobs are {}, plan is {}.",
@@ -92,19 +99,20 @@ public void execute(JobContext jobContext) {
}
return;
}
// If the candidate applied cbo rule is better, replace the original plan with it.
// If the candidate applied cbo rule is better, replace the original plan with
// it.
if (appliedCboRuleCost.get().first.getValue() < skipCboRuleCost.get().first.getValue()) {
currentCtx.addPlanProcesses(applyCboRuleCtx.getPlanProcesses());
currentCtx.setRewritePlan(applyCboRuleCtx.getRewritePlan());
currentCtx.setRewritePlan(applyCboPlan);
}
}

/**
* check if we have a use rule hint or a no use rule hint
* return an optional object which checkHint.first means whether it uses a hint
* and checkHint.second means what kind of hint it used
* example, when we use *+ no_use_cbo_rule(xxx) * the optional would be (true, false)
* which means it uses a hint and the hint forbids this kind of rule
* return an optional object which checkHint.first means whether it uses a hint
* and checkHint.second means what kind of hint it used
* example, when we use *+ no_use_cbo_rule(xxx) * the optional would be (true,
* false)
* which means it uses a hint and the hint forbids this kind of rule
*/
private Pair<Boolean, Hint> checkRuleHint() {
Pair<Boolean, Hint> checkResult = Pair.of(false, null);
@@ -134,7 +142,8 @@ private Pair<Boolean, Hint> checkRuleHint() {
}

/**
* for these rules we need use_cbo_rule hint to enable it, otherwise it would be disabled by default
* for these rules we need use_cbo_rule hint to enable it, otherwise it would be
* disabled by default
*/
private static boolean checkBlackList(RuleType ruleType) {
List<RuleType> ruleWhiteList = new ArrayList<>(Arrays.asList(
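The body of checkBlackList is collapsed in this view; per the javadoc above it, the method decides whether a rule type belongs to the group that stays disabled unless a use_cbo_rule hint switches it on. A minimal sketch of that kind of gate (the concrete RuleType entries are elided from this diff, so the list is passed in rather than guessed):

```java
// Sketch only: the real checkBlackList(...) builds its RuleType list inline,
// but those entries are not visible in this diff, so they are a parameter here.
private static boolean isHintGatedRule(RuleType ruleType, List<RuleType> hintGatedRules) {
    // true means: run this cost-based rewrite only when a use_cbo_rule(...) hint enables it
    return hintGatedRules.contains(ruleType);
}
```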