diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index cc2450f990b79e..370780112736c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -56,8 +56,11 @@ import org.apache.doris.statistics.OlapAnalysisTask; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TCompressionType; +import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TOlapTable; +import org.apache.doris.thrift.TPrimitiveType; import org.apache.doris.thrift.TSortType; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; @@ -2184,4 +2187,43 @@ public boolean isDupKeysOrMergeOnWrite() { || (getKeysType() == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite()); } + + /** + * generate two phase read fetch option from this olap table. 
+ * + * @param selectedIndexId the index want to scan + */ + public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) { + TFetchOption fetchOption = new TFetchOption(); + fetchOption.setFetchRowStore(this.storeRowColumn()); + fetchOption.setUseTwoPhaseFetch(true); + fetchOption.setNodesInfo(SystemInfoService.createAliveNodesInfo()); + if (!this.storeRowColumn()) { + List columnsDesc = Lists.newArrayList(); + getColumnDesc(selectedIndexId, columnsDesc, null, null); + fetchOption.setColumnDesc(columnsDesc); + } + return fetchOption; + } + + public void getColumnDesc(long selectedIndexId, List columnsDesc, List keyColumnNames, + List keyColumnTypes) { + if (selectedIndexId != -1) { + for (Column col : this.getSchemaByIndexId(selectedIndexId, true)) { + TColumn tColumn = col.toThrift(); + col.setIndexFlag(tColumn, this); + if (columnsDesc != null) { + columnsDesc.add(tColumn); + } + if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && col.isVisible())) && col.isKey()) { + if (keyColumnNames != null) { + keyColumnNames.add(col.getName()); + } + if (keyColumnTypes != null) { + keyColumnTypes.add(col.getDataType().toThrift()); + } + } + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java index 713cbf0e009870..811b2e57fc8b27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java @@ -24,6 +24,8 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; @@ -88,6 +90,12 @@ public Cost visitPhysicalOlapScan(PhysicalOlapScan physicalOlapScan, PlanContext return CostV1.ofCpu(statistics.getRowCount()); } + @Override + public Cost visitPhysicalDeferMaterializeOlapScan(PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, + PlanContext context) { + return visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context); + } + public Cost visitPhysicalSchemaScan(PhysicalSchemaScan physicalSchemaScan, PlanContext context) { Statistics statistics = context.getStatisticsWithCheck(); return CostV1.ofCpu(statistics.getRowCount()); @@ -156,6 +164,12 @@ public Cost visitPhysicalTopN(PhysicalTopN topN, PlanContext con childStatistics.getRowCount()); } + @Override + public Cost visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, + PlanContext context) { + return visitPhysicalTopN(topN.getPhysicalTopN(), context); + } + @Override public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, PlanContext context) { Statistics statistics = context.getStatisticsWithCheck(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 589919e47cf4e5..8e236622a89f10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -91,6 +91,9 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; +import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; @@ -164,9 +167,7 @@ import org.apache.doris.planner.external.jdbc.JdbcScanNode; import org.apache.doris.planner.external.paimon.PaimonScanNode; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.system.SystemInfoService; import org.apache.doris.tablefunction.TableValuedFunctionIf; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; @@ -227,19 +228,10 @@ public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator */ public PlanFragment translatePlan(PhysicalPlan physicalPlan) { PlanFragment rootFragment = physicalPlan.accept(this, context); - - // TODO: why we need if? we should always set output expr? - // OlapSink? 
maybe OlapSink should not set output exprs by it self - if (rootFragment.getOutputExprs() == null) { - List outputExprs = Lists.newArrayList(); - physicalPlan.getOutput().stream().map(Slot::getExprId) - .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId))); - rootFragment.setOutputExprs(outputExprs); - } - for (PlanFragment fragment : context.getPlanFragments()) { - fragment.finalize(null); - } - setResultSinkFetchOptionIfNeed(); + List outputExprs = Lists.newArrayList(); + physicalPlan.getOutput().stream().map(Slot::getExprId) + .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId))); + rootFragment.setOutputExprs(outputExprs); Collections.reverse(context.getPlanFragments()); // TODO: maybe we need to trans nullable directly? and then we could remove call computeMemLayout context.getDescTable().computeMemLayout(); @@ -302,6 +294,9 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d } else { inputFragment.setDestination(exchangeNode); inputFragment.setOutputPartition(dataPartition); + DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId()); + streamSink.setOutputPartition(dataPartition); + inputFragment.setSink(streamSink); } context.addPlanFragment(parentFragment); @@ -315,13 +310,26 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d @Override public PlanFragment visitPhysicalResultSink(PhysicalResultSink physicalResultSink, PlanTranslatorContext context) { - return physicalResultSink.child().accept(this, context); + PlanFragment planFragment = physicalResultSink.child().accept(this, context); + planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId())); + return planFragment; + } + + @Override + public PlanFragment visitPhysicalDeferMaterializeResultSink( + PhysicalDeferMaterializeResultSink sink, + PlanTranslatorContext context) { + PlanFragment planFragment = visitPhysicalResultSink(sink.getPhysicalResultSink(), context); + TFetchOption fetchOption = 
sink.getOlapTable().generateTwoPhaseReadOption(sink.getSelectedIndexId()); + ((ResultSink) planFragment.getSink()).setFetchOption(fetchOption); + return planFragment; } @Override public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink olapTableSink, PlanTranslatorContext context) { PlanFragment rootFragment = olapTableSink.child().accept(this, context); + rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); TupleDescriptor olapTuple = context.generateTupleDesc(); List targetTableColumns = olapTableSink.getTargetTable().getFullSchema(); @@ -332,26 +340,21 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink partialUpdateCols = new HashSet(); + HashSet partialUpdateCols = new HashSet<>(); for (Column col : olapTableSink.getCols()) { partialUpdateCols.add(col.getName()); } sink.setPartialUpdateInputColumns(true, partialUpdateCols); } - rootFragment.setSink(sink); - rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); - return rootFragment; } @@ -370,6 +373,7 @@ public PlanFragment visitPhysicalFileSink(PhysicalFileSink fileS .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId))); rootFragment.setOutputExprs(outputExprs); + // TODO: should not call legacy planner analyze in Nereids try { outFile.analyze(null, outputExprs, fileSink.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList())); @@ -506,24 +510,13 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla @Override public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTranslatorContext context) { - // deferred materialized slots used for topn opt. 
- Set deferredMaterializedExprIds = olapScan - .getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS) - .map(s -> (Set) s) - .orElse(Collections.emptySet()); - List slots = olapScan.getOutput(); OlapTable olapTable = olapScan.getTable(); // generate real output tuple - TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, deferredMaterializedExprIds, context); + TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, context); // generate base index tuple because this fragment partitioned expr relay on slots of based index if (olapScan.getSelectedIndexId() != olapScan.getTable().getBaseIndexId()) { - generateTupleDesc(olapScan.getBaseOutputs(), olapTable, deferredMaterializedExprIds, context); - } - - // TODO: remove this, we should add this column in Nereids - if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) { - injectRowIdColumnSlot(tupleDescriptor); + generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context); } OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode"); @@ -581,6 +574,22 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla return planFragment; } + @Override + public PlanFragment visitPhysicalDeferMaterializeOlapScan( + PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) { + PlanFragment planFragment = visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context); + OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot(); + TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId()); + for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) { + if (deferMaterializeOlapScan.getDeferMaterializeSlotIds() + .contains(context.findExprId(slotDescriptor.getId()))) { + slotDescriptor.setNeedMaterialize(false); + } + } + context.createSlotDesc(tupleDescriptor, 
deferMaterializeOlapScan.getColumnIdSlot()); + return planFragment; + } + @Override public PlanFragment visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, PlanTranslatorContext context) { @@ -1715,6 +1724,26 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN topN, PlanTra return inputFragment; } + @Override + public PlanFragment visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, + PlanTranslatorContext context) { + + PlanFragment planFragment = visitPhysicalTopN(topN.getPhysicalTopN(), context); + if (planFragment.getPlanRoot() instanceof SortNode) { + SortNode sortNode = (SortNode) planFragment.getPlanRoot(); + sortNode.setUseTwoPhaseReadOpt(true); + sortNode.getSortInfo().setUseTwoPhaseRead(); + TupleDescriptor tupleDescriptor = sortNode.getSortInfo().getSortTupleDescriptor(); + for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) { + if (topN.getDeferMaterializeSlotIds() + .contains(context.findExprId(slotDescriptor.getId()))) { + slotDescriptor.setNeedMaterialize(false); + } + } + } + return planFragment; + } + @Override public PlanFragment visitPhysicalRepeat(PhysicalRepeat repeat, PlanTranslatorContext context) { PlanFragment inputPlanFragment = repeat.child(0).accept(this, context); @@ -1888,12 +1917,7 @@ private PartitionSortNode translatePartitionSortNode(PhysicalPartitionTopN sort, PlanNode childNode, PlanTranslatorContext context) { - Set deferredMaterializedExprIds = sort - .getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS) - .map(s -> (Set) s) - .orElse(Collections.emptySet()); - TupleDescriptor sortTuple = generateTupleDesc(sort.child().getOutput(), - null, deferredMaterializedExprIds, context); + TupleDescriptor sortTuple = generateTupleDesc(sort.child().getOutput(), null, context); List orderingExprs = Lists.newArrayList(); List ascOrders = Lists.newArrayList(); List nullsFirstParams = Lists.newArrayList(); @@ -1905,11 +1929,6 @@ private SortNode translateSortNode(AbstractPhysicalSort 
sort, Pl }); SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, nullsFirstParams, sortTuple); SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, sort instanceof PhysicalTopN); - if (sort.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) { - sortNode.setUseTwoPhaseReadOpt(true); - sortNode.getSortInfo().setUseTwoPhaseRead(); - injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor()); - } if (sort.getStats() != null) { sortNode.setCardinality((long) sort.getStats().getRowCount()); } @@ -1955,19 +1974,6 @@ private void addConjunctsToPlanNode(PhysicalFilter filter, updateLegacyPlanIdToPhysicalPlan(planNode, filter); } - private TupleDescriptor generateTupleDesc(List slotList, TableIf table, - Set deferredMaterializedExprIds, PlanTranslatorContext context) { - TupleDescriptor tupleDescriptor = context.generateTupleDesc(); - tupleDescriptor.setTable(table); - for (Slot slot : slotList) { - SlotDescriptor slotDescriptor = context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table); - if (deferredMaterializedExprIds.contains(slot.getExprId())) { - slotDescriptor.setNeedMaterialize(false); - } - } - return tupleDescriptor; - } - private TupleDescriptor generateTupleDesc(List slotList, TableIf table, PlanTranslatorContext context) { TupleDescriptor tupleDescriptor = context.generateTupleDesc(); tupleDescriptor.setTable(table); @@ -2153,71 +2159,19 @@ private void injectRowIdColumnSlot(TupleDescriptor tupleDesc) { slotDesc.setIsMaterialized(true); } - /** - * We use two phase read to optimize sql like: select * from tbl [where xxx = ???] 
[order by column1] [limit n] - * in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes - * in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows - * and reconstruct the final block - */ - private void setResultSinkFetchOptionIfNeed() { - boolean needFetch = false; - // Only single olap table should be fetched - OlapTable fetchOlapTable = null; - OlapScanNode scanNode = null; - for (PlanFragment fragment : context.getPlanFragments()) { - PlanNode node = fragment.getPlanRoot(); - PlanNode parent = null; - // OlapScanNode is the last node. - // So, just get the last two node and check if they are SortNode and OlapScan. - while (node.getChildren().size() != 0) { - parent = node; - node = node.getChildren().get(0); - } - - // case1: general topn optimized query - if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) { - SortNode sortNode = (SortNode) parent; - scanNode = (OlapScanNode) node; - if (sortNode.getUseTwoPhaseReadOpt()) { - needFetch = true; - fetchOlapTable = scanNode.getOlapTable(); - break; - } - } - } - for (PlanFragment fragment : context.getPlanFragments()) { - if (needFetch && fragment.getSink() instanceof ResultSink) { - TFetchOption fetchOption = new TFetchOption(); - fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn()); - fetchOption.setUseTwoPhaseFetch(true); - fetchOption.setNodesInfo(SystemInfoService.createAliveNodesInfo()); - if (!fetchOlapTable.storeRowColumn()) { - // Set column desc for each column - List columnsDesc = new ArrayList<>(); - scanNode.getColumnDesc(columnsDesc, null, null); - fetchOption.setColumnDesc(columnsDesc); - } - ((ResultSink) fragment.getSink()).setFetchOption(fetchOption); - break; - } - } - } - /** * topN opt: using storage data ordering to accelerate topn operation. 
* refer pr: optimize topn query if order by columns is prefix of sort keys of table (#10694) */ private boolean checkPushSort(SortNode sortNode, OlapTable olapTable) { - // Ensure limit is less then threshold + // Ensure limit is less than threshold if (sortNode.getLimit() <= 0 || sortNode.getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) { return false; } - // Ensure all isAscOrder is same, ande length != 0. - // Can't be zorder. - if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() != 1 - || olapTable.isZOrderSort()) { + // Ensure all isAscOrder is same, ande length != 0. Can't be z-order. + if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() != 1 || olapTable.isZOrderSort()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index 1bba83c5926ee0..ded73cbf789de1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -46,6 +46,7 @@ import org.apache.doris.nereids.rules.rewrite.ColumnPruning; import org.apache.doris.nereids.rules.rewrite.ConvertInnerOrCrossJoin; import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite; +import org.apache.doris.nereids.rules.rewrite.DeferMaterializeTopNResult; import org.apache.doris.nereids.rules.rewrite.EliminateAggregate; import org.apache.doris.nereids.rules.rewrite.EliminateDedupJoinCondition; import org.apache.doris.nereids.rules.rewrite.EliminateFilter; @@ -282,6 +283,9 @@ public class Rewriter extends AbstractBatchJobExecutor { bottomUp(RuleSet.PUSH_DOWN_FILTERS), custom(RuleType.ELIMINATE_UNNECESSARY_PROJECT, EliminateUnnecessaryProject::new) ), + topic("topn optimize", + topDown(new DeferMaterializeTopNResult()) + ), // this rule batch must keep at the end of rewrite to do some plan check 
topic("Final rewrite and check", custom(RuleType.ENSURE_PROJECT_ON_TOP_JOIN, EnsureProjectOnTopJoin::new), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index 40891e82a42957..1c927fb89dc058 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -71,7 +71,6 @@ public List getProcessors() { builder.add(new Validator()); builder.add(new RecomputeLogicalPropertiesProcessor()); builder.add(new TopNScanOpt()); - builder.add(new TwoPhaseReadOpt()); return builder.build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java index a938a231ed3827..0c5d8d8ce6e7e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java @@ -22,22 +22,23 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.SortPhase; import org.apache.doris.nereids.trees.plans.algebra.Filter; +import org.apache.doris.nereids.trees.plans.algebra.OlapScan; import org.apache.doris.nereids.trees.plans.algebra.Project; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.qe.ConnectContext; /** * topN opt * refer to: - * https://github.com/apache/doris/pull/15558 - * https://github.com/apache/doris/pull/15663 + * ... + * ... 
*/ public class TopNScanOpt extends PlanPostProcessor { @Override - public PhysicalTopN visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) { + public PhysicalTopN visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) { topN.child().accept(this, ctx); Plan child = topN.child(); if (topN.getSortPhase() != SortPhase.LOCAL_SORT) { @@ -52,7 +53,7 @@ public PhysicalTopN visitPhysicalTopN(PhysicalTopN topN, Cascade if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) { return topN; } - // if firstKey's column is not present, it means the firstKey is not a original column from scan node + // if firstKey's column is not present, it means the firstKey is not an original column from scan node // for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is // a cast expr which is not from tbl1 and its column is not present. // On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1 @@ -68,14 +69,14 @@ public PhysicalTopN visitPhysicalTopN(PhysicalTopN topN, Cascade return topN; } - PhysicalOlapScan olapScan; + OlapScan olapScan; while (child instanceof Project || child instanceof Filter) { child = child.child(0); } - if (!(child instanceof PhysicalOlapScan)) { + if (!(child instanceof OlapScan)) { return topN; } - olapScan = (PhysicalOlapScan) child; + olapScan = (OlapScan) child; if (olapScan.getTable().isDupKeysOrMergeOnWrite()) { topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true); @@ -84,6 +85,12 @@ public PhysicalTopN visitPhysicalTopN(PhysicalTopN topN, Cascade return topN; } + @Override + public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, + CascadesContext context) { + return topN.withPhysicalTopN(visitPhysicalTopN(topN.getPhysicalTopN(), context)); + } + private long getTopNOptLimitThreshold() { if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { return 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java deleted file mode 100644 index 93dd579e7084a7..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java +++ /dev/null @@ -1,163 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.nereids.processor.post; - -import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.properties.OrderKey; -import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.ExprId; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.SortPhase; -import org.apache.doris.nereids.trees.plans.algebra.Filter; -import org.apache.doris.nereids.trees.plans.algebra.Project; -import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; -import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; -import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; -import org.apache.doris.qe.ConnectContext; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -/** - * two phase read opt - * refer to: - * https://github.com/apache/doris/pull/15642 - * https://github.com/apache/doris/pull/16460 - * https://github.com/apache/doris/pull/16848 - */ - -public class TwoPhaseReadOpt extends PlanPostProcessor { - - @Override - public Plan processRoot(Plan plan, CascadesContext ctx) { - if (plan instanceof PhysicalTopN) { - PhysicalTopN physicalTopN = (PhysicalTopN) plan; - if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) { - return plan.accept(this, ctx); - } - } - return plan; - } - - @Override - public PhysicalTopN visitPhysicalTopN(PhysicalTopN mergeTopN, CascadesContext ctx) { - mergeTopN.child().accept(this, ctx); - if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || 
!(mergeTopN.child() instanceof PhysicalDistribute)) { - return mergeTopN; - } - PhysicalDistribute distribute = (PhysicalDistribute) mergeTopN.child(); - if (!(distribute.child() instanceof PhysicalTopN)) { - return mergeTopN; - } - PhysicalTopN localTopN = (PhysicalTopN) distribute.child(); - - if (localTopN.getOrderKeys().isEmpty()) { - return mergeTopN; - } - - // topn opt - long topNOptLimitThreshold = getTopNOptLimitThreshold(); - if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > topNOptLimitThreshold) { - return mergeTopN; - } - if (!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) { - return mergeTopN; - } - - PhysicalOlapScan olapScan; - PhysicalProject project = null; - PhysicalFilter filter = null; - Plan child = localTopN.child(); - while (child instanceof Project || child instanceof Filter) { - if (child instanceof Filter) { - filter = (PhysicalFilter) child; - } - if (child instanceof Project) { - project = (PhysicalProject) child; - // TODO: remove this after fix two phase read on project core - return mergeTopN; - } - child = child.child(0); - } - if (!(child instanceof PhysicalOlapScan)) { - return mergeTopN; - } - olapScan = (PhysicalOlapScan) child; - - // all order key must column from table - if (!olapScan.getTable().getEnableLightSchemaChange()) { - return mergeTopN; - } - - Map projectRevertedMap = Maps.newHashMap(); - if (project != null) { - for (Expression e : project.getProjects()) { - if (e.isSlot()) { - Slot slot = (Slot) e; - projectRevertedMap.put(slot.getExprId(), slot.getExprId()); - } else if (e instanceof Alias) { - Alias alias = (Alias) e; - if (alias.child().isSlot()) { - Slot slot = (Slot) alias.child(); - projectRevertedMap.put(alias.getExprId(), slot.getExprId()); - } - } - } - } - Set deferredMaterializedExprIds = Sets.newHashSet(olapScan.getOutputExprIdSet()); - if (filter != null) { - filter.getConjuncts().forEach(e -> 
deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds())); - } - localTopN.getOrderKeys().stream() - .map(OrderKey::getExpr) - .map(Slot.class::cast) - .map(NamedExpression::getExprId) - .map(projectRevertedMap::get) - .filter(Objects::nonNull) - .forEach(deferredMaterializedExprIds::remove); - localTopN.getOrderKeys().stream() - .map(OrderKey::getExpr) - .map(Slot.class::cast) - .map(NamedExpression::getExprId) - .forEach(deferredMaterializedExprIds::remove); - olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds); - localTopN.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds); - mergeTopN.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds); - - return mergeTopN; - } - - private long getTopNOptLimitThreshold() { - if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { - if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) { - return -1; - } - return ConnectContext.get().getSessionVariable().topnOptLimitThreshold; - } - return -1; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 2ebc3acc5fe946..97c197faa133b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -34,6 +34,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import 
org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; @@ -141,6 +142,12 @@ public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanC return new PhysicalProperties(olapScan.getDistributionSpec()); } + @Override + public PhysicalProperties visitPhysicalDeferMaterializeOlapScan( + PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanContext context) { + return visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context); + } + @Override public PhysicalProperties visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, PlanContext context) { return PhysicalProperties.GATHER; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 8fe652fd3f95de..55625b194baac8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; @@ -115,6 +116,14 @@ public Void visitPhysicalResultSink(PhysicalResultSink physicalR return null; } + @Override + public Void visitPhysicalDeferMaterializeResultSink( + PhysicalDeferMaterializeResultSink sink, + PlanContext context) { + addRequestPropertyToChildren(PhysicalProperties.GATHER); + return 
null; + } + /* ******************************************************************************************** * Other Node, in lexicographical order * ******************************************************************************************** */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index 1d130d4982bd86..8304fb801a843c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -45,6 +45,9 @@ import org.apache.doris.nereids.rules.implementation.LogicalCTEAnchorToPhysicalCTEAnchor; import org.apache.doris.nereids.rules.implementation.LogicalCTEConsumerToPhysicalCTEConsumer; import org.apache.doris.nereids.rules.implementation.LogicalCTEProducerToPhysicalCTEProducer; +import org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink; +import org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN; import org.apache.doris.nereids.rules.implementation.LogicalEmptyRelationToPhysicalEmptyRelation; import org.apache.doris.nereids.rules.implementation.LogicalEsScanToPhysicalEsScan; import org.apache.doris.nereids.rules.implementation.LogicalExceptToPhysicalExcept; @@ -143,6 +146,7 @@ public class RuleSet { .add(new LogicalJoinToHashJoin()) .add(new LogicalJoinToNestedLoopJoin()) .add(new LogicalOlapScanToPhysicalOlapScan()) + .add(new LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan()) .add(new LogicalSchemaScanToPhysicalSchemaScan()) .add(new LogicalFileScanToPhysicalFileScan()) .add(new LogicalJdbcScanToPhysicalJdbcScan()) @@ -152,6 +156,7 @@ public class RuleSet { .add(new LogicalWindowToPhysicalWindow()) .add(new 
LogicalSortToPhysicalQuickSort()) .add(new LogicalTopNToPhysicalTopN()) + .add(new LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN()) .add(new LogicalPartitionTopNToPhysicalPartitionTopN()) .add(new LogicalAssertNumRowsToPhysicalAssertNumRows()) .add(new LogicalOneRowRelationToPhysicalOneRowRelation()) @@ -165,6 +170,7 @@ public class RuleSet { .add(new LogicalOlapTableSinkToPhysicalOlapTableSink()) .add(new LogicalFileSinkToPhysicalFileSink()) .add(new LogicalResultSinkToPhysicalResultSink()) + .add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink()) .build(); public static final List ZIG_ZAG_TREE_JOIN_REORDER = planRuleFactories() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 730ad106800132..254bf05e2db4c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -243,6 +243,9 @@ public enum RuleType { COLLECT_PROJECT_ABOVE_FILTER_CONSUMER(RuleTypeClass.REWRITE), REWRITE_SENTINEL(RuleTypeClass.REWRITE), + // topn opts + DEFER_MATERIALIZE_TOP_N_RESULT(RuleTypeClass.REWRITE), + // exploration rules TEST_EXPLORATION(RuleTypeClass.EXPLORATION), OR_EXPANSION(RuleTypeClass.EXPLORATION), @@ -290,16 +293,19 @@ public enum RuleType { LOGICAL_CTE_ANCHOR_TO_PHYSICAL_CTE_ANCHOR_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_SORT_TO_PHYSICAL_QUICK_SORT_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_DEFER_MATERIALIZE_TOP_N_TO_PHYSICAL_DEFER_MATERIALIZE_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_EMPTY_RELATION_TO_PHYSICAL_EMPTY_RELATION_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE(RuleTypeClass.IMPLEMENTATION), 
LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ASSERT_NUM_ROWS_TO_PHYSICAL_ASSERT_NUM_ROWS(RuleTypeClass.IMPLEMENTATION), STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan.java new file mode 100644 index 00000000000000..8d30f32dbc6dcc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; + +import java.util.Optional; + +/** + * implement defer materialize olap scan from logical to physical + */ +public class LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalDeferMaterializeOlapScan().thenApply(ctx -> { + LogicalDeferMaterializeOlapScan logicalDeferOlapScan = ctx.root; + PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan().build() + .transform(logicalDeferOlapScan.getLogicalOlapScan(), ctx.cascadesContext).get(0); + return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, + logicalDeferOlapScan.getDeferMaterializeSlotIds(), + logicalDeferOlapScan.getColumnIdSlot(), + Optional.empty(), + logicalDeferOlapScan.getLogicalProperties()); + }).toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink.java new file mode 100644 index 00000000000000..0c02841e2b8384 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; + +import java.util.Optional; + +/** + * implement defer materialize result sink from logical to physical + */ +public class LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink + extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalDeferMaterializeResultSink().thenApply(ctx -> { + LogicalDeferMaterializeResultSink sink = ctx.root; + PhysicalResultSink physicalResultSink + = (PhysicalResultSink) new LogicalResultSinkToPhysicalResultSink() + .build() + .transform(sink.getLogicalResultSink(), ctx.cascadesContext) + .get(0); + return new PhysicalDeferMaterializeResultSink<>( + physicalResultSink, sink.getOlapTable(), sink.getSelectedIndexId(), + Optional.empty(), sink.getLogicalProperties(), sink.child()); + }).toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN.java new file mode 100644 index 00000000000000..9ad6b73d1c85cf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; +import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; + +import java.util.Optional; + +/** + * implement defer materialize top n from logical to physical + */ +public class LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalDeferMaterializeTopN().thenApply(ctx -> { + LogicalDeferMaterializeTopN topN = ctx.root; + PhysicalTopN physicalTopN = (PhysicalTopN) new LogicalTopNToPhysicalTopN() + .build() + .transform(topN.getLogicalTopN(), ctx.cascadesContext) + .get(0); + return wrap(physicalTopN, topN, wrap((PhysicalTopN) physicalTopN.child(), topN, + ((PhysicalTopN) physicalTopN.child()).child())); + + }).toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_TOP_N_TO_PHYSICAL_DEFER_MATERIALIZE_TOP_N_RULE); + } + + private PhysicalDeferMaterializeTopN wrap(PhysicalTopN physicalTopN, + LogicalDeferMaterializeTopN 
logicalWrapped, Plan child) { + return new PhysicalDeferMaterializeTopN<>(physicalTopN, + logicalWrapped.getDeferMaterializeSlotIds(), logicalWrapped.getColumnIdSlot(), + Optional.empty(), logicalWrapped.getLogicalProperties(), child); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java new file mode 100644 index 00000000000000..4cd41bd509afde --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * rewrite simple top n query to defer materialize slot not use for sort or predicate + */ +public class DeferMaterializeTopNResult implements RewriteRuleFactory { + + @Override + public List buildRules() { + return ImmutableList.of( + RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build( + logicalResultSink(logicalTopN(logicalOlapScan())) + .when(r -> r.child().getLimit() < getTopNOptLimitThreshold()) + .whenNot(r -> r.child().getOrderKeys().isEmpty()) + .when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr) + .allMatch(Expression::isColumnFromTable)) + .when(r 
-> r.child().child().getTable().getEnableLightSchemaChange()) + .then(r -> deferMaterialize(r, r.child(), Optional.empty(), r.child().child())) + ), + RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build( + logicalResultSink(logicalTopN(logicalFilter(logicalOlapScan()))) + .when(r -> r.child().getLimit() < getTopNOptLimitThreshold()) + .whenNot(r -> r.child().getOrderKeys().isEmpty()) + .when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr) + .allMatch(Expression::isColumnFromTable)) + .when(r -> r.child().child().child().getTable().getEnableLightSchemaChange()) + .then(r -> { + LogicalFilter filter = r.child().child(); + return deferMaterialize(r, r.child(), Optional.of(filter), filter.child()); + }) + ) + ); + } + + private Plan deferMaterialize(LogicalResultSink logicalResultSink, + LogicalTopN logicalTopN, Optional> logicalFilter, + LogicalOlapScan logicalOlapScan) { + Column rowId = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column"); + SlotReference columnId = SlotReference.fromColumn(rowId, logicalOlapScan.getQualifier()); + Set deferredMaterializedExprIds = Sets.newHashSet(logicalOlapScan.getOutputExprIdSet()); + logicalFilter.ifPresent(filter -> filter.getConjuncts() + .forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()))); + logicalTopN.getOrderKeys().stream() + .map(OrderKey::getExpr) + .map(Slot.class::cast) + .map(NamedExpression::getExprId) + .filter(Objects::nonNull) + .forEach(deferredMaterializedExprIds::remove); + LogicalDeferMaterializeOlapScan deferOlapScan = new LogicalDeferMaterializeOlapScan( + logicalOlapScan, deferredMaterializedExprIds, columnId); + Plan root = logicalFilter.map(f -> f.withChildren(deferOlapScan)).orElse(deferOlapScan); + root = new LogicalDeferMaterializeTopN<>((LogicalTopN) logicalTopN.withChildren(root), + deferredMaterializedExprIds, columnId); + root = logicalResultSink.withChildren(root); + return new LogicalDeferMaterializeResultSink<>((LogicalResultSink) 
root, + logicalOlapScan.getTable(), logicalOlapScan.getSelectedIndexId()); + } + + private long getTopNOptLimitThreshold() { + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { + if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) { + return -1; + } + return ConnectContext.get().getSessionVariable().topnOptLimitThreshold; + } + return -1; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index 832914360b9aad..45aeae54fde483 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -57,6 +57,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan; import org.apache.doris.nereids.trees.plans.logical.LogicalExcept; @@ -83,6 +85,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import 
org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; @@ -284,6 +288,12 @@ public Statistics visitLogicalOlapScan(LogicalOlapScan olapScan, Void context) { return computeCatalogRelation(olapScan); } + @Override + public Statistics visitLogicalDeferMaterializeOlapScan(LogicalDeferMaterializeOlapScan deferMaterializeOlapScan, + Void context) { + return computeCatalogRelation(deferMaterializeOlapScan.getLogicalOlapScan()); + } + @Override public Statistics visitLogicalSchemaScan(LogicalSchemaScan schemaScan, Void context) { return computeCatalogRelation(schemaScan); @@ -327,6 +337,11 @@ public Statistics visitLogicalTopN(LogicalTopN topN, Void contex return computeTopN(topN); } + @Override + public Statistics visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN topN, Void context) { + return computeTopN(topN.getLogicalTopN()); + } + @Override public Statistics visitLogicalPartitionTopN(LogicalPartitionTopN partitionTopN, Void context) { return computePartitionTopN(partitionTopN); @@ -411,6 +426,12 @@ public Statistics visitPhysicalOlapScan(PhysicalOlapScan olapScan, Void context) return computeCatalogRelation(olapScan); } + @Override + public Statistics visitPhysicalDeferMaterializeOlapScan(PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, + Void context) { + return computeCatalogRelation(deferMaterializeOlapScan.getPhysicalOlapScan()); + } + @Override public Statistics visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, Void context) { return computeCatalogRelation(schemaScan); @@ -452,6 +473,12 @@ public Statistics visitPhysicalTopN(PhysicalTopN topN, Void cont return computeTopN(topN); } + @Override + public Statistics visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, + Void context) { + return computeTopN(topN.getPhysicalTopN()); + } + @Override public Statistics visitPhysicalHashJoin( PhysicalHashJoin hashJoin, Void context) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java index 400d27e71aa9f2..5727279ccfc673 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.OrderExpression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.expressions.SubqueryExpr; import org.apache.doris.nereids.trees.expressions.functions.Function; @@ -35,6 +36,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan; import org.apache.doris.nereids.trees.plans.logical.LogicalExcept; @@ -171,6 +174,19 @@ public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, DeepCopierContext con return newOlapScan; } + @Override + public Plan visitLogicalDeferMaterializeOlapScan(LogicalDeferMaterializeOlapScan deferMaterializeOlapScan, + DeepCopierContext context) { + LogicalOlapScan newScan = (LogicalOlapScan) visitLogicalOlapScan( + deferMaterializeOlapScan.getLogicalOlapScan(), context); + Set newSlotIds = deferMaterializeOlapScan.getDeferMaterializeSlotIds().stream() + 
.map(context.exprIdReplaceMap::get) + .collect(ImmutableSet.toImmutableSet()); + SlotReference newRowId = (SlotReference) ExpressionDeepCopier.INSTANCE + .deepCopy(deferMaterializeOlapScan.getColumnIdSlot(), context); + return new LogicalDeferMaterializeOlapScan(newScan, newSlotIds, newRowId); + } + @Override public Plan visitLogicalSchemaScan(LogicalSchemaScan schemaScan, DeepCopierContext context) { if (context.getRelationReplaceMap().containsKey(schemaScan.getRelationId())) { @@ -263,6 +279,19 @@ public Plan visitLogicalTopN(LogicalTopN topN, DeepCopierContext return new LogicalTopN<>(orderKeys, topN.getLimit(), topN.getOffset(), child); } + @Override + public Plan visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN topN, + DeepCopierContext context) { + LogicalTopN newTopN + = (LogicalTopN) visitLogicalTopN(topN.getLogicalTopN(), context); + Set newSlotIds = topN.getDeferMaterializeSlotIds().stream() + .map(context.exprIdReplaceMap::get) + .collect(ImmutableSet.toImmutableSet()); + SlotReference newRowId = (SlotReference) ExpressionDeepCopier.INSTANCE + .deepCopy(topN.getColumnIdSlot(), context); + return new LogicalDeferMaterializeTopN<>(newTopN, newSlotIds, newRowId); + } + @Override public Plan visitLogicalPartitionTopN(LogicalPartitionTopN partitionTopN, DeepCopierContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeOlapScan.java new file mode 100644 index 00000000000000..a49e37d8df40a0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeOlapScan.java @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.OlapScan; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * use for defer materialize top n + */ +public class LogicalDeferMaterializeOlapScan extends LogicalCatalogRelation implements OlapScan { + + private final LogicalOlapScan logicalOlapScan; + + /////////////////////////////////////////////////////////////////////////// + // Members for defer materialize for top-n opt. 
+ /////////////////////////////////////////////////////////////////////////// + private final Set deferMaterializeSlotIds; + private final SlotReference columnIdSlot; + + public LogicalDeferMaterializeOlapScan(LogicalOlapScan logicalOlapScan, + Set deferMaterializeSlotIds, SlotReference columnIdSlot) { + this(logicalOlapScan, deferMaterializeSlotIds, columnIdSlot, + logicalOlapScan.getGroupExpression(), Optional.empty()); + } + + /** + * constructor + */ + public LogicalDeferMaterializeOlapScan(LogicalOlapScan logicalOlapScan, + Set deferMaterializeSlotIds, SlotReference columnIdSlot, + Optional groupExpression, Optional logicalProperties) { + super(logicalOlapScan.getRelationId(), logicalOlapScan.getType(), logicalOlapScan.getTable(), + logicalOlapScan.getQualifier(), groupExpression, logicalProperties); + this.logicalOlapScan = Objects.requireNonNull(logicalOlapScan, "logicalOlapScan can not be null"); + this.deferMaterializeSlotIds = ImmutableSet.copyOf(Objects.requireNonNull(deferMaterializeSlotIds, + "deferMaterializeSlotIds can not be null")); + this.columnIdSlot = Objects.requireNonNull(columnIdSlot, "columnIdSlot can not be null"); + } + + public LogicalOlapScan getLogicalOlapScan() { + return logicalOlapScan; + } + + public Set getDeferMaterializeSlotIds() { + return deferMaterializeSlotIds; + } + + public SlotReference getColumnIdSlot() { + return columnIdSlot; + } + + @Override + public OlapTable getTable() { + return logicalOlapScan.getTable(); + } + + @Override + public long getSelectedIndexId() { + return logicalOlapScan.getSelectedIndexId(); + } + + @Override + public List getSelectedPartitionIds() { + return logicalOlapScan.getSelectedPartitionIds(); + } + + @Override + public List getSelectedTabletIds() { + return logicalOlapScan.getSelectedTabletIds(); // delegate to the wrapped scan's tablet ids, not its partition ids + } + + @Override + public List computeOutput() { + return ImmutableList.builder() + .addAll(logicalOlapScan.getOutput()) + .add(columnIdSlot) + .build(); + } + + @Override + public Plan
withGroupExpression(Optional groupExpression) { + return new LogicalDeferMaterializeOlapScan(logicalOlapScan, deferMaterializeSlotIds, columnIdSlot, + groupExpression, Optional.of(getLogicalProperties())); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + Preconditions.checkArgument(children.isEmpty(), "LogicalDeferMaterializeOlapScan should have no child"); + return new LogicalDeferMaterializeOlapScan(logicalOlapScan, deferMaterializeSlotIds, columnIdSlot, + groupExpression, logicalProperties); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.isEmpty(), "LogicalDeferMaterializeOlapScan should have no child"); + return this; + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalDeferMaterializeOlapScan(this, context); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalDeferMaterializeOlapScan that = (LogicalDeferMaterializeOlapScan) o; + return Objects.equals(logicalOlapScan, that.logicalOlapScan) && Objects.equals( + deferMaterializeSlotIds, that.deferMaterializeSlotIds) && Objects.equals(columnIdSlot, + that.columnIdSlot); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), logicalOlapScan, deferMaterializeSlotIds, columnIdSlot); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalDeferMaterializeOlapScan[" + id.asInt() + "]", + "olapScan", logicalOlapScan, + "deferMaterializeSlotIds", deferMaterializeSlotIds, + "columnIdSlot", columnIdSlot + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java new file mode 100644 index 00000000000000..48ea0720455114 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * use for defer materialize top n + */ +public class LogicalDeferMaterializeResultSink + extends LogicalSink implements Sink { + + private final LogicalResultSink logicalResultSink; + private final OlapTable olapTable; + private final long selectedIndexId; + + public LogicalDeferMaterializeResultSink(LogicalResultSink logicalResultSink, + OlapTable olapTable, long selectedIndexId) { + this(logicalResultSink, olapTable, selectedIndexId, + Optional.empty(), Optional.empty(), logicalResultSink.child()); + } + + public LogicalDeferMaterializeResultSink(LogicalResultSink logicalResultSink, + OlapTable olapTable, long selectedIndexId, + Optional groupExpression, Optional logicalProperties, + CHILD_TYPE child) { + super(logicalResultSink.getType(), groupExpression, logicalProperties, child); + this.logicalResultSink = logicalResultSink; + this.olapTable = olapTable; + this.selectedIndexId = selectedIndexId; + } + + public LogicalResultSink getLogicalResultSink() { + return logicalResultSink; + } + + public OlapTable getOlapTable() { + return olapTable; + } + + public long getSelectedIndexId() { + return selectedIndexId; + } + + @Override + public LogicalDeferMaterializeResultSink withChildren(List children) { + 
Preconditions.checkArgument(children.size() == 1, + "LogicalDeferMaterializeResultSink only accepts one child"); + return new LogicalDeferMaterializeResultSink<>( + logicalResultSink.withChildren(ImmutableList.of(children.get(0))), + olapTable, selectedIndexId, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalDeferMaterializeResultSink(this, context); + } + + @Override + public List getExpressions() { + return logicalResultSink.getExpressions(); + } + + @Override + public LogicalDeferMaterializeResultSink withGroupExpression(Optional groupExpression) { + return new LogicalDeferMaterializeResultSink<>(logicalResultSink, olapTable, selectedIndexId, + groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public LogicalDeferMaterializeResultSink withGroupExprLogicalPropChildren( + Optional groupExpression, + Optional logicalProperties, + List children) { + Preconditions.checkArgument(children.size() == 1, + "LogicalDeferMaterializeResultSink only accepts one child"); + return new LogicalDeferMaterializeResultSink<>( + logicalResultSink.withChildren(ImmutableList.of(children.get(0))), + olapTable, selectedIndexId, groupExpression, logicalProperties, children.get(0)); + } + + @Override + public List computeOutput() { + return child().getOutput(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalDeferMaterializeResultSink that = (LogicalDeferMaterializeResultSink) o; + return selectedIndexId == that.selectedIndexId && Objects.equals(logicalResultSink, + that.logicalResultSink) && Objects.equals(olapTable, that.olapTable); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), logicalResultSink, olapTable, selectedIndexId); + } + + @Override + 
public String toString() { + return Utils.toSqlString("LogicalDeferMaterializeResultSink[" + id.asInt() + "]", + "logicalResultSink", logicalResultSink, + "olapTable", olapTable, + "selectedIndexId", selectedIndexId + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeTopN.java new file mode 100644 index 00000000000000..b775cb1db2f79a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeTopN.java @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.TopN; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * use for defer materialize top n + */ +public class LogicalDeferMaterializeTopN extends LogicalUnary implements TopN { + + private final LogicalTopN logicalTopN; + + /////////////////////////////////////////////////////////////////////////// + // Members for defer materialize for top-n opt. 
+ /////////////////////////////////////////////////////////////////////////// + private final Set deferMaterializeSlotIds; + private final SlotReference columnIdSlot; + + public LogicalDeferMaterializeTopN(LogicalTopN logicalTopN, + Set deferMaterializeSlotIds, SlotReference columnIdSlot) { + super(PlanType.LOGICAL_TOP_N, logicalTopN.getGroupExpression(), + Optional.of(logicalTopN.getLogicalProperties()), logicalTopN.child()); + this.logicalTopN = logicalTopN; + this.deferMaterializeSlotIds = deferMaterializeSlotIds; + this.columnIdSlot = columnIdSlot; + } + + public LogicalDeferMaterializeTopN(LogicalTopN logicalTopN, + Set deferMaterializeSlotIds, SlotReference columnIdSlot, + Optional groupExpression, Optional logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties, child); + this.logicalTopN = logicalTopN; + this.deferMaterializeSlotIds = deferMaterializeSlotIds; + this.columnIdSlot = columnIdSlot; + } + + public LogicalTopN getLogicalTopN() { + return logicalTopN; + } + + public Set getDeferMaterializeSlotIds() { + return deferMaterializeSlotIds; + } + + public SlotReference getColumnIdSlot() { + return columnIdSlot; + } + + @Override + public List getOrderKeys() { + return logicalTopN.getOrderKeys(); + } + + @Override + public long getOffset() { + return logicalTopN.getOffset(); + } + + @Override + public long getLimit() { + return logicalTopN.getLimit(); + } + + @Override + public List getExpressions() { + return ImmutableList.builder() + .addAll(logicalTopN.getExpressions()) + .add(columnIdSlot).build(); + } + + @Override + public List computeOutput() { + return logicalTopN.getOutput().stream() + .filter(s -> !(s.getExprId().equals(columnIdSlot.getExprId()))) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalDeferMaterializeTopN(this, context); + } + + @Override + public Plan withGroupExpression(Optional 
groupExpression) { + return new LogicalDeferMaterializeTopN<>(logicalTopN, deferMaterializeSlotIds, columnIdSlot, + groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + Preconditions.checkArgument(children.size() == 1, + "LogicalDeferMaterializeTopN should have 1 child, but input is %s", children.size()); + return new LogicalDeferMaterializeTopN<>(logicalTopN.withChildren(ImmutableList.of(children.get(0))), + deferMaterializeSlotIds, columnIdSlot, groupExpression, logicalProperties, children.get(0)); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, + "LogicalDeferMaterializeTopN should have 1 child, but input is %s", children.size()); + return new LogicalDeferMaterializeTopN<>(logicalTopN.withChildren(ImmutableList.of(children.get(0))), + deferMaterializeSlotIds, columnIdSlot, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalDeferMaterializeTopN that = (LogicalDeferMaterializeTopN) o; + return Objects.equals(logicalTopN, that.logicalTopN) && Objects.equals(deferMaterializeSlotIds, + that.deferMaterializeSlotIds) && Objects.equals(columnIdSlot, that.columnIdSlot); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), logicalTopN, deferMaterializeSlotIds, columnIdSlot); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalDeferMaterializeTopN[" + id.asInt() + "]", + "logicalTopN", logicalTopN, + "deferMaterializeSlotIds", deferMaterializeSlotIds, + "columnIdSlot", columnIdSlot + ); + } +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java index 6d58ba1f71ce7e..60458eb2a243fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java @@ -141,7 +141,6 @@ public LogicalOlapScan(RelationId id, Table table, List qualifier, List selectedTabletIds, long selectedIndexId, boolean indexSelected, PreAggStatus preAggStatus, List specifiedPartitions, List hints, Map cacheSlotWithSlotName) { - super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier, groupExpression, logicalProperties); Preconditions.checkArgument(selectedPartitionIds != null, "selectedPartitionIds can not be null"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java index c21422e858a8c2..eb7b2556d367a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java @@ -59,7 +59,7 @@ public List getOutputExprs() { } @Override - public Plan withChildren(List children) { + public LogicalResultSink withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalResultSink's children size must be 1, but real is %s", children.size()); return new LogicalResultSink<>(outputExprs, children.get(0)); @@ -76,14 +76,14 @@ public List getExpressions() { } @Override - public Plan withGroupExpression(Optional groupExpression) { + public LogicalResultSink withGroupExpression(Optional groupExpression) { return new LogicalResultSink<>(outputExprs, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override - public Plan 
withGroupExprLogicalPropChildren(Optional groupExpression, + public LogicalResultSink withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - Preconditions.checkArgument(children.size() == 1, "UnboundResultSink only accepts one child"); + Preconditions.checkArgument(children.size() == 1, "LogicalResultSink only accepts one child"); return new LogicalResultSink<>(outputExprs, groupExpression, logicalProperties, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java index 348a70917637cc..80de9d6215aa19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java @@ -64,14 +64,17 @@ public List computeOutput() { return child().getOutput(); } + @Override public List getOrderKeys() { return orderKeys; } + @Override public long getOffset() { return offset; } + @Override public long getLimit() { return limit; } @@ -93,7 +96,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - LogicalTopN that = (LogicalTopN) o; + LogicalTopN that = (LogicalTopN) o; return this.offset == that.offset && this.limit == that.limit && Objects.equals(this.orderKeys, that.orderKeys); } @@ -104,7 +107,7 @@ public int hashCode() { @Override public R accept(PlanVisitor visitor, C context) { - return visitor.visitLogicalTopN((LogicalTopN) this, context); + return visitor.visitLogicalTopN(this, context); } @Override @@ -121,7 +124,8 @@ public LogicalTopN withOrderKeys(List orderKeys) { @Override public LogicalTopN withChildren(List children) { - Preconditions.checkArgument(children.size() == 1); + Preconditions.checkArgument(children.size() == 1, + "LogicalTopN should have 1 child, but input is %s", children.size()); 
return new LogicalTopN<>(orderKeys, limit, offset, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeOlapScan.java new file mode 100644 index 00000000000000..f82bd6dbec5342 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeOlapScan.java @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.OlapScan; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * use for defer materialize top n + */ +public class PhysicalDeferMaterializeOlapScan extends PhysicalCatalogRelation implements OlapScan { + + private final PhysicalOlapScan physicalOlapScan; + + /////////////////////////////////////////////////////////////////////////// + // Members for defer materialize for top-n opt. 
+ /////////////////////////////////////////////////////////////////////////// + private final Set deferMaterializeSlotIds; + private final SlotReference columnIdSlot; + + public PhysicalDeferMaterializeOlapScan(PhysicalOlapScan physicalOlapScan, + Set deferMaterializeSlotIds, SlotReference columnIdSlot, + Optional groupExpression, LogicalProperties logicalProperties) { + this(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot, groupExpression, logicalProperties, null, null); + } + + /** + * constructor + */ + public PhysicalDeferMaterializeOlapScan(PhysicalOlapScan physicalOlapScan, + Set deferMaterializeSlotIds, SlotReference columnIdSlot, + Optional groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics) { + super(physicalOlapScan.getRelationId(), physicalOlapScan.getType(), + physicalOlapScan.getTable(), physicalOlapScan.getQualifier(), + groupExpression, logicalProperties, physicalProperties, statistics); + this.physicalOlapScan = physicalOlapScan; + this.deferMaterializeSlotIds = deferMaterializeSlotIds; + this.columnIdSlot = columnIdSlot; + } + + public PhysicalOlapScan getPhysicalOlapScan() { + return physicalOlapScan; + } + + public Set getDeferMaterializeSlotIds() { + return deferMaterializeSlotIds; + } + + public SlotReference getColumnIdSlot() { + return columnIdSlot; + } + + @Override + public OlapTable getTable() { + return physicalOlapScan.getTable(); + } + + @Override + public long getSelectedIndexId() { + return physicalOlapScan.getSelectedIndexId(); + } + + @Override + public List getSelectedPartitionIds() { + return physicalOlapScan.getSelectedPartitionIds(); + } + + @Override + public List getSelectedTabletIds() { + return physicalOlapScan.getSelectedTabletIds(); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalDeferMaterializeOlapScan(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { 
+ return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot, + groupExpression, getLogicalProperties(), physicalProperties, statistics); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot, + groupExpression, logicalProperties.get(), physicalProperties, statistics); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot, + groupExpression, getLogicalProperties(), physicalProperties, statistics); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + PhysicalDeferMaterializeOlapScan that = (PhysicalDeferMaterializeOlapScan) o; + return Objects.equals(physicalOlapScan, that.physicalOlapScan) && Objects.equals( + deferMaterializeSlotIds, that.deferMaterializeSlotIds) && Objects.equals(columnIdSlot, + that.columnIdSlot); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), physicalOlapScan, deferMaterializeSlotIds, columnIdSlot); + } + + @Override + public String toString() { + return Utils.toSqlString("PhysicalDeferMaterializeOlapScan[" + id.asInt() + "]", + "physicalOlapScan", physicalOlapScan, + "deferMaterializeSlotIds", deferMaterializeSlotIds, + "columnIdSlot", columnIdSlot + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeResultSink.java new file mode 100644 index 00000000000000..ee0713300655c4 
--- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeResultSink.java @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.jetbrains.annotations.Nullable; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * use for defer materialize top n + */ +public class 
PhysicalDeferMaterializeResultSink + extends PhysicalSink implements Sink { + + private final PhysicalResultSink physicalResultSink; + private final OlapTable olapTable; + private final long selectedIndexId; + + public PhysicalDeferMaterializeResultSink(PhysicalResultSink physicalResultSink, + OlapTable olapTable, long selectedIndexId, + Optional groupExpression, LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(physicalResultSink, olapTable, selectedIndexId, + groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); + } + + public PhysicalDeferMaterializeResultSink(PhysicalResultSink physicalResultSink, + OlapTable olapTable, long selectedIndexId, + Optional groupExpression, LogicalProperties logicalProperties, + @Nullable PhysicalProperties physicalProperties, Statistics statistics, + CHILD_TYPE child) { + super(physicalResultSink.getType(), groupExpression, logicalProperties, physicalProperties, statistics, child); + this.physicalResultSink = physicalResultSink; + this.olapTable = olapTable; + this.selectedIndexId = selectedIndexId; + } + + public PhysicalResultSink getPhysicalResultSink() { + return physicalResultSink; + } + + public OlapTable getOlapTable() { + return olapTable; + } + + public long getSelectedIndexId() { + return selectedIndexId; + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, + "PhysicalDeferMaterializeResultSink's children size must be 1, but real is %s", children.size()); + return new PhysicalDeferMaterializeResultSink<>( + physicalResultSink.withChildren(ImmutableList.of(children.get(0))), + olapTable, selectedIndexId, groupExpression, getLogicalProperties(), + physicalProperties, statistics, children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalDeferMaterializeResultSink(this, context); + } + + @Override + public List getExpressions() { + return 
physicalResultSink.getExpressions(); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, olapTable, selectedIndexId, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + Preconditions.checkArgument(children.size() == 1, + "PhysicalDeferMaterializeResultSink's children size must be 1, but real is %s", children.size()); + return new PhysicalDeferMaterializeResultSink<>( + physicalResultSink.withChildren(ImmutableList.of(children.get(0))), + olapTable, selectedIndexId, groupExpression, logicalProperties.get(), + physicalProperties, statistics, children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, olapTable, selectedIndexId, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public List computeOutput() { + return physicalResultSink.getOutputExprs().stream() + .map(NamedExpression::toSlot) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public PhysicalDeferMaterializeResultSink resetLogicalProperties() { + return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, olapTable, selectedIndexId, + groupExpression, null, physicalProperties, statistics, child()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + PhysicalDeferMaterializeResultSink that = (PhysicalDeferMaterializeResultSink) o; + return selectedIndexId == that.selectedIndexId && Objects.equals(physicalResultSink, + that.physicalResultSink) && 
Objects.equals(olapTable, that.olapTable); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), physicalResultSink, olapTable, selectedIndexId); + } + + @Override + public String toString() { + return Utils.toSqlString("PhysicalDeferMaterializeResultSink[" + id.asInt() + "]", + "physicalResultSink", physicalResultSink, + "olapTable", olapTable, + "selectedIndexId", selectedIndexId + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java new file mode 100644 index 00000000000000..2c2a53761a402b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.TopN; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * use for defer materialize top n + */ +public class PhysicalDeferMaterializeTopN + extends AbstractPhysicalSort implements TopN { + + private final PhysicalTopN physicalTopN; + + /////////////////////////////////////////////////////////////////////////// + // Members for defer materialize for top-n opt. 
+ /////////////////////////////////////////////////////////////////////////// + private final Set deferMaterializeSlotIds; + private final SlotReference columnIdSlot; + + public PhysicalDeferMaterializeTopN(PhysicalTopN physicalTopN, + Set deferMaterializeSlotIds, SlotReference columnIdSlot, + Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(physicalTopN, deferMaterializeSlotIds, columnIdSlot, + groupExpression, logicalProperties, null, null, child); + } + + public PhysicalDeferMaterializeTopN(PhysicalTopN physicalTopN, + Set deferMaterializeSlotIds, SlotReference columnIdSlot, + Optional groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + super(physicalTopN.getType(), physicalTopN.getOrderKeys(), physicalTopN.getSortPhase(), + groupExpression, logicalProperties, physicalProperties, statistics, child); + this.physicalTopN = physicalTopN; + this.deferMaterializeSlotIds = deferMaterializeSlotIds; + this.columnIdSlot = columnIdSlot; + } + + public PhysicalTopN getPhysicalTopN() { + return physicalTopN; + } + + public Set getDeferMaterializeSlotIds() { + return deferMaterializeSlotIds; + } + + public SlotReference getColumnIdSlot() { + return columnIdSlot; + } + + @Override + public long getOffset() { + return physicalTopN.getOffset(); + } + + @Override + public long getLimit() { + return physicalTopN.getLimit(); + } + + public Plan withPhysicalTopN(PhysicalTopN physicalTopN) { + return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, + getLogicalProperties(), physicalProperties, statistics, physicalTopN.child()); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, + "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size()); + return new 
PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))), + deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(), + physicalProperties, statistics, children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalDeferMaterializeTopN(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + Preconditions.checkArgument(children.size() == 1, + "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size()); + return new PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))), + deferMaterializeSlotIds, columnIdSlot, groupExpression, logicalProperties.get(), + physicalProperties, statistics, children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public List computeOutput() { + return child().getOutput(); + } + + @Override + public PhysicalDeferMaterializeTopN resetLogicalProperties() { + return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, + groupExpression, null, physicalProperties, statistics, child()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + 
} + PhysicalDeferMaterializeTopN that = (PhysicalDeferMaterializeTopN) o; + return Objects.equals(physicalTopN, that.physicalTopN) && Objects.equals( + deferMaterializeSlotIds, that.deferMaterializeSlotIds) && Objects.equals(columnIdSlot, + that.columnIdSlot); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), physicalTopN, deferMaterializeSlotIds, columnIdSlot); + } + + @Override + public String toString() { + return Utils.toSqlString("PhysicalDeferMaterializeTopN[" + id.asInt() + "]", + "physicalTopN", physicalTopN, + "deferMaterializeSlotIds", deferMaterializeSlotIds, + "columnIdSlot", columnIdSlot + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java index 8d33bc367cc26f..56ff9aff08e9a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java @@ -53,10 +53,7 @@ public PhysicalFileSink(String filePath, String format, Map prop public PhysicalFileSink(String filePath, String format, Map properties, Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { - super(PlanType.PHYSICAL_FILE_SINK, groupExpression, logicalProperties, child); - this.filePath = filePath; - this.format = format; - this.properties = properties; + this(filePath, format, properties, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); } public PhysicalFileSink(String filePath, String format, Map properties, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java index b98d30f4e04f56..57292551a916b6 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java @@ -45,8 +45,6 @@ */ public class PhysicalOlapScan extends PhysicalCatalogRelation implements OlapScan { - public static final String DEFERRED_MATERIALIZED_SLOTS = "deferred_materialized_slots"; - private final DistributionSpec distributionSpec; private final long selectedIndexId; private final ImmutableList selectedTabletIds; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 14ae05a61e44a8..7327c7b6af8d06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -57,33 +57,14 @@ public class PhysicalOlapTableSink extends PhysicalSink private final boolean singleReplicaLoad; private final boolean isPartialUpdate; - public PhysicalOlapTableSink(Database database, OlapTable targetTable, List partitionIds, - List cols, boolean singleReplicaLoad, boolean isPartialUpdate, LogicalProperties logicalProperties, - CHILD_TYPE child) { - this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate, - Optional.empty(), logicalProperties, child); - } - - public PhysicalOlapTableSink(Database database, OlapTable targetTable, List partitionIds, - List cols, boolean singleReplicaLoad, LogicalProperties logicalProperties, - CHILD_TYPE child) { - this(database, targetTable, partitionIds, cols, singleReplicaLoad, false, - Optional.empty(), logicalProperties, child); - } - /** * Constructor */ - public PhysicalOlapTableSink(Database database, OlapTable targetTable, List partitionIds, - List cols, boolean singleReplicaLoad, boolean isPartialUpdate, - 
Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { - super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); - this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink"); - this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); - this.cols = Utils.copyRequiredList(cols); - this.partitionIds = Utils.copyRequiredList(partitionIds); - this.singleReplicaLoad = singleReplicaLoad; - this.isPartialUpdate = isPartialUpdate; + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List partitionIds, List cols, + boolean singleReplicaLoad, boolean isPartialUpdate, Optional groupExpression, + LogicalProperties logicalProperties, CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate, + groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java index cb1750d8c15602..7dacd28db662e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java @@ -24,7 +24,6 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.algebra.Relation; -import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; @@ -76,11 +75,6 @@ public int hashCode() { return Objects.hash(relationId); } - @Override - public R accept(PlanVisitor visitor, C context) { - return visitor.visitPhysicalRelation(this, context); - } - @Override public List 
getExpressions() { return ImmutableList.of(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java index f3cbeae9c88ab8..ea2cd16de4ce12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java @@ -43,16 +43,9 @@ public class PhysicalResultSink extends PhysicalSink outputExprs; - public PhysicalResultSink(List outputExprs, LogicalProperties logicalProperties, - CHILD_TYPE child) { - super(PlanType.PHYSICAL_RESULT_SINK, logicalProperties, child); - this.outputExprs = outputExprs; - } - public PhysicalResultSink(List outputExprs, Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { - super(PlanType.PHYSICAL_RESULT_SINK, groupExpression, logicalProperties, child); - this.outputExprs = outputExprs; + this(outputExprs, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); } public PhysicalResultSink(List outputExprs, Optional groupExpression, @@ -62,11 +55,16 @@ public PhysicalResultSink(List outputExprs, Optional getOutputExprs() { + return outputExprs; + } + @Override public PhysicalResultSink withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalResultSink's children size must be 1, but real is %s", children.size()); - return new PhysicalResultSink<>(outputExprs, groupExpression, getLogicalProperties(), children.get(0)); + return new PhysicalResultSink<>(outputExprs, groupExpression, getLogicalProperties(), + physicalProperties, statistics, children.get(0)); } @Override @@ -81,13 +79,17 @@ public List getExpressions() { @Override public PhysicalResultSink withGroupExpression(Optional groupExpression) { - return new PhysicalResultSink<>(outputExprs, groupExpression, 
getLogicalProperties(), child()); + return new PhysicalResultSink<>(outputExprs, groupExpression, getLogicalProperties(), + physicalProperties, statistics, child()); } @Override public PhysicalResultSink withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new PhysicalResultSink<>(outputExprs, groupExpression, logicalProperties.get(), child()); + Preconditions.checkArgument(children.size() == 1, + "PhysicalResultSink's children size must be 1, but real is %s", children.size()); + return new PhysicalResultSink<>(outputExprs, groupExpression, logicalProperties.get(), + physicalProperties, statistics, children.get(0)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java index be6837b0a5bce9..d9dd45d208ee77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java @@ -30,18 +30,6 @@ /** abstract physical sink */ public abstract class PhysicalSink extends PhysicalUnary { - - public PhysicalSink(PlanType type, - LogicalProperties logicalProperties, CHILD_TYPE child) { - super(type, logicalProperties, child); - } - - public PhysicalSink(PlanType type, - Optional groupExpression, - LogicalProperties logicalProperties, CHILD_TYPE child) { - super(type, groupExpression, logicalProperties, child); - } - public PhysicalSink(PlanType type, Optional groupExpression, LogicalProperties logicalProperties, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java index 0f1d0069b34d4d..7df18fd0109b96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java @@ -109,7 +109,8 @@ public R accept(PlanVisitor visitor, C context) { @Override public PhysicalTopN withChildren(List children) { - Preconditions.checkArgument(children.size() == 1); + Preconditions.checkArgument(children.size() == 1, + "PhysicalTopN's children size must be 1, but real is %s", children.size()); return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -122,7 +123,8 @@ public PhysicalTopN withGroupExpression(Optional gr @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - Preconditions.checkArgument(children.size() == 1); + Preconditions.checkArgument(children.size() == 1, + "PhysicalTopN's children size must be 1, but real is %s", children.size()); return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, logicalProperties.get(), children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java index c1077f7a7e7c12..0382948185b4d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalExcept; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate; @@ 
-53,6 +54,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept; import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; @@ -225,6 +227,10 @@ public R visitLogicalTopN(LogicalTopN topN, C context) { return visit(topN, context); } + public R visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN topN, C context) { + return visit(topN, context); + } + public R visitLogicalWindow(LogicalWindow window, C context) { return visit(window, context); } @@ -323,6 +329,10 @@ public R visitPhysicalTopN(PhysicalTopN topN, C context) { return visitAbstractPhysicalSort(topN, context); } + public R visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, C context) { + return visitAbstractPhysicalSort(topN, context); + } + public R visitPhysicalWindow(PhysicalWindow window, C context) { return visit(window, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java index af65f43d9d4b53..65a03505021629 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java @@ -20,6 +20,7 @@ import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundTVFRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan; import 
org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; @@ -29,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan; import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; @@ -92,6 +94,11 @@ default R visitLogicalOlapScan(LogicalOlapScan olapScan, C context) { return visitLogicalRelation(olapScan, context); } + default R visitLogicalDeferMaterializeOlapScan( + LogicalDeferMaterializeOlapScan deferMaterializeOlapScan, C context) { + return visitLogicalRelation(deferMaterializeOlapScan, context); + } + default R visitLogicalOneRowRelation(LogicalOneRowRelation oneRowRelation, C context) { return visitLogicalRelation(oneRowRelation, context); } @@ -128,6 +135,11 @@ default R visitPhysicalOlapScan(PhysicalOlapScan olapScan, C context) { return visitPhysicalRelation(olapScan, context); } + default R visitPhysicalDeferMaterializeOlapScan( + PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, C context) { + return visitPhysicalRelation(deferMaterializeOlapScan, context); + } + default R visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, C context) { return visitPhysicalRelation(oneRowRelation, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index 63f371aecba4b4..df790fddd29287 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -20,10 +20,12 @@ import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; @@ -70,6 +72,11 @@ default R visitLogicalResultSink(LogicalResultSink logicalResult return visitLogicalSink(logicalResultSink, context); } + default R visitLogicalDeferMaterializeResultSink( + LogicalDeferMaterializeResultSink logicalDeferMaterializeResultSink, C context) { + return visitLogicalSink(logicalDeferMaterializeResultSink, context); + } + // ******************************* // physical // ******************************* @@ -85,4 +92,9 @@ default R visitPhysicalOlapTableSink(PhysicalOlapTableSink olapT default R visitPhysicalResultSink(PhysicalResultSink physicalResultSink, C context) { return visitPhysicalSink(physicalResultSink, context); } + + default R visitPhysicalDeferMaterializeResultSink( + PhysicalDeferMaterializeResultSink sink, C context) { + return visitPhysicalSink(sink, context); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index f040af004246cc..bde891a8351b61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -147,6 +147,10 @@ protected void computeStats(Analyzer analyzer) throws UserException { } } + public SortInfo getMergeInfo() { + return mergeInfo; + } + /** * Set the parameters used to merge sorted input streams. This can be called * after init(). diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 7b3e95c3a0b544..7fdf7eda2d2c0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1281,33 +1281,12 @@ public boolean getShouldColoScan() { return shouldColoScan; } - public void getColumnDesc(List columnsDesc, List keyColumnNames, - List keyColumnTypes) { - if (selectedIndexId != -1) { - for (Column col : olapTable.getSchemaByIndexId(selectedIndexId, true)) { - TColumn tColumn = col.toThrift(); - col.setIndexFlag(tColumn, olapTable); - if (columnsDesc != null) { - columnsDesc.add(tColumn); - } - if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && col.isVisible())) && col.isKey()) { - if (keyColumnNames != null) { - keyColumnNames.add(col.getName()); - } - if (keyColumnTypes != null) { - keyColumnTypes.add(col.getDataType().toThrift()); - } - } - } - } - } - @Override protected void toThrift(TPlanNode msg) { List keyColumnNames = new ArrayList(); List keyColumnTypes = new ArrayList(); List columnsDesc = new ArrayList(); - getColumnDesc(columnsDesc, keyColumnNames, keyColumnTypes); + olapTable.getColumnDesc(selectedIndexId, columnsDesc, keyColumnNames, keyColumnTypes); List indexDesc = Lists.newArrayList(); // Add extra row id column diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index d5a3b7e6e63b33..08af81afbb19cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -51,7 +51,6 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.statistics.query.StatsDelta; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TRuntimeFilterMode; @@ -501,17 +500,7 @@ private void injectRowIdColumnSlot() { } for (PlanFragment fragment : fragments) { if (injected && fragment.getSink() instanceof ResultSink) { - TFetchOption fetchOption = new TFetchOption(); - fetchOption.setFetchRowStore(olapTable.storeRowColumn()); - fetchOption.setUseTwoPhaseFetch(true); - fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo()); - // TODO for row store used seperate more faster path for wide tables - if (!olapTable.storeRowColumn()) { - // Set column desc for each column - List columnsDesc = new ArrayList(); - scanNode.getColumnDesc(columnsDesc, null, null); - fetchOption.setColumnDesc(columnsDesc); - } + TFetchOption fetchOption = olapTable.generateTwoPhaseReadOption(scanNode.getSelectedIndexId()); ((ResultSink) fragment.getSink()).setFetchOption(fetchOption); break; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java index 18245c14424252..30f3a1ff693b12 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java @@ -19,6 +19,8 @@ 
import org.apache.doris.nereids.datasets.ssb.SSBTestBase; import org.apache.doris.nereids.processor.post.PlanPostProcessors; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.util.PlanChecker; @@ -40,9 +42,11 @@ public void testUseTopNRf() { .implement(); PhysicalPlan plan = checker.getPhysicalPlan(); plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan); - Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN); - PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0); - Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); + Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN); + PhysicalDeferMaterializeTopN localTopN + = (PhysicalDeferMaterializeTopN) plan.child(0).child(0); + Assertions.assertTrue(localTopN.getPhysicalTopN() + .getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); } // topn rf do not apply on string-like and float column @@ -53,9 +57,11 @@ public void testNotUseTopNRf() { .rewrite() .implement(); PhysicalPlan plan = checker.getPhysicalPlan(); - plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan); - Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN); - PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0); - Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); + new PlanPostProcessors(checker.getCascadesContext()).process(plan); + Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN); + PhysicalDeferMaterializeTopN localTopN + = (PhysicalDeferMaterializeTopN) plan.child(0).child(0); + 
Assertions.assertFalse(localTopN.getPhysicalTopN() + .getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); } }