Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o

RETURN_IF_ERROR(load_index());
if (read_options.delete_condition_predicates->num_of_column_predicate() == 0 &&
read_options.push_down_agg_type_opt != TPushAggOp::NONE) {
read_options.push_down_agg_type_opt != TPushAggOp::NONE &&
read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(), *schema));
} else {
iter->reset(new SegmentIterator(this->shared_from_this(), schema));
Expand Down
14 changes: 12 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -956,9 +956,19 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
// occurring, return true here that column data needs to be read
return true;
}
// Check the following conditions:
// 1. If the column represented by the unique ID is an inverted index column (indicated by '_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id]')
// and it's not marked for projection in '_output_columns'.
// 2. Or, if the column is an inverted index column and it's marked for projection in '_output_columns',
// and the operation is a push down of the 'COUNT_ON_INDEX' aggregation function.
// If any of the above conditions are met, log a debug message indicating that there's no need to read data for the indexed column.
// Then, return false.
int32_t unique_id = _opts.tablet_schema->column(cid).unique_id();
if (_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] &&
_output_columns.count(unique_id) < 1) {
if ((_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] &&
_output_columns.count(unique_id) < 1) ||
(_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] &&
_output_columns.count(unique_id) == 1 &&
_opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) {
VLOG_DEBUG << "SegmentIterator no need read data for column: "
<< _opts.tablet_schema->column_by_uid(unique_id).name();
return false;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ Status NewOlapScanNode::_process_conjuncts() {
}

Status NewOlapScanNode::_build_key_ranges_and_filters() {
if (_push_down_agg_type == TPushAggOp::NONE) {
if (_push_down_agg_type == TPushAggOp::NONE ||
_push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
DCHECK(column_types.size() == column_names.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,9 @@ public PlanFragment visitPhysicalStorageLayerAggregate(
case COUNT:
pushAggOp = TPushAggOp.COUNT;
break;
case COUNT_ON_MATCH:
pushAggOp = TPushAggOp.COUNT_ON_INDEX;
break;
case MIN_MAX:
pushAggOp = TPushAggOp.MINMAX;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public enum RuleType {
STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION),
COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION),
ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
TWO_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.Match;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
Expand All @@ -56,6 +57,7 @@
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
Expand Down Expand Up @@ -97,6 +99,28 @@ public List<Rule> buildRules() {
PatternDescriptor<LogicalAggregate<GroupPlan>> basePattern = logicalAggregate();

return ImmutableList.of(
RuleType.COUNT_ON_INDEX.build(
logicalAggregate(
logicalProject(
logicalFilter(
logicalOlapScan()
).when(filter -> containsMatchExpression(filter.getExpressions())
&& filter.getExpressions().size() == 1)
))
.when(agg -> enablePushDownCountOnIndex())
.when(agg -> agg.getGroupByExpressions().size() == 0)
.when(agg -> {
Set<AggregateFunction> funcs = agg.getAggregateFunctions();
return !funcs.isEmpty() && funcs.stream().allMatch(f -> f instanceof Count && !f.isDistinct());
})
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
LogicalFilter<LogicalOlapScan> filter = project.child();
LogicalOlapScan olapScan = filter.child();
return pushdownCountOnIndex(agg, project, filter, olapScan, ctx.cascadesContext);
})
),
RuleType.STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT.build(
logicalAggregate(
logicalOlapScan()
Expand Down Expand Up @@ -188,6 +212,55 @@ && couldConvertToMulti(agg))
);
}

/**
 * Tells whether the given expressions are exclusively MATCH predicates.
 *
 * <p>Despite the "contains" in its name, this is an "all of them" check: a single
 * non-{@link Match} expression makes it return false.
 *
 * @param expressions the expressions to inspect
 * @return true only when the list is non-empty and every element is a {@link Match}
 */
private boolean containsMatchExpression(List<Expression> expressions) {
    // Stream.allMatch is vacuously true for an empty stream; an empty list holds no
    // MATCH predicate, so it must not be reported as containing one.
    return !expressions.isEmpty()
            && expressions.stream().allMatch(expr -> expr instanceof Match);
}

/**
 * Reads the count-on-index pushdown switch from the current connection's session variables.
 *
 * @return false when {@code ConnectContext.get()} yields null, otherwise the value of
 *         {@code isEnablePushDownCountOnIndex()} on that context's session variable
 */
private boolean enablePushDownCountOnIndex() {
    ConnectContext ctx = ConnectContext.get();
    if (ctx == null) {
        // No connect context available: treat the pushdown as disabled.
        return false;
    }
    return ctx.getSessionVariable().isEnablePushDownCountOnIndex();
}

/**
 * Rebuilds an aggregate -> project -> filter -> olap scan subtree so that the scan is
 * replaced by a storage-layer aggregate tagged with {@code PushDownAggOp.COUNT_ON_MATCH}.
 * <p>
 * sql: select count(*) from tbl where column match 'token'
 * <p>
 * before:
 * <p>
 *      LogicalAggregate(groupBy=[], output=[count(*)])
 *                |
 *          LogicalProject
 *                |
 *      LogicalFilter(column match 'token')
 *                |
 *      LogicalOlapScan(table=tbl)
 * <p>
 * after:
 * <p>
 *      LogicalAggregate(groupBy=[], output=[count(*)])
 *                |
 *          LogicalProject
 *                |
 *      LogicalFilter(column match 'token')
 *                |
 *      PhysicalStorageLayerAggregate(aggOp=COUNT_ON_MATCH, table=PhysicalOlapScan(table=tbl))
 *
 * @param agg the aggregate at the root of the matched subtree
 * @param project the project directly below {@code agg}
 * @param filter the filter directly below {@code project}
 * @param olapScan the scan directly below {@code filter}
 * @param cascadesContext context handed to the logical-to-physical scan rule
 * @return a copy of {@code agg} whose descendants are the rebuilt project, filter and
 *         storage-layer aggregate
 */
private LogicalAggregate<? extends Plan> pushdownCountOnIndex(
        LogicalAggregate<? extends Plan> agg,
        LogicalProject<? extends Plan> project,
        LogicalFilter<? extends Plan> filter,
        LogicalOlapScan olapScan,
        CascadesContext cascadesContext) {
    // Delegate to the ordinary scan implementation rule and take its first result.
    PhysicalOlapScan scan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
            .build()
            .transform(olapScan, cascadesContext)
            .get(0);

    // Rebuild the tree bottom-up: storage-layer aggregate, then filter, project, aggregate.
    Plan storageLayerCount = new PhysicalStorageLayerAggregate(scan, PushDownAggOp.COUNT_ON_MATCH);
    Plan rebuiltFilter = filter.withChildren(ImmutableList.of(storageLayerCount));
    Plan rebuiltProject = project.withChildren(ImmutableList.of(rebuiltFilter));
    return agg.withChildren(ImmutableList.of(rebuiltProject));
}

/**
* sql: select count(*) from tbl
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalPr

/** PushAggOp */
public enum PushDownAggOp {
COUNT, MIN_MAX, MIX;
COUNT, MIN_MAX, MIX, COUNT_ON_MATCH;

/** supportedFunctions */
public static Map<Class<? extends AggregateFunction>, PushDownAggOp> supportedFunctions() {
return ImmutableMap.<Class<? extends AggregateFunction>, PushDownAggOp>builder()
.put(Count.class, PushDownAggOp.COUNT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_INVERTED_INDEX_QUERY = "enable_inverted_index_query";

public static final String ENABLE_PUSHDOWN_COUNT_ON_INDEX = "enable_count_on_index_pushdown";

public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = "group_by_and_having_use_alias_first";
public static final String DROP_TABLE_IF_CTAS_FAILED = "drop_table_if_ctas_failed";

Expand Down Expand Up @@ -958,9 +960,14 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) {

// Whether enable query with inverted index.
@VariableMgr.VarAttr(name = ENABLE_INVERTED_INDEX_QUERY, needForward = true, description = {
"是否启用inverted index query。", "Set wether to use inverted index query."})
"是否启用inverted index query。", "Set whether to use inverted index query."})
public boolean enableInvertedIndexQuery = true;

// Whether to push the count aggregation down to the scan node when the query
// filters with an inverted-index match. Defaults to true.
@VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_COUNT_ON_INDEX, needForward = true, description = {
"是否启用count_on_index pushdown。", "Set whether to pushdown count_on_index."})
public boolean enablePushDownCountOnIndex = true;

// Whether drop table when create table as select insert data appear error.
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
public boolean dropTableIfCtasFailed = true;
Expand Down Expand Up @@ -2019,6 +2026,14 @@ public void setEnableInvertedIndexQuery(boolean enableInvertedIndexQuery) {
this.enableInvertedIndexQuery = enableInvertedIndexQuery;
}

/**
 * @return the current value of the {@code enablePushDownCountOnIndex} session variable
 */
public boolean isEnablePushDownCountOnIndex() {
    return this.enablePushDownCountOnIndex;
}

/**
 * Overwrites the {@code enablePushDownCountOnIndex} session variable.
 *
 * @param enablePushDownCountOnIndex the new value to store
 */
public void setEnablePushDownCountOnIndex(boolean enablePushDownCountOnIndex) {
    // The parameter shadows the field, so the field must be qualified with "this".
    this.enablePushDownCountOnIndex = enablePushDownCountOnIndex;
}

/**
 * @return the current value of the {@code maxTableCountUseCascadesJoinReorder} session variable
 */
public int getMaxTableCountUseCascadesJoinReorder() {
    return maxTableCountUseCascadesJoinReorder;
}
Expand Down
3 changes: 2 additions & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ enum TPushAggOp {
NONE = 0,
MINMAX = 1,
COUNT = 2,
MIX = 3
MIX = 3,
COUNT_ON_INDEX = 4
}

struct TOlapScanNode {
Expand Down