6 changes: 6 additions & 0 deletions docs/en/administrator-guide/config/fe_config.md
@@ -800,3 +800,9 @@ The default is empty, that is, not set.

When creating a dynamic partition table, the maximum number of partitions allowed to be automatically created. To prevent creating too many partitions at once.
The default is 500.

### `enable_olap_scan_node_conjuncts_prunner`

Setting this parameter to true enables the OlapScanNodeConjunctsPrunner. If the conjuncts of an OlapScanNode contain an InPredicate on the distribution (bucket) key, the InPredicate will be pruned based on the tablets that the OlapScanNode is going to scan.

The default is true.
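
For illustration, here is a minimal, hedged sketch of the effect in Java (not Doris code): it assumes a table hash-distributed on `k1` into 2 buckets, where values 0 and 2 land in bucket 0 and values 1 and 3 in bucket 1 (the same scenario used in the class comment of OlapScanNodeConjunctsPrunner), and it uses a stand-in hash instead of `PartitionKey.getHashValue()`.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InPredicatePruneSketch {
    // Stand-in bucket mapping; the real pruner hashes the distribution column value via
    // PartitionKey.getHashValue() and takes it modulo the bucket number.
    static int bucketOf(long value, int bucketNum) {
        return Math.floorMod(Long.hashCode(value), bucketNum);
    }

    public static void main(String[] args) {
        int bucketNum = 2;
        Set<Integer> bucketsOfThisScanNode = Set.of(0);   // this instance scans tablet_0 (bucket 0) only
        List<Long> inList = List.of(0L, 1L, 2L, 3L);      // original predicate: k1 IN (0, 1, 2, 3)

        // Keep only the IN elements whose bucket is actually scanned by this node.
        List<Long> pruned = inList.stream()
                .filter(v -> bucketsOfThisScanNode.contains(bucketOf(v, bucketNum)))
                .collect(Collectors.toList());

        System.out.println("Pruned IN list: " + pruned);  // [0, 2]
    }
}
```

A scan node instance that is only assigned bucket 0 would then send `k1 IN (0, 2)` to the backend instead of the full list.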
6 changes: 6 additions & 0 deletions docs/zh-CN/administrator-guide/config/fe_config.md
@@ -805,3 +805,9 @@ The value of thrift_client_timeout_ms is set to greater than 0 to avoid threads getting stuck in java.net.

When creating a dynamic partition table, the maximum number of partitions allowed to be created automatically, to prevent creating too many partitions at once.
The default is 500.

### `enable_olap_scan_node_conjuncts_prunner`

Setting this parameter to true enables the OlapScanNodeConjunctsPrunner. If the conjuncts of an OlapScanNode contain an InPredicate on the distribution (bucket) key, the InPredicate will be pruned based on the tablets that the OlapScanNode is going to scan.

The default is true.
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1396,4 +1396,12 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_dynamic_partition_num = 500;

/**
* Whether to enable the OlapScanNodeConjunctsPrunner. If the conjuncts of an OlapScanNode contain
* an InPredicate on the distribution key, the InPredicate will be pruned based on the tablets that
* the OlapScanNode is going to scan.
*/
@ConfField(mutable = true, masterOnly = false)
public static boolean enable_olap_scan_node_conjuncts_prunner = true;
}
@@ -63,6 +63,7 @@
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRangeParams;

import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
@@ -140,10 +141,13 @@ public class OlapScanNode extends ScanNode {
// a bucket seq may map to many tablets, and each tablet has a TScanRangeLocations.
public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations= ArrayListMultimap.create();

private OlapScanNodeConjunctsPrunner olapScanNodeConjunctsPrunner;

// Constructs node to scan given data files of table 'tbl'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
super(id, desc, planNodeName);
olapTable = (OlapTable) desc.getTable();
olapScanNodeConjunctsPrunner = new OlapScanNodeConjunctsPrunner(desc);
}

public void setIsPreAggregation(boolean isPreAggregation, String reason) {
@@ -556,6 +560,10 @@ private void computeTabletInfo() throws UserException {
totalTabletsNum += selectedTable.getTablets().size();
selectedTabletsNum += tablets.size();
addScanRangeLocations(partition, selectedTable, tablets, localBeId);

olapScanNodeConjunctsPrunner.addSelectedTablets(partition.getId(),
partition.getDistributionInfo(), allTabletIds,
new ArrayList<>(tabletIds != null ? tabletIds : allTabletIds));
}
}

@@ -805,4 +813,23 @@ public DataPartition constructInputPartitionByDistributionInfo() {
}
return DataPartition.hashPartitioned(dataDistributeExprs);
}

@Override
protected List<Expr> pruneConjuncts(List<Expr> conjuncts, List<TScanRangeParams> scanRangeParams) {
if (!Config.enable_olap_scan_node_conjuncts_prunner) {
return conjuncts;
}

if (scanRangeParams == null) {
return conjuncts;
}

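// All scan ranges of an OlapScanNode are expected to be palo scan ranges; collect the tablet
// ids assigned to this fragment instance and delegate the pruning to the prunner.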
boolean allIsPaloScanRange = scanRangeParams.stream()
.allMatch(scanRangeParam -> scanRangeParam.getScanRange().isSetPaloScanRange());
Preconditions.checkState(allIsPaloScanRange, "Invalid scan range for OlapScanNode.");
Set<Long> tabletIds = scanRangeParams.stream()
.map(s -> s.getScanRange().palo_scan_range.getTabletId())
.collect(Collectors.toSet());
return olapScanNodeConjunctsPrunner.pruneConjuncts(conjuncts, tabletIds);
}
}
@@ -0,0 +1,210 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.common.Config;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/*
* Prune conjuncts according to the tablets to be scanned by OlapScanNode. It only supports pruning InPredicate.
*
* For example:
* table_a's distribution column is k1, bucket num is 2. tablet_0 contains data with k1 in (0, 2),
* tablet_1 contains data with k1 in (1, 3).
* When executing the query 'select * from table_a where k1 in (0, 1, 2, 3)',
* suppose there are two OlapScanNodes in the execution plan: scan_node0 and scan_node1 are responsible for scanning
* tablet_0 and tablet_1 respectively. Then each scan_node's conjuncts are 'k1 in (0, 1, 2, 3)'.
* However, given which tablets each OlapScanNode actually scans, it is possible to prune the
* InPredicate of scan_node0 to 'k1 in (0, 2)' and the InPredicate of scan_node1 to 'k1 in (1, 3)'.
*
* By pruning the InPredicate, we can reduce the number of ScanKeys in the OlapScanNode and improve the performance of data scanning.
*/
public class OlapScanNodeConjunctsPrunner {
private final static Logger LOG = LogManager.getLogger(OlapScanNodeConjunctsPrunner.class);

private TupleDescriptor tupleDescriptor;

private Map<Long /* tablet id */, Integer /* bucket */> tabletBucket = new HashMap<>();
private Map<Long /* partition id */, List<Long /* tablet id */>> selectedTablets = new HashMap<>();
private Map<Long /* partition id */, DistributionInfo> partitionDis = new HashMap<>();

public OlapScanNodeConjunctsPrunner(TupleDescriptor tupleDescriptor) {
this.tupleDescriptor = tupleDescriptor;
}

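// Record the tablets of one selected partition: 'tabletsInOrder' lists all tablets of the
// partition in bucket order (index == bucket seq), while 'selectedTablets' are the tablets
// this scan node may actually scan after partition/distribution pruning.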
public void addSelectedTablets(long partitionId, DistributionInfo distributionInfo, List<Long> tabletsInOrder,
List<Long> selectedTablets) {
partitionDis.put(partitionId, distributionInfo);
for (int i = 0; i < tabletsInOrder.size(); i++) {
tabletBucket.put(tabletsInOrder.get(i), i);
}
this.selectedTablets.put(partitionId, selectedTablets);
}

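// Prune 'conjuncts' against 'tabletIds', the tablets assigned to a single fragment instance.
// Returns the conjuncts unchanged whenever pruning is not applicable (non-hash distribution,
// multiple distribution columns, or no conjunct on the distribution column).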
public List<Expr> pruneConjuncts(List<Expr> conjuncts, Set<Long> tabletIds) {
Preconditions.checkState(selectedTablets.size() > 0);

if (!partitionDis.values().stream().allMatch(dis -> dis.getType() == DistributionInfoType.HASH)) {
return conjuncts;
}

List<List<Column>> distributionColumns = partitionDis.values().stream()
.map(dis -> (HashDistributionInfo) dis)
.map(hashDis -> hashDis.getDistributionColumns())
.collect(Collectors.toList());

// Sanity check: all partitions' DistributionInfo must have the same distribution columns.
boolean hasDifferentDistributionColumns = false;
for (int i = 1; i < distributionColumns.size(); i++) {
if (!distributionColumns.get(i - 1).equals(distributionColumns.get(i))) {
hasDifferentDistributionColumns = true;
break;
}
}
Preconditions.checkState(!hasDifferentDistributionColumns,
"Invalidate state: Partitions have diffrent distribution columns.");

List<Column> columns = distributionColumns.get(0);
// Only handle the case of a single distribution column.
if (columns.size() > 1) {
return conjuncts;
}

Column distributionColumn = columns.get(0);
SlotDescriptor disSlotDesc = tupleDescriptor.getColumnSlot(distributionColumn.getName());
if (disSlotDesc == null) {
// There is no conjunct bound to distribution column.
return conjuncts;
}

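// For each selected partition, collect the bucket seqs of the tablets this instance scans.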
Map<Long /* partition id */, Set<Integer /* bucket */>> candidateBuckets = new HashMap<>();
partitionDis.entrySet().forEach(entry -> {
selectedTablets.get(entry.getKey()).forEach(tabletId -> {
if (tabletIds.contains(tabletId)) {
candidateBuckets.computeIfAbsent(entry.getKey(), key -> new HashSet<>())
.add(tabletBucket.get(tabletId));
}
});
});

List<Expr> rets = Lists.newArrayList();
for (Expr expr : conjuncts) {
// Only InPredicate can be pruned.
if (!(expr instanceof InPredicate)) {
rets.add(expr);
continue;
}

InPredicate inPredicate = (InPredicate) expr;
// InPredicate not bound by distribution column.
if (!inPredicate.isBound(disSlotDesc.getId())) {
rets.add(expr);
continue;
}

if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
rets.add(expr);
continue;
}

if (!(inPredicate.getChild(0) instanceof SlotRef)) {
rets.add(expr);
continue;
}

PartitionKey partitionKey = new PartitionKey();
List<Boolean> contained = new ArrayList<>(inPredicate.getInElementNum());
for (int i = 0; i < inPredicate.getInElementNum(); i++) {
contained.add(false);
}

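// For each IN element, compute its hash bucket and mark it as contained if any selected
// partition scans that bucket.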
for (int i = 0; i < inPredicate.getInElementNum(); i++) {
LiteralExpr element = (LiteralExpr) inPredicate.getChild(1 + i);
partitionKey.pushColumn(element, distributionColumn.getDataType());
long hash = partitionKey.getHashValue();
for (Map.Entry<Long, Set<Integer>> candidate : candidateBuckets.entrySet()) {
DistributionInfo dis = partitionDis.get(candidate.getKey());
if (candidate.getValue().contains((int) ((hash & 0xffffffff) % dis.getBucketNum()))) {
contained.set(i, true);
break;
}
}
partitionKey.popColumn();
}

Preconditions.checkState(contained.stream().anyMatch(c -> c),
"Invalide state, InPredicate's elements is empty after prunned.");
if (contained.stream().allMatch(c -> c)) {
rets.add(expr);
} else {
InPredicate newIn = (InPredicate) inPredicate.clone();
newIn.clearChildren();
newIn.addChild(inPredicate.getChild(0));

for (int i = 0; i < inPredicate.getInElementNum(); i++) {
if (contained.get(i)) {
newIn.addChild(inPredicate.getChild(1 + i));
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("InPredicate prunned: original size: {}, new size: {}",
inPredicate.getInElementNum(), newIn.getInElementNum());
List<String> originalElements = inPredicate.getChildren().subList(1, inPredicate.getInElementNum() + 1)
.stream()
.map(in -> (LiteralExpr) in)
.map(li -> li.getStringValue())
.collect(Collectors.toList());
List<String> newElements = newIn.getChildren().subList(1, newIn.getInElementNum() + 1).stream()
.map(in -> (LiteralExpr) in)
.map(li -> li.getStringValue())
.collect(Collectors.toList());
LOG.debug("InPredicate prunned: original: {}, new: {}", originalElements, newElements);
}

rets.add(newIn);
}
}

return rets;
}
}
@@ -27,6 +27,7 @@
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TScanRangeParams;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -37,6 +38,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
@@ -219,9 +221,13 @@ public int getParallelExecNum() {
}

public TPlanFragment toThrift() {
return toThrift(null);
}

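// Overload that carries the scan ranges assigned to one fragment instance, keyed by plan node
// id, so that scan nodes can prune their conjuncts before serialization.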
public TPlanFragment toThrift(Map<Integer, List<TScanRangeParams>> scanRanges) {
TPlanFragment result = new TPlanFragment();
if (planRoot != null) {
result.setPlan(planRoot.treeToThrift());
result.setPlan(planRoot.treeToThrift(scanRanges));
}
if (outputExprs != null) {
result.setOutputExprs(Expr.treesToThrift(outputExprs));
18 changes: 14 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -29,6 +29,7 @@
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPlan;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TScanRangeParams;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
@@ -41,6 +42,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
@@ -393,13 +395,17 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {

// Convert this plan node, including all children, to its Thrift representation.
public TPlan treeToThrift() {
return treeToThrift(null);
}

public TPlan treeToThrift(Map<Integer, List<TScanRangeParams>> scanRanges) {
TPlan result = new TPlan();
treeToThriftHelper(result);
treeToThriftHelper(result, scanRanges);
return result;
}

// Append a flattened version of this plan node, including all children, to 'container'.
private void treeToThriftHelper(TPlan container) {
private void treeToThriftHelper(TPlan container, Map<Integer, List<TScanRangeParams>> scanRanges) {
TPlanNode msg = new TPlanNode();
msg.node_id = id.asInt();
msg.num_children = children.size();
@@ -408,7 +414,7 @@ private void treeToThriftHelper(TPlan container) {
msg.addToRowTuples(tid.asInt());
msg.addToNullableTuples(nullableTupleIds.contains(tid));
}
for (Expr e : conjuncts) {
for (Expr e : pruneConjuncts(conjuncts, scanRanges == null ? null : scanRanges.get(id.asInt()))) {
msg.addToConjuncts(e.treeToThrift());
}
msg.compact_data = compactData;
@@ -420,11 +426,15 @@ private void treeToThriftHelper(TPlan container) {
} else {
msg.num_children = children.size();
for (PlanNode child: children) {
child.treeToThriftHelper(container);
child.treeToThriftHelper(container, scanRanges);
}
}
}

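// Hook for scan nodes (e.g. OlapScanNode) to prune conjuncts based on the scan ranges assigned
// to this node in one fragment instance. The default implementation returns the conjuncts
// unchanged.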
protected List<Expr> pruneConjuncts(List<Expr> conjuncts, List<TScanRangeParams> scanRangeParams) {
return conjuncts;
}

/**
* Computes internal state, including planner-relevant statistics.
* Call this once on the root of the plan tree before calling toThrift().