Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Placeholder;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.PlaceholderId;
Expand Down Expand Up @@ -63,7 +62,6 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -130,15 +128,6 @@ public class StatementContext implements Closeable {
private final List<Expression> joinFilters = new ArrayList<>();

private final List<Hint> hints = new ArrayList<>();
// Root Slot -> Paths -> Sub-column Slots
private final Map<Slot, Map<List<String>, SlotReference>> subColumnSlotRefMap
= Maps.newHashMap();

// Map from rewritten slot to original expr
private final Map<Slot, Expression> subColumnOriginalExprMap = Maps.newHashMap();

// Map from original expr to rewritten slot
private final Map<Expression, Slot> originalExprToRewrittenSubColumn = Maps.newHashMap();

// Map slot to its relation, currently used in SlotReference to find its original
// Relation for example LogicalOlapScan
Expand Down Expand Up @@ -262,58 +251,10 @@ public Optional<SqlCacheContext> getSqlCacheContext() {
return Optional.ofNullable(sqlCacheContext);
}

public Set<SlotReference> getAllPathsSlots() {
Set<SlotReference> allSlotReferences = Sets.newHashSet();
for (Map<List<String>, SlotReference> slotReferenceMap : subColumnSlotRefMap.values()) {
allSlotReferences.addAll(slotReferenceMap.values());
}
return allSlotReferences;
}

public Expression getOriginalExpr(SlotReference rewriteSlot) {
return subColumnOriginalExprMap.getOrDefault(rewriteSlot, null);
}

public Slot getRewrittenSlotRefByOriginalExpr(Expression originalExpr) {
return originalExprToRewrittenSubColumn.getOrDefault(originalExpr, null);
}

/**
* Add a slot ref attached with paths in context to avoid duplicated slot
*/
public void addPathSlotRef(Slot root, List<String> paths, SlotReference slotRef, Expression originalExpr) {
subColumnSlotRefMap.computeIfAbsent(root, k -> Maps.newTreeMap((lst1, lst2) -> {
Iterator<String> it1 = lst1.iterator();
Iterator<String> it2 = lst2.iterator();
while (it1.hasNext() && it2.hasNext()) {
int result = it1.next().compareTo(it2.next());
if (result != 0) {
return result;
}
}
return Integer.compare(lst1.size(), lst2.size());
}));
subColumnSlotRefMap.get(root).put(paths, slotRef);
subColumnOriginalExprMap.put(slotRef, originalExpr);
originalExprToRewrittenSubColumn.put(originalExpr, slotRef);
}

public SlotReference getPathSlot(Slot root, List<String> paths) {
Map<List<String>, SlotReference> pathsSlotsMap = subColumnSlotRefMap.getOrDefault(root, null);
if (pathsSlotsMap == null) {
return null;
}
return pathsSlotsMap.getOrDefault(paths, null);
}

public void addSlotToRelation(Slot slot, Relation relation) {
slotToRelation.put(slot, relation);
}

public Relation getRelationBySlot(Slot slot) {
return slotToRelation.getOrDefault(slot, null);
}

public boolean isDpHyp() {
return isDpHyp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HighOrderFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Lambda;
import org.apache.doris.nereids.trees.expressions.functions.scalar.PushDownToProjectionFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction;
import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdaf;
import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdf;
Expand All @@ -101,7 +100,6 @@
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFunctionBinaryType;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -210,20 +208,6 @@ private OlapTable getOlapTableDirectly(SlotRef left) {

@Override
public Expr visitElementAt(ElementAt elementAt, PlanTranslatorContext context) {
if (PushDownToProjectionFunction.validToPushDown(elementAt)) {
if (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable() != null
&& !ConnectContext.get().getSessionVariable().isEnableRewriteElementAtToSlot()) {
throw new AnalysisException(
"set enable_rewrite_element_at_to_slot=true when using element_at function for variant type");
}
SlotReference rewrittenSlot = (SlotReference) context.getConnectContext()
.getStatementContext().getRewrittenSlotRefByOriginalExpr(elementAt);
// rewrittenSlot == null means variant is not from table. so keep element_at function
if (rewrittenSlot != null) {
return context.findSlotRef(rewrittenSlot.getExprId());
}
}
return visitScalarFunction(elementAt, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
import org.apache.doris.nereids.trees.expressions.functions.scalar.PushDownToProjectionFunction;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.AggPhase;
Expand Down Expand Up @@ -1244,8 +1243,7 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, P
}
if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode
// this means we have filter->limit->project, need a SelectNode
|| (child instanceof PhysicalProject
&& !((PhysicalProject<?>) child).hasPushedDownToProjectionFunctions())) {
|| child instanceof PhysicalProject) {
// the three nodes don't support conjuncts, need create a SelectNode to filter data
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode);
selectNode.setNereidsId(filter.getId());
Expand Down Expand Up @@ -1827,35 +1825,6 @@ && findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) {
return inputFragment;
}

// collect all valid PushDownToProjectionFunction from expression
private List<Expression> getPushDownToProjectionFunctionForRewritten(NamedExpression expression) {
List<Expression> targetExprList = expression.collectToList(PushDownToProjectionFunction.class::isInstance);
return targetExprList.stream()
.filter(PushDownToProjectionFunction::validToPushDown)
.collect(Collectors.toList());
}

// register rewritten slots from original PushDownToProjectionFunction
private void registerRewrittenSlot(PhysicalProject<? extends Plan> project, OlapScanNode olapScanNode) {
// register slots that are rewritten from element_at/etc..
List<Expression> allPushDownProjectionFunctions = project.getProjects().stream()
.map(this::getPushDownToProjectionFunctionForRewritten)
.flatMap(List::stream)
.collect(Collectors.toList());
for (Expression expr : allPushDownProjectionFunctions) {
PushDownToProjectionFunction function = (PushDownToProjectionFunction) expr;
if (context != null
&& context.getConnectContext() != null
&& context.getConnectContext().getStatementContext() != null) {
Slot argumentSlot = function.getInputSlots().stream().findFirst().get();
Expression rewrittenSlot = PushDownToProjectionFunction.rewriteToSlot(
function, (SlotReference) argumentSlot);
TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId());
context.createSlotDesc(tupleDescriptor, (SlotReference) rewrittenSlot);
}
}
}

// TODO: generate expression mapping when be project could do in ExecNode.
@Override
public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project, PlanTranslatorContext context) {
Expand All @@ -1870,12 +1839,6 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project

PlanFragment inputFragment = project.child(0).accept(this, context);

if (inputFragment.getPlanRoot() instanceof OlapScanNode) {
// function already pushed down in projection
// e.g. select count(distinct cast(element_at(v, 'a') as int)) from tbl;
registerRewrittenSlot(project, (OlapScanNode) inputFragment.getPlanRoot());
}

PlanNode inputPlanNode = inputFragment.getPlanRoot();
List<Expr> projectionExprs = null;
List<Expr> allProjectionExprs = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,12 @@ public SlotDescriptor createSlotDesc(TupleDescriptor tupleDesc, SlotReference sl
slotDescriptor.setLabel(slotReference.getName());
} else {
slotRef = new SlotRef(slotDescriptor);
if (slotReference.hasSubColPath()) {
slotDescriptor.setSubColLables(slotReference.getSubColPath());
if (slotReference.hasSubColPath() && slotReference.getColumn().isPresent()) {
slotDescriptor.setSubColLables(slotReference.getSubPath());
// use lower case name for variant's root, since backend treat parent column as lower case
// see issue: https://github.com/apache/doris/pull/32999/commits
slotDescriptor.setMaterializedColumnName(slotRef.getColumnName().toLowerCase()
+ "." + String.join(".", slotReference.getSubColPath()));
+ "." + String.join(".", slotReference.getSubPath()));
}
}
slotRef.setTable(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ public AbstractBatchJobExecutor(CascadesContext cascadesContext) {
this.cascadesContext = Objects.requireNonNull(cascadesContext, "cascadesContext can not null");
}

/**
* flat map jobs in TopicRewriteJob to could really run jobs, and filter null.
*/
public static List<RewriteJob> jobs(RewriteJob... jobs) {
return Arrays.stream(jobs)
.filter(Objects::nonNull)
.flatMap(job -> job instanceof TopicRewriteJob
? ((TopicRewriteJob) job).jobs.stream()
? ((TopicRewriteJob) job).jobs.stream().filter(Objects::nonNull)
: Stream.of(job)
).collect(ImmutableList.toImmutableList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver;
import org.apache.doris.nereids.rules.analysis.BindSink;
import org.apache.doris.nereids.rules.analysis.BindSlotWithPaths;
import org.apache.doris.nereids.rules.analysis.BuildAggForRandomDistributedTable;
import org.apache.doris.nereids.rules.analysis.CheckAfterBind;
import org.apache.doris.nereids.rules.analysis.CheckAnalysis;
Expand Down Expand Up @@ -136,7 +135,6 @@ private static List<RewriteJob> buildAnalyzeJobs(Optional<CustomTableResolver> c
new CheckPolicy()
),
bottomUp(new BindExpression()),
bottomUp(new BindSlotWithPaths()),
topDown(new BindSink()),
bottomUp(new CheckAfterBind()),
bottomUp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.nereids.rules.rewrite.CheckMatchExpression;
import org.apache.doris.nereids.rules.rewrite.CheckMultiDistinct;
import org.apache.doris.nereids.rules.rewrite.CheckPrivileges;
import org.apache.doris.nereids.rules.rewrite.ClearContextStatus;
import org.apache.doris.nereids.rules.rewrite.CollectCteConsumerOutput;
import org.apache.doris.nereids.rules.rewrite.CollectFilterAboveConsumer;
import org.apache.doris.nereids.rules.rewrite.ColumnPruning;
Expand Down Expand Up @@ -132,12 +133,16 @@
import org.apache.doris.nereids.rules.rewrite.TransposeSemiJoinAggProject;
import org.apache.doris.nereids.rules.rewrite.TransposeSemiJoinLogicalJoin;
import org.apache.doris.nereids.rules.rewrite.TransposeSemiJoinLogicalJoinProject;
import org.apache.doris.nereids.rules.rewrite.VariantSubPathPruning;
import org.apache.doris.nereids.rules.rewrite.batch.ApplyToJoin;
import org.apache.doris.nereids.rules.rewrite.batch.CorrelateApplyToUnCorrelateApply;
import org.apache.doris.nereids.rules.rewrite.batch.EliminateUselessPlanUnderApply;
import org.apache.doris.nereids.rules.rewrite.mv.SelectMaterializedIndexWithAggregate;
import org.apache.doris.nereids.rules.rewrite.mv.SelectMaterializedIndexWithoutAggregate;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -146,7 +151,7 @@
*/
public class Rewriter extends AbstractBatchJobExecutor {

private static final List<RewriteJob> CTE_CHILDREN_REWRITE_JOBS = jobs(
private static final List<RewriteJob> CTE_CHILDREN_REWRITE_JOBS_BEFORE_SUB_PATH_PUSH_DOWN = jobs(
topic("Plan Normalization",
topDown(
new EliminateOrderByConstant(),
Expand Down Expand Up @@ -396,9 +401,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
topic("adjust preagg status",
topDown(new AdjustPreAggStatus())
),
topic("topn optimize",
topDown(new DeferMaterializeTopNResult())
),
topic("Point query short circuit",
topDown(new LogicalResultSinkToShortCircuitPointQuery())),
topic("eliminate",
Expand All @@ -411,6 +413,25 @@ public class Rewriter extends AbstractBatchJobExecutor {
topDown(new SumLiteralRewrite(),
new MergePercentileToArray())
),
topic("Push project and filter on cte consumer to cte producer",
topDown(
new CollectFilterAboveConsumer(),
new CollectCteConsumerOutput()
)
)
);

private static final List<RewriteJob> CTE_CHILDREN_REWRITE_JOBS_AFTER_SUB_PATH_PUSH_DOWN = jobs(
// after variant sub path pruning, we need do column pruning again
custom(RuleType.COLUMN_PRUNING, ColumnPruning::new),
bottomUp(ImmutableList.of(
new PushDownFilterThroughProject(),
new MergeProjects()
)),
custom(RuleType.ELIMINATE_UNNECESSARY_PROJECT, EliminateUnnecessaryProject::new),
topic("topn optimize",
topDown(new DeferMaterializeTopNResult())
),
// this rule batch must keep at the end of rewrite to do some plan check
topic("Final rewrite and check",
custom(RuleType.CHECK_DATA_TYPES, CheckDataTypes::new),
Expand All @@ -423,12 +444,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
new CheckAfterRewrite()
)
),
topic("Push project and filter on cte consumer to cte producer",
topDown(
new CollectFilterAboveConsumer(),
new CollectCteConsumerOutput()
)
)
topDown(new CollectCteConsumerOutput())
);

private static final List<RewriteJob> WHOLE_TREE_REWRITE_JOBS
Expand Down Expand Up @@ -456,35 +472,61 @@ public static Rewriter getCteChildrenRewriter(CascadesContext cascadesContext, L
return new Rewriter(cascadesContext, jobs);
}

/**
* only
*/
public static Rewriter getWholeTreeRewriterWithCustomJobs(CascadesContext cascadesContext, List<RewriteJob> jobs) {
return new Rewriter(cascadesContext, getWholeTreeRewriteJobs(jobs));
return new Rewriter(cascadesContext, getWholeTreeRewriteJobs(false, false, jobs, ImmutableList.of()));
}

private static List<RewriteJob> getWholeTreeRewriteJobs(boolean withCostBased) {
List<RewriteJob> withoutCostBased = Rewriter.CTE_CHILDREN_REWRITE_JOBS.stream()
List<RewriteJob> withoutCostBased = Rewriter.CTE_CHILDREN_REWRITE_JOBS_BEFORE_SUB_PATH_PUSH_DOWN.stream()
.filter(j -> !(j instanceof CostBasedRewriteJob))
.collect(Collectors.toList());
return getWholeTreeRewriteJobs(withCostBased ? CTE_CHILDREN_REWRITE_JOBS : withoutCostBased);
return getWholeTreeRewriteJobs(true, true,
withCostBased ? CTE_CHILDREN_REWRITE_JOBS_BEFORE_SUB_PATH_PUSH_DOWN : withoutCostBased,
CTE_CHILDREN_REWRITE_JOBS_AFTER_SUB_PATH_PUSH_DOWN);
}

private static List<RewriteJob> getWholeTreeRewriteJobs(List<RewriteJob> jobs) {
return jobs(
private static List<RewriteJob> getWholeTreeRewriteJobs(
boolean needSubPathPushDown,
boolean needOrExpansion,
List<RewriteJob> beforePushDownJobs,
List<RewriteJob> afterPushDownJobs) {

List<RewriteJob> rewriteJobs = Lists.newArrayListWithExpectedSize(300);
rewriteJobs.addAll(jobs(
topic("cte inline and pull up all cte anchor",
custom(RuleType.PULL_UP_CTE_ANCHOR, PullUpCteAnchor::new),
custom(RuleType.CTE_INLINE, CTEInline::new)
),
topic("process limit session variables",
custom(RuleType.ADD_DEFAULT_LIMIT, AddDefaultLimit::new)
),
topic("rewrite cte sub-tree",
custom(RuleType.REWRITE_CTE_CHILDREN, () -> new RewriteCteChildren(jobs))
topic("rewrite cte sub-tree before sub path push down",
custom(RuleType.REWRITE_CTE_CHILDREN, () -> new RewriteCteChildren(beforePushDownJobs))
)));
if (needOrExpansion) {
rewriteJobs.addAll(jobs(topic("or expansion",
custom(RuleType.OR_EXPANSION, () -> OrExpansion.INSTANCE))));
}
if (needSubPathPushDown) {
rewriteJobs.addAll(jobs(
topic("variant element_at push down",
custom(RuleType.VARIANT_SUB_PATH_PRUNING, VariantSubPathPruning::new)
)
));
}
rewriteJobs.addAll(jobs(
topic("rewrite cte sub-tree after sub path push down",
custom(RuleType.CLEAR_CONTEXT_STATUS, ClearContextStatus::new),
custom(RuleType.REWRITE_CTE_CHILDREN, () -> new RewriteCteChildren(afterPushDownJobs))
),
topic("or expansion",
custom(RuleType.OR_EXPANSION, () -> OrExpansion.INSTANCE)),
topic("whole plan check",
custom(RuleType.ADJUST_NULLABLE, AdjustNullable::new)
)
);
));
return rewriteJobs;
}

@Override
Expand Down
Loading