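Merged. This change pushes a grouping-free count(*) over Iceberg tables down to the FE: IcebergScanNode records the row count taken from snapshot metadata, and both the Nereids planner and the original planner return it directly as a one-row result set, skipping split planning and BE execution.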
@@ -71,6 +71,7 @@ public abstract class FileScanNode extends ExternalScanNode {
     protected long totalPartitionNum = 0;
     protected long readPartitionNum = 0;
     protected long fileSplitSize;
+    public long rowCount = 0;
 
     public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -212,6 +212,11 @@ private List<Split> doGetSplits() throws UserException {
         HashSet<String> partitionPathSet = new HashSet<>();
         boolean isPartitionedTable = icebergTable.spec().isPartitioned();
 
+        long rowCount = getCountFromSnapshot();
+        if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount > 0) {
+            this.rowCount = rowCount;
+            return new ArrayList<>();
+        }
         CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
         try (CloseableIterable<CombinedScanTask> combinedScanTasks =
                 TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
@@ -264,6 +269,7 @@ private List<Split> doGetSplits() throws UserException {
             throw new UserException(e.getMessage(), e.getCause());
         }
 
+        // TODO: Need to delete this as we can handle count pushdown in fe side
         TPushAggOp aggOp = getPushDownAggNoGroupingOp();
         if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
             // we can create a special empty split and skip the plan process
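Note: getCountFromSnapshot() itself is not part of this diff. A minimal sketch of the idea behind it, assuming the standard Iceberg snapshot-summary properties (the exact Doris implementation may differ, e.g. in how it resolves the snapshot for time travel):

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;

public final class SnapshotCountSketch {
    // Returns the row count recorded in the current snapshot's summary, or -1
    // when the metadata count cannot be trusted, so callers fall back to a scan.
    public static long countFromSnapshot(Table table) {
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot == null) {
            return 0; // no snapshot yet: empty table, pushdown not triggered
        }
        Map<String, String> summary = snapshot.summary();
        long posDeletes = Long.parseLong(
                summary.getOrDefault(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0"));
        long eqDeletes = Long.parseLong(
                summary.getOrDefault(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0"));
        if (posDeletes > 0 || eqDeletes > 0) {
            // total-records counts rows in data files and ignores delete files,
            // so it overcounts once row-level deletes exist.
            return -1;
        }
        return Long.parseLong(summary.getOrDefault(SnapshotSummary.TOTAL_RECORDS_PROP, "-1"));
    }
}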
@@ -27,6 +27,7 @@
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.nereids.CascadesContext.Lock;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -54,6 +55,7 @@
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
@@ -594,6 +596,19 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
                 );
             }
             return Optional.of(resultSet);
+        } else if (child instanceof PhysicalHashAggregate && getScanNodes().size() > 0
+                && getScanNodes().get(0) instanceof IcebergScanNode) {
+            List<Column> columns = Lists.newArrayList();
+            NamedExpression output = physicalPlan.getOutput().get(0);
+            columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
+            if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) {
+                ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
+                ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(
+                        Lists.newArrayList(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount))));
+                // only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
+                return Optional.of(resultSet);
+            }
+            return Optional.empty();
         } else {
             return Optional.empty();
         }
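A condensed, illustrative-only restatement of the guard in the branch above (helper name and signature are hypothetical, not Doris code): the FE answers the query by itself only for a grouping-free COUNT whose single scan node is an IcebergScanNode with a rowCount already populated by doGetSplits().

import java.util.List;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.planner.ScanNode;

final class CountShortCircuitGuard {
    // Mirrors the two-step check above: plan shape first, then whether the
    // scan node's rowCount was filled in from the Iceberg snapshot.
    static boolean canShortCircuit(Plan child, List<ScanNode> scanNodes) {
        return child instanceof PhysicalHashAggregate
                && !scanNodes.isEmpty()
                && scanNodes.get(0) instanceof IcebergScanNode
                && ((IcebergScanNode) scanNodes.get(0)).rowCount > 0;
    }
}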
@@ -40,6 +40,7 @@
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.nereids.PlannerHook;
 import org.apache.doris.qe.CommonResultSet;
 import org.apache.doris.qe.ConnectContext;
@@ -636,13 +637,25 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
             return Optional.empty();
         }
         SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
-        if (!parsedSelectStmt.getTableRefs().isEmpty()) {
-            return Optional.empty();
-        }
         List<SelectListItem> selectItems = parsedSelectStmt.getSelectList().getItems();
         List<Column> columns = new ArrayList<>(selectItems.size());
         List<String> columnLabels = parsedSelectStmt.getColLabels();
         List<String> data = new ArrayList<>();
+        if ((singleNodePlanner.getScanNodes().size() > 0 && singleNodePlanner.getScanNodes().get(0)
+                instanceof IcebergScanNode) && (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0)) {
+            SelectListItem item = selectItems.get(0);
+            Expr expr = item.getExpr();
+            String columnName = columnLabels.get(0);
+            columns.add(new Column(columnName, expr.getType()));
+            data.add(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount));
+            ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
+            ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
+            // only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
+            return Optional.of(resultSet);
+        }
+        if (!parsedSelectStmt.getTableRefs().isEmpty()) {
+            return Optional.empty();
+        }
         for (int i = 0; i < selectItems.size(); i++) {
             SelectListItem item = selectItems.get(i);
             Expr expr = item.getExpr();
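A hedged end-to-end illustration (connection settings are placeholders; icetbl follows the example table name in the comments above): with either planner path in place, a bare count over an Iceberg table is answered from FE metadata instead of a backend scan.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public final class CountPushdownExample {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; the Doris FE speaks the MySQL
        // protocol, so a MySQL JDBC driver on the classpath is all that's needed.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://127.0.0.1:9030/iceberg_db", "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM icetbl")) {
            while (rs.next()) {
                // With the pushdown, this value comes from the snapshot summary.
                System.out.println("count = " + rs.getLong(1));
            }
        }
    }
}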