Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
Expand Down Expand Up @@ -144,6 +144,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeTVFScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
Expand Down Expand Up @@ -1071,7 +1072,7 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT

@Override
public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, PlanTranslatorContext context) {
List<Slot> slots = tvfRelation.getLogicalProperties().getOutput();
List<Slot> slots = tvfRelation.getOutput();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, tvfRelation.getFunction().getTable(), context);

TableValuedFunctionIf catalogFunction = tvfRelation.getFunction().getCatalogFunction();
Expand Down Expand Up @@ -2663,7 +2664,7 @@ public PlanFragment visitPhysicalLazyMaterialize(PhysicalLazyMaterialize<? exten
materializeNode.setIdxs(materialize.getlazyTableIdxs());

List<Boolean> rowStoreFlags = new ArrayList<>();
for (CatalogRelation relation : materialize.getRelations()) {
for (Relation relation : materialize.getRelations()) {
rowStoreFlags.add(shouldUseRowStore(relation));
}
materializeNode.setRowStoreFlags(rowStoreFlags);
Expand All @@ -2677,7 +2678,7 @@ public PlanFragment visitPhysicalLazyMaterialize(PhysicalLazyMaterialize<? exten
return inputPlanFragment;
}

private boolean shouldUseRowStore(CatalogRelation rel) {
private boolean shouldUseRowStore(Relation rel) {
boolean useRowStore = false;
if (rel instanceof PhysicalOlapScan) {
OlapTable olapTable = ((PhysicalOlapScan) rel).getTable();
Expand All @@ -2687,6 +2688,39 @@ private boolean shouldUseRowStore(CatalogRelation rel) {
return useRowStore;
}

/**
 * Translates a {@link PhysicalLazyMaterializeTVFScan} into a scan node obtained from the
 * catalog table-valued function, and wraps that node in a new plan fragment that uses
 * {@code DataPartition.RANDOM}.
 *
 * @param tvfRelation the lazy-materialize TVF scan to translate
 * @param context translator context that receives the scan node and the plan fragment
 * @return the plan fragment whose root is the created scan node
 */
@Override
public PlanFragment visitPhysicalLazyMaterializeTVFScan(PhysicalLazyMaterializeTVFScan tvfRelation,
        PlanTranslatorContext context) {
    List<Slot> outputSlots = tvfRelation.getOutput();
    TupleDescriptor tupleDesc = generateTupleDesc(outputSlots, tvfRelation.getFunction().getTable(), context);

    // ask the catalog function for its scan node, then register and initialize it
    TableValuedFunctionIf tvf = tvfRelation.getFunction().getCatalogFunction();
    SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
    ScanNode tvfScanNode = tvf.getScanNode(context.nextPlanNodeId(), tupleDesc, sessionVariable);
    tvfScanNode.setNereidsId(tvfRelation.getId());
    context.getNereidsIdToPlanNodeIdMap().put(tvfRelation.getId(), tvfScanNode.getId());
    Utils.execWithUncheckedException(tvfScanNode::init);

    // translate every runtime filter target that is attached to this scan
    context.getRuntimeTranslator().ifPresent(generator ->
            generator.getContext().getTargetListByScan(tvfRelation).forEach(
                    target -> generator.translateRuntimeFilterTarget(target, tvfScanNode, context)));
    context.addScanNode(tvfScanNode, tvfRelation);

    // TODO: updating the label this way is awkward
    // set the slot labels shown in explain output
    String labelPrefix = TableValuedFunctionIf.TVF_TABLE_PREFIX + tvfRelation.getFunction().getName() + ".";
    for (Slot outputSlot : outputSlots) {
        context.findSlotRef(outputSlot.getExprId()).setLabel(labelPrefix + outputSlot.getName());
    }

    PlanFragment fragment = createPlanFragment(tvfScanNode, DataPartition.RANDOM, tvfRelation);
    context.addPlanFragment(fragment);
    updateLegacyPlanIdToPhysicalPlan(fragment.getPlanRoot(), tvfRelation);
    return fragment;
}

@Override
public PlanFragment visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterializeOlapScan lazyScan,
PlanTranslatorContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -94,31 +97,51 @@ public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
return topN;
}

Map<CatalogRelation, List<Slot>> relationToLazySlotMap = new HashMap<>();
Map<Relation, List<Slot>> relationToLazySlotMap = new HashMap<>();
for (Slot slot : lazyMaterializeSlots) {
MaterializeSource source = materializeMap.get(slot);
relationToLazySlotMap.computeIfAbsent(source.relation, relation -> new ArrayList<>()).add(slot);
}

Plan result = topN;
List<Slot> originOutput = topN.getOutput();
BiMap<CatalogRelation, SlotReference> relationToRowId = HashBiMap.create(relationToLazySlotMap.size());
BiMap<Relation, SlotReference> relationToRowId = HashBiMap.create(relationToLazySlotMap.size());
HashSet<SlotReference> rowIdSet = new HashSet<>();
// Use the thread-level StatementContext rather than ctx.getStatementContext(): the StatisticsCleaner
// creates a second StatementContext but reuses the plan generated under the outer StatementContext,
// so expr ids must be generated by the outer StatementContext, otherwise conflicting expr ids appear.
StatementContext threadStatementContext = StatementScopeIdGenerator.getStatementContext();
for (CatalogRelation relation : relationToLazySlotMap.keySet()) {
Column rowIdCol = new Column(Column.GLOBAL_ROWID_COL + relation.getTable().getName(),
Type.STRING, false, null, false,
"", relation.getTable().getName() + ".global_row_id");
SlotReference rowIdSlot = SlotReference.fromColumn(
threadStatementContext.getNextExprId(), relation.getTable(), rowIdCol, relation.getQualifier());
result = result.accept(new LazySlotPruning(),
new LazySlotPruning.Context((PhysicalCatalogRelation) relation,
rowIdSlot, relationToLazySlotMap.get(relation)));
relationToRowId.put(relation, rowIdSlot);
rowIdSet.add(rowIdSlot);
for (Relation relation : relationToLazySlotMap.keySet()) {
if (relation instanceof CatalogRelation) {
CatalogRelation catalogRelation = (CatalogRelation) relation;
Column rowIdCol = new Column(Column.GLOBAL_ROWID_COL + catalogRelation.getTable().getName(),
Type.STRING, false, null, false,
"", catalogRelation.getTable().getName() + ".global_row_id");
SlotReference rowIdSlot = SlotReference.fromColumn(threadStatementContext.getNextExprId(),
catalogRelation.getTable(), rowIdCol, catalogRelation.getQualifier());
result = result.accept(new LazySlotPruning(),
new LazySlotPruning.Context((PhysicalCatalogRelation) relation,
rowIdSlot, relationToLazySlotMap.get(relation)));
relationToRowId.put(catalogRelation, rowIdSlot);
rowIdSet.add(rowIdSlot);
} else if (relation instanceof PhysicalTVFRelation) {
PhysicalTVFRelation tvfRelation = (PhysicalTVFRelation) relation;

Column rowIdCol = new Column(Column.GLOBAL_ROWID_COL + tvfRelation.getFunction().getName(),
Type.STRING, false, null, false,
"", tvfRelation.getFunction().getName() + ".global_row_id");

SlotReference rowIdSlot = SlotReference.fromColumn(threadStatementContext.getNextExprId(),
tvfRelation.getFunction().getTable(), rowIdCol, ImmutableList.of());
result = result.accept(new LazySlotPruning(),
new LazySlotPruning.Context((PhysicalTVFRelation) relation,
rowIdSlot, relationToLazySlotMap.get(relation)));
relationToRowId.put(tvfRelation, rowIdSlot);
rowIdSet.add(rowIdSlot);
} else {
// should not reach here.
throw new RuntimeException("LazyMaterializeTopN not support this relation." + relation);
}
}

// materialize.child.output requires
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeTVFScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;

import com.google.common.collect.ImmutableList;
Expand All @@ -53,17 +55,17 @@ public class LazySlotPruning extends DefaultPlanRewriter<LazySlotPruning.Context
* Context
*/
public static class Context {
private PhysicalCatalogRelation scan;
private PhysicalRelation scan;
private List<Slot> lazySlots;
private SlotReference rowIdSlot;

public Context(PhysicalCatalogRelation scan, SlotReference rowIdSlot, List<Slot> lazySlots) {
public Context(PhysicalRelation scan, SlotReference rowIdSlot, List<Slot> lazySlots) {
this.scan = scan;
this.lazySlots = lazySlots;
this.rowIdSlot = rowIdSlot;
}

private Context(PhysicalCatalogRelation scan, List<Slot> lazySlots, SlotReference rowIdSlot) {
private Context(PhysicalRelation scan, List<Slot> lazySlots, SlotReference rowIdSlot) {
this.scan = scan;
this.lazySlots = lazySlots;
this.rowIdSlot = rowIdSlot;
Expand Down Expand Up @@ -130,6 +132,18 @@ public Plan visitPhysicalFileScan(PhysicalFileScan scan, Context context) {
}
}

/**
 * Replaces the TVF relation with a {@link PhysicalLazyMaterializeTVFScan} built from the
 * relation, the row-id slot and the lazy slots held by the context.
 *
 * @param tvfRelation the TVF relation being visited
 * @param context carries the row-id slot and the lazily materialized slots
 * @return the lazy-materialize scan that wraps {@code tvfRelation}
 * @throws RuntimeException if the relation output does not contain all lazy slots
 */
@Override
public Plan visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, Context context) {
    boolean coversLazySlots = tvfRelation.getOutput().containsAll(context.lazySlots);
    if (!coversLazySlots) {
        // should not hit here
        throw new RuntimeException("Lazy materialize fault");
    }
    return new PhysicalLazyMaterializeTVFScan(tvfRelation, context.rowIdSlot, context.lazySlots);
}

@Override
public Plan visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterializeOlapScan scan, Context context) {
// should not come here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;

import com.google.common.collect.ImmutableSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -101,6 +103,20 @@ boolean checkRelationTableSupportedType(PhysicalCatalogRelation relation) {
return true;
}

/**
 * Checks whether a TVF relation is eligible here: the function name must be
 * {@code local}, {@code s3} or {@code hdfs}, and its {@code format} property must be
 * {@code parquet} or {@code orc} (compared case-insensitively).
 *
 * <p>A missing or null {@code format} property yields {@code false}.
 *
 * @param tvfRelation the TVF relation to inspect
 * @return {@code true} only when both the function name and the format match
 */
boolean checkTVFRelationTableSupportedType(PhysicalTVFRelation tvfRelation) {
    String functionName = tvfRelation.getFunction().getName();
    // literal-first equals keeps the name check null-safe
    boolean supportedFunction = "local".equals(functionName)
            || "s3".equals(functionName)
            || "hdfs".equals(functionName);
    if (!supportedFunction) {
        // the properties are not needed for other functions, so do not touch them
        return false;
    }

    // one lookup instead of containsKey + two gets; equalsIgnoreCase(null) is simply false
    String format = tvfRelation.getFunction().getTVFProperties().getMap().get("format");
    return "parquet".equalsIgnoreCase(format) || "orc".equalsIgnoreCase(format);
}

@Override
public Optional<MaterializeSource> visitPhysicalOlapScan(PhysicalOlapScan scan, ProbeContext context) {
if (scan.getSelectedIndexId() == scan.getTable().getBaseIndexId()) {
Expand All @@ -125,6 +141,22 @@ public Optional<MaterializeSource> visitPhysicalCatalogRelation(
return Optional.empty();
}

/**
 * Probes a TVF relation as a possible lazy-materialize source for {@code context.slot}.
 *
 * <p>A source is returned only when the relation passes
 * {@code checkTVFRelationTableSupportedType}, outputs the slot, does not list it among its
 * operative slots, and the slot has an original column.
 *
 * @param tvfRelation the TVF relation being visited
 * @param context holds the slot being probed
 * @return the materialize source, or empty when any condition fails
 */
@Override
public Optional<MaterializeSource> visitPhysicalTVFRelation(
        PhysicalTVFRelation tvfRelation, ProbeContext context) {
    if (!checkTVFRelationTableSupportedType(tvfRelation)) {
        return Optional.empty();
    }
    if (!tvfRelation.getOutput().contains(context.slot)) {
        return Optional.empty();
    }
    // lazy materialize slot must be a passive slot
    if (tvfRelation.getOperativeSlots().contains(context.slot)) {
        return Optional.empty();
    }
    if (!context.slot.getOriginalColumn().isPresent()) {
        LOG.info("lazy materialize {} failed, because its column is empty", context.slot);
        return Optional.empty();
    }
    return Optional.of(new MaterializeSource(tvfRelation, context.slot));
}

@Override
public Optional<MaterializeSource> visitPhysicalLazyMaterialize(
PhysicalLazyMaterialize<? extends Plan> materialize, ProbeContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@
package org.apache.doris.nereids.processor.post.materialize;

import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Relation;

/**
the table and slot used to do lazy materialize
*/
public class MaterializeSource {
public final CatalogRelation relation;
public final Relation relation;
public final SlotReference baseSlot;

/*
constructor
*/
public MaterializeSource(CatalogRelation relation, SlotReference baseSlot) {
public MaterializeSource(Relation relation, SlotReference baseSlot) {
this.relation = relation;
this.baseSlot = baseSlot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,8 @@ private LogicalTVFRelation bindTableValuedFunction(MatchingContext<UnboundTVFRel
if (sqlCacheContext.isPresent()) {
sqlCacheContext.get().setCannotProcessExpression(true);
}
return new LogicalTVFRelation(unboundTVFRelation.getRelationId(), (TableValuedFunction) bindResult.first);
return new LogicalTVFRelation(unboundTVFRelation.getRelationId(),
(TableValuedFunction) bindResult.first, ImmutableList.of());
}

private void checkIfOutputAliasNameDuplicatedForGroupBy(Collection<Expression> expressions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private Optional<LogicalPlan> handleMetaTable(TableIf table, UnboundRelation unb
Optional<TableValuedFunction> tvf = table.getSysTableFunction(
qualifiedTableName.get(0), qualifiedTableName.get(1), qualifiedTableName.get(2));
if (tvf.isPresent()) {
return Optional.of(new LogicalTVFRelation(unboundRelation.getRelationId(), tvf.get()));
return Optional.of(new LogicalTVFRelation(unboundRelation.getRelationId(), tvf.get(), ImmutableList.of()));
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class LogicalTVFRelationToPhysicalTVFRelation extends OneImplementationRu
public Rule build() {
return logicalTVFRelation()
.then(relation -> new PhysicalTVFRelation(relation.getRelationId(),
relation.getFunction(), relation.getLogicalProperties()))
relation.getFunction(), relation.getOperativeSlots(), relation.getLogicalProperties()))
.toRule(RuleType.LOGICAL_TVF_RELATION_TO_PHYSICAL_TVF_RELATION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
Expand Down Expand Up @@ -174,6 +175,17 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation relation, DeriveC
return relation.withOperativeSlots(operandSlots.build());
}

/**
 * Records on the TVF relation which of its output slots are operative, i.e. whose expr id
 * is contained in {@code context.operativeSlotIds}.
 *
 * @param relation the TVF relation being visited
 * @param context holds the ids of the operative slots
 * @return the relation returned by {@code withOperativeSlots} for the selected slots
 */
@Override
public Plan visitLogicalTVFRelation(LogicalTVFRelation relation, DeriveContext context) {
    // keep output order; only slots whose expr id was collected as operative survive
    ImmutableSet<Slot> operativeOutput = relation.getOutput().stream()
            .filter(output -> context.operativeSlotIds.contains(output.getExprId().asInt()))
            .collect(ImmutableSet.toImmutableSet());
    return relation.withOperativeSlots(operativeOutput);
}

/**
* DeriveContext
*/
Expand Down
Loading
Loading