Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TOlapTable;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TSortType;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
Expand Down Expand Up @@ -2184,4 +2187,43 @@ public boolean isDupKeysOrMergeOnWrite() {
|| (getKeysType() == KeysType.UNIQUE_KEYS
&& getEnableUniqueKeyMergeOnWrite());
}

/**
* generate two phase read fetch option from this olap table.
*
* @param selectedIndexId the index want to scan
*/
public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {
TFetchOption fetchOption = new TFetchOption();
fetchOption.setFetchRowStore(this.storeRowColumn());
fetchOption.setUseTwoPhaseFetch(true);
fetchOption.setNodesInfo(SystemInfoService.createAliveNodesInfo());
if (!this.storeRowColumn()) {
List<TColumn> columnsDesc = Lists.newArrayList();
getColumnDesc(selectedIndexId, columnsDesc, null, null);
fetchOption.setColumnDesc(columnsDesc);
}
return fetchOption;
}

public void getColumnDesc(long selectedIndexId, List<TColumn> columnsDesc, List<String> keyColumnNames,
List<TPrimitiveType> keyColumnTypes) {
if (selectedIndexId != -1) {
for (Column col : this.getSchemaByIndexId(selectedIndexId, true)) {
TColumn tColumn = col.toThrift();
col.setIndexFlag(tColumn, this);
if (columnsDesc != null) {
columnsDesc.add(tColumn);
}
if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && col.isVisible())) && col.isKey()) {
if (keyColumnNames != null) {
keyColumnNames.add(col.getName());
}
if (keyColumnTypes != null) {
keyColumnTypes.add(col.getDataType().toThrift());
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
Expand Down Expand Up @@ -88,6 +90,12 @@ public Cost visitPhysicalOlapScan(PhysicalOlapScan physicalOlapScan, PlanContext
return CostV1.ofCpu(statistics.getRowCount());
}

@Override
public Cost visitPhysicalDeferMaterializeOlapScan(PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan,
PlanContext context) {
return visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
}

public Cost visitPhysicalSchemaScan(PhysicalSchemaScan physicalSchemaScan, PlanContext context) {
Statistics statistics = context.getStatisticsWithCheck();
return CostV1.ofCpu(statistics.getRowCount());
Expand Down Expand Up @@ -156,6 +164,12 @@ public Cost visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanContext con
childStatistics.getRowCount());
}

@Override
public Cost visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
PlanContext context) {
return visitPhysicalTopN(topN.getPhysicalTopN(), context);
}

@Override
public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
Statistics statistics = context.getStatisticsWithCheck();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.nereids.rules.rewrite.ColumnPruning;
import org.apache.doris.nereids.rules.rewrite.ConvertInnerOrCrossJoin;
import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite;
import org.apache.doris.nereids.rules.rewrite.DeferMaterializeTopNResult;
import org.apache.doris.nereids.rules.rewrite.EliminateAggregate;
import org.apache.doris.nereids.rules.rewrite.EliminateDedupJoinCondition;
import org.apache.doris.nereids.rules.rewrite.EliminateFilter;
Expand Down Expand Up @@ -282,6 +283,9 @@ public class Rewriter extends AbstractBatchJobExecutor {
bottomUp(RuleSet.PUSH_DOWN_FILTERS),
custom(RuleType.ELIMINATE_UNNECESSARY_PROJECT, EliminateUnnecessaryProject::new)
),
topic("topn optimize",
topDown(new DeferMaterializeTopNResult())
),
// this rule batch must keep at the end of rewrite to do some plan check
topic("Final rewrite and check",
custom(RuleType.ENSURE_PROJECT_ON_TOP_JOIN, EnsureProjectOnTopJoin::new),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public List<PlanPostProcessor> getProcessors() {
builder.add(new Validator());
builder.add(new RecomputeLogicalPropertiesProcessor());
builder.add(new TopNScanOpt());
builder.add(new TwoPhaseReadOpt());
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,23 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;

/**
* topN opt
* refer to:
* https://github.com/apache/doris/pull/15558
* https://github.com/apache/doris/pull/15663
* <a href="https://github.com/apache/doris/pull/15558">...</a>
* <a href="https://github.com/apache/doris/pull/15663">...</a>
*/

public class TopNScanOpt extends PlanPostProcessor {

@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
topN.child().accept(this, ctx);
Plan child = topN.child();
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
Expand All @@ -52,7 +53,7 @@ public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Cascade
if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
return topN;
}
// if firstKey's column is not present, it means the firstKey is not a original column from scan node
// if firstKey's column is not present, it means the firstKey is not an original column from scan node
// for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
// a cast expr which is not from tbl1 and its column is not present.
// On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1
Expand All @@ -68,14 +69,14 @@ public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Cascade
return topN;
}

PhysicalOlapScan olapScan;
OlapScan olapScan;
while (child instanceof Project || child instanceof Filter) {
child = child.child(0);
}
if (!(child instanceof PhysicalOlapScan)) {
if (!(child instanceof OlapScan)) {
return topN;
}
olapScan = (PhysicalOlapScan) child;
olapScan = (OlapScan) child;

if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
Expand All @@ -84,6 +85,12 @@ public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Cascade
return topN;
}

@Override
public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
CascadesContext context) {
return topN.withPhysicalTopN(visitPhysicalTopN(topN.getPhysicalTopN(), context));
}

private long getTopNOptLimitThreshold() {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
Expand Down Expand Up @@ -141,6 +142,12 @@ public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanC
return new PhysicalProperties(olapScan.getDistributionSpec());
}

@Override
public PhysicalProperties visitPhysicalDeferMaterializeOlapScan(
PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanContext context) {
return visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
}

@Override
public PhysicalProperties visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, PlanContext context) {
return PhysicalProperties.GATHER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
Expand Down Expand Up @@ -115,6 +116,14 @@ public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalR
return null;
}

@Override
public Void visitPhysicalDeferMaterializeResultSink(
PhysicalDeferMaterializeResultSink<? extends Plan> sink,
PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
return null;
}

/* ********************************************************************************************
* Other Node, in lexicographical order
* ******************************************************************************************** */
Expand Down
Loading