@@ -138,7 +138,14 @@ private static class ParamCreateContext {
 
     public BrokerScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
                           List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
-        super(id, destTupleDesc, planNodeName);
+        super(id, destTupleDesc, planNodeName, NodeType.BROKER_SCAN_NODE);
         this.fileStatusesList = fileStatusesList;
         this.filesAdded = filesAdded;
     }
+
+    public BrokerScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
+                          List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded, NodeType nodeType) {
+        super(id, destTupleDesc, planNodeName, nodeType);
+        this.fileStatusesList = fileStatusesList;
+        this.filesAdded = filesAdded;
+    }
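Every hunk in this PR follows the same recipe: each concrete node tags itself with a NodeType when delegating to its parent constructor, and classes that are themselves extended (BrokerScanNode here, LoadScanNode below) keep a tagging constructor plus a pass-through overload so subclasses can substitute their own tag. A minimal sketch of that chain, with hypothetical, simplified signatures (the real constructors also take PlanNodeId, TupleDescriptor, and the file status lists):

    // Simplified, hypothetical model of the constructor chain in this PR.
    class PlanNodeSketch {
        enum NodeType { DEFAULT, BROKER_SCAN_NODE, HIVE_SCAN_NODE }
        protected NodeType nodeType = NodeType.DEFAULT;
    }

    class BrokerScanNodeSketch extends PlanNodeSketch {
        BrokerScanNodeSketch() {
            this(NodeType.BROKER_SCAN_NODE);  // direct instantiation: tag as broker scan
        }
        BrokerScanNodeSketch(NodeType nodeType) {
            this.nodeType = nodeType;         // pass-through overload for subclasses
        }
    }

    class HiveScanNodeSketch extends BrokerScanNodeSketch {
        HiveScanNodeSketch() {
            super(NodeType.HIVE_SCAN_NODE);   // subclass substitutes its own tag
        }
    }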
@@ -72,7 +72,7 @@ public class EsScanNode extends ScanNode {
     boolean isFinalized = false;
 
     public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
-        super(id, desc, planNodeName);
+        super(id, desc, planNodeName, NodeType.ES_SCAN_NODE);
         table = (EsTable) (desc.getTable());
         esTablePartitions = table.getEsTablePartitions();
     }
@@ -100,7 +100,7 @@ public List<String> getPartitionKeys() {
 
     public HiveScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
                         List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
-        super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded);
+        super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded, NodeType.HIVE_SCAN_NODE);
         this.hiveTable = (HiveTable) destTupleDesc.getTable();
     }
 
@@ -47,7 +47,7 @@ public class IcebergScanNode extends BrokerScanNode {
 
     public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
                            List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
-        super(id, desc, planNodeName, fileStatusesList, filesAdded);
+        super(id, desc, planNodeName, fileStatusesList, filesAdded, NodeType.ICEBREG_SCAN_NODE);
         icebergTable = (IcebergTable) desc.getTable();
     }
 
@@ -54,7 +54,11 @@ public abstract class LoadScanNode extends ScanNode {
     protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
 
     public LoadScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
-        super(id, desc, planNodeName);
+        super(id, desc, planNodeName, NodeType.LOAD_SCAN_NODE);
+    }
+
+    public LoadScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, NodeType nodeType) {
+        super(id, desc, planNodeName, nodeType);
     }
 
     protected void initAndSetWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) throws UserException {
@@ -56,7 +56,7 @@ public class MysqlScanNode extends ScanNode {
      * Constructs node to scan given data files of table 'tbl'.
      */
     public MysqlScanNode(PlanNodeId id, TupleDescriptor desc, MysqlTable tbl) {
-        super(id, desc, "SCAN MYSQL");
+        super(id, desc, "SCAN MYSQL", NodeType.MYSQL_SCAN_NODE);
         tblName = "`" + tbl.getMysqlTableName() + "`";
     }
 
@@ -73,7 +73,7 @@ private static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr exp
      * Constructs node to scan given data files of table 'tbl'.
      */
     public OdbcScanNode(PlanNodeId id, TupleDescriptor desc, OdbcTable tbl) {
-        super(id, desc, "SCAN ODBC");
+        super(id, desc, "SCAN ODBC", NodeType.ODBC_SCAN_NODE);
         connectString = tbl.getConnectString();
         odbcType = tbl.getOdbcTableType();
         tblName = OdbcTable.databaseProperName(odbcType, tbl.getOdbcTableName());
53 changes: 27 additions & 26 deletions fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -53,6 +53,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.statistics.StatsRecursiveDerive;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -146,7 +147,7 @@ public class OlapScanNode extends ScanNode {
 
     // Constructs node to scan given data files of table 'tbl'.
     public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
-        super(id, desc, planNodeName);
+        super(id, desc, planNodeName, NodeType.OLAP_SCAN_NODE);
         olapTable = (OlapTable) desc.getTable();
     }
 
@@ -346,10 +347,25 @@ public void init(Analyzer analyzer) throws UserException {
          * - So only an inaccurate cardinality can be calculated here.
          */
         if (analyzer.safeIsEnableJoinReorderBasedCost()) {
+            mockRowCountInStatistic();
             computeInaccurateCardinality();
         }
     }
 
+    /**
+     * Remove the method after statistics collection is working properly
+     */
+    public void mockRowCountInStatistic() {
+        long tableId = desc.getTable().getId();
+        cardinality = 0;
+        for (long selectedPartitionId : selectedPartitionIds) {
+            final Partition partition = olapTable.getPartition(selectedPartitionId);
+            final MaterializedIndex baseIndex = partition.getBaseIndex();
+            cardinality += baseIndex.getRowCount();
+        }
+        Catalog.getCurrentCatalog().getStatisticsManager().getStatistics().mockTableStatsWithRowCount(tableId, cardinality);
+    }
+
     @Override
     public void finalize(Analyzer analyzer) throws UserException {
         LOG.debug("OlapScanNode get scan range locations. Tuple: {}", desc);
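Per its own comment, mockRowCountInStatistic is a stopgap until real statistics collection works: it sums the base-index row counts of the selected partitions and pushes that total into the statistics manager, so the recursive derivation introduced further down finds a usable row count for the table. The body of mockTableStatsWithRowCount is not part of this diff; a guess at its shape, with hypothetical map and setter names:

    // Hypothetical sketch of Statistics.mockTableStatsWithRowCount(); only its
    // call site appears in this PR, so the field and method names are assumptions.
    public void mockTableStatsWithRowCount(long tableId, long rowCount) {
        TableStats tableStats = idToTableStats.computeIfAbsent(tableId, id -> new TableStats());
        tableStats.setRowCount(rowCount);  // assumed setter storing the synthetic count
    }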
@@ -386,6 +402,12 @@ public void computeStats(Analyzer analyzer) {
         }
         // when node scan has no data, cardinality should be 0 instead of a invalid value after computeStats()
         cardinality = cardinality == -1 ? 0 : cardinality;
+
+        // update statsDeriveResult for real statistics
+        // After statistics collection is complete, remove the logic
+        if (analyzer.safeIsEnableJoinReorderBasedCost()) {
+            statsDeriveResult.setRowCount(cardinality);
+        }
     }
 
     @Override
@@ -397,30 +419,9 @@ protected void computeNumNodes() {
         numNodes = numNodes <= 0 ? 1 : numNodes;
     }
 
-    /**
-     * Calculate inaccurate cardinality.
-     * cardinality: the value of cardinality is the sum of rowcount which belongs to selectedPartitionIds
-     * The cardinality here is actually inaccurate, it will be greater than the actual value.
-     * There are two reasons
-     * 1. During the actual execution, not all tablets belonging to the selected partition will be scanned.
-     *    Some tablets may have been pruned before execution.
-     * 2. The base index may eventually be replaced by mv index.
-     * <p>
-     * There are three steps to calculate cardinality
-     * 1. Calculate how many rows were scanned
-     * 2. Apply conjunct
-     * 3. Apply limit
-     */
-    private void computeInaccurateCardinality() {
-        // step1: Calculate how many rows were scanned
-        cardinality = 0;
-        for (long selectedPartitionId : selectedPartitionIds) {
-            final Partition partition = olapTable.getPartition(selectedPartitionId);
-            final MaterializedIndex baseIndex = partition.getBaseIndex();
-            cardinality += baseIndex.getRowCount();
-        }
-        applyConjunctsSelectivity();
-        capCardinalityAtLimit();
+    private void computeInaccurateCardinality() throws UserException {
+        StatsRecursiveDerive.getStatsRecursiveDerive().statsRecursiveDerive(this);
+        cardinality = statsDeriveResult.getRowCount();
     }
 
     private Collection<Long> partitionPrune(PartitionInfo partitionInfo, PartitionNames partitionNames) throws AnalysisException {
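The replaced method summed base-index row counts itself and then applied conjunct selectivity and the limit; the new version hands the whole estimate to StatsRecursiveDerive and reads the result back from statsDeriveResult. The deriver's implementation is not shown in this diff, but the name suggests a bottom-up pass over the plan tree, roughly like this sketch (the per-node estimator is a hypothetical helper):

    // Assumed shape of StatsRecursiveDerive.statsRecursiveDerive(PlanNode):
    // derive children first, then compute this node's StatsDeriveResult.
    void statsRecursiveDerive(PlanNode node) throws UserException {
        for (PlanNode child : node.getChildren()) {
            statsRecursiveDerive(child);                      // bottom-up over the tree
        }
        if (node.getStatsDeriveResult() == null) {
            node.setStatsDeriveResult(estimateForNode(node)); // hypothetical per-node estimator
        }
    }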
@@ -563,7 +564,7 @@ private void addScanRangeLocations(Partition partition,
 
             result.add(scanRangeLocations);
         }
-        // FIXME(dhc): we use cardinality here to simulate ndv
+
         if (tablets.size() == 0) {
             desc.setCardinality(0);
         } else {
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -36,6 +36,7 @@
 import org.apache.doris.common.TreeNode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
+import org.apache.doris.statistics.StatsDeriveResult;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFunctionBinaryType;
 import org.apache.doris.thrift.TPlan;
@@ -135,6 +136,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
 
     protected List<SlotId> outputSlotIds;
 
+    protected NodeType nodeType = NodeType.DEFAULT;
+    protected StatsDeriveResult statsDeriveResult;
+
     protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName) {
         this.id = id;
         this.limit = -1;
@@ -173,12 +177,41 @@ protected PlanNode(PlanNodeId id, PlanNode node, String planNodeName) {
         this.planNodeName = VectorizedUtil.isVectorized() ?
                 "V" + planNodeName : planNodeName;
         this.numInstances = 1;
+        this.nodeType = nodeType;
     }
 
+    public enum NodeType {
+        DEFAULT,
+        AGG_NODE,
+        BROKER_SCAN_NODE,
+        HASH_JOIN_NODE,
+        HIVE_SCAN_NODE,
+        MERGE_NODE,
+        ES_SCAN_NODE,
+        ICEBREG_SCAN_NODE,
+        LOAD_SCAN_NODE,
+        MYSQL_SCAN_NODE,
+        ODBC_SCAN_NODE,
+        OLAP_SCAN_NODE,
+        SCHEMA_SCAN_NODE,
+    }
+
     public String getPlanNodeName() {
         return planNodeName;
     }
 
+    public StatsDeriveResult getStatsDeriveResult() {
+        return statsDeriveResult;
+    }
+
+    public NodeType getNodeType() {
+        return nodeType;
+    }
+
+    public void setStatsDeriveResult(StatsDeriveResult statsDeriveResult) {
+        this.statsDeriveResult = statsDeriveResult;
+    }
+
 /**
  * Sets tblRefIds_, tupleIds_, and nullableTupleIds_.
  * The default implementation is a no-op.
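The enum plus the getters are the public surface of this change: statistics code can branch on a node's kind without instanceof chains, and can cache a StatsDeriveResult on any node. (ICEBREG_SCAN_NODE matches the spelling used at the IcebergScanNode call site above.) One caveat as excerpted: in this three-argument constructor no nodeType parameter is in scope, so `this.nodeType = nodeType;` assigns the field to itself and the tag stays DEFAULT on this path; scan nodes receive their tag through the new ScanNode constructor below instead. A hedged sketch of the kind of dispatch the tag enables — only getNodeType(), getStatsDeriveResult(), and getRowCount() come from this PR, the rest is illustrative:

    static long roughRowCount(PlanNode node) {
        switch (node.getNodeType()) {
            case OLAP_SCAN_NODE:
            case BROKER_SCAN_NODE: {
                StatsDeriveResult result = node.getStatsDeriveResult();
                return result == null ? -1 : result.getRowCount();  // leaf estimate
            }
            default: {
                long total = 0;
                for (PlanNode child : node.getChildren()) {
                    total += roughRowCount(child);                  // combine children
                }
                return total;
            }
        }
    }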
@@ -65,8 +65,9 @@ abstract public class ScanNode extends PlanNode {
     protected String sortColumn = null;
     protected Analyzer analyzer;
 
-    public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
+    public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, NodeType nodeType) {
         super(id, desc.getId().asList(), planNodeName);
+        super.nodeType = nodeType;
         this.desc = desc;
     }
 
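`super.nodeType = nodeType;` writes the protected field inherited from PlanNode; since Java fields are not polymorphic, it is equivalent to `this.nodeType = nodeType;` here. Given the self-assignment noted above, this constructor is where every scan node's tag is actually set.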
@@ -57,7 +57,7 @@ public class SchemaScanNode extends ScanNode {
      * Constructs node to scan given data files of table 'tbl'.
      */
     public SchemaScanNode(PlanNodeId id, TupleDescriptor desc) {
-        super(id, desc, "SCAN SCHEMA");
+        super(id, desc, "SCAN SCHEMA", NodeType.SCHEMA_SCAN_NODE);
         this.tableName = desc.getTable().getName();
     }
 
@@ -1725,7 +1725,6 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
         scanNodeList.add(scanNode);
 
         scanNode.init(analyzer);
-
         return scanNode;
     }
 
@@ -699,14 +699,6 @@ private void analyzeAndGenerateQueryPlan(TQueryOptions tQueryOptions) throws Use
             }
             if (explainOptions != null) parsedStmt.setIsExplain(explainOptions);
         }
-
-        if (parsedStmt instanceof InsertStmt && parsedStmt.isExplain()) {
-            if (ConnectContext.get() != null &&
-                    ConnectContext.get().getExecutor() != null &&
-                    ConnectContext.get().getExecutor().getParsedStmt() != null) {
-                ConnectContext.get().getExecutor().getParsedStmt().setIsExplain(new ExplainOptions(true, false));
-            }
-        }
     }
     plannerProfile.setQueryAnalysisFinishTime();
 