From 76ce0eff1c7ccda17170ad59657a158343a91675 Mon Sep 17 00:00:00 2001 From: zhangbutao Date: Wed, 15 May 2024 21:06:51 +0800 Subject: [PATCH] [Opt](Iceberg) handle count pushdown in fe side --- .../apache/doris/datasource/FileScanNode.java | 1 + .../iceberg/source/IcebergScanNode.java | 6 ++++++ .../apache/doris/nereids/NereidsPlanner.java | 15 +++++++++++++++ .../apache/doris/planner/OriginalPlanner.java | 19 ++++++++++++++++--- 4 files changed, 38 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index fba97bc595955d..44f07718485c64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -71,6 +71,7 @@ public abstract class FileScanNode extends ExternalScanNode { protected long totalPartitionNum = 0; protected long readPartitionNum = 0; protected long fileSplitSize; + public long rowCount = 0; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index ab8b889fdc59d6..aee9f0e0531f02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -212,6 +212,11 @@ private List doGetSplits() throws UserException { HashSet partitionPathSet = new HashSet<>(); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); + long rowCount = getCountFromSnapshot(); + if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount > 0) { + this.rowCount = rowCount; + return new ArrayList<>(); + } CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize); try (CloseableIterable combinedScanTasks = TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) { @@ -264,6 +269,7 @@ private List doGetSplits() throws UserException { throw new UserException(e.getMessage(), e.getCause()); } + // TODO: Need to delete this as we can handle count pushdown in fe side TPushAggOp aggOp = getPushDownAggNoGroupingOp(); if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) { // we can create a special empty split and skip the plan process diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index ef5f068012af89..ac7a5ffbc586cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -27,6 +27,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.nereids.CascadesContext.Lock; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -54,6 +55,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; @@ -594,6 +596,19 @@ public Optional handleQueryInFe(StatementBase parsedStmt) { ); } return Optional.of(resultSet); + } else if (child instanceof PhysicalHashAggregate && getScanNodes().size() > 0 + && getScanNodes().get(0) instanceof IcebergScanNode) { + List columns = Lists.newArrayList(); + NamedExpression output = physicalPlan.getOutput().get(0); + columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); + if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) { + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList( + Lists.newArrayList(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount)))); + // only support one iceberg scan node and one count, e.g. select count(*) from icetbl; + return Optional.of(resultSet); + } + return Optional.empty(); } else { return Optional.empty(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 06644a7ab9d4c2..971faee67fa1cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -40,6 +40,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.nereids.PlannerHook; import org.apache.doris.qe.CommonResultSet; import org.apache.doris.qe.ConnectContext; @@ -636,13 +637,25 @@ public Optional handleQueryInFe(StatementBase parsedStmt) { return Optional.empty(); } SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt; - if (!parsedSelectStmt.getTableRefs().isEmpty()) { - return Optional.empty(); - } List selectItems = parsedSelectStmt.getSelectList().getItems(); List columns = new ArrayList<>(selectItems.size()); List columnLabels = parsedSelectStmt.getColLabels(); List data = new ArrayList<>(); + if ((singleNodePlanner.getScanNodes().size() > 0 && singleNodePlanner.getScanNodes().get(0) + instanceof IcebergScanNode) && (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0)) { + SelectListItem item = selectItems.get(0); + Expr expr = item.getExpr(); + String columnName = columnLabels.get(0); + columns.add(new Column(columnName, expr.getType())); + data.add(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount)); + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data)); + // only support one iceberg scan node and one count, e.g. select count(*) from icetbl; + return Optional.of(resultSet); + } + if (!parsedSelectStmt.getTableRefs().isEmpty()) { + return Optional.empty(); + } for (int i = 0; i < selectItems.size(); i++) { SelectListItem item = selectItems.get(i); Expr expr = item.getExpr();