diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 7c4fa5745f8c54..9b95c5f9b80c74 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -800,3 +800,9 @@ The default is empty, that is, not set. When creating a dynamic partition table, the maximum number of partitions allowed to be automatically created. To prevent creating too many partitions at once. The default is 500. + +### `enable_olap_scan_node_conjuncts_prunner` + +Setting this parameter to true enables the OlapScanNodeConjunctsPrunner. If the conjuncts of an OlapScanNode contain an InPredicate on the distribution key, the InPredicate will be pruned based on the tablets that the OlapScanNode is going to scan. + +The default is true. diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 29acd0288a345b..ec9e0f1c9c33ef 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -805,3 +805,9 @@ thrift_client_timeout_ms 的值被设置为大于0来避免线程卡在java.net. 在创建动态分区表时,允许自动创建的最大分区数量。以防止一次性创建过多的分区。 默认为 500。 + +### `enable_olap_scan_node_conjuncts_prunner` + +将此参数设置为 true,将会启用 OlapScanNodeConjunctsPrunner。如果 OlapScanNode 的 Conjuncts 中包含分桶键上的 InPredicate,将会根据 OlapScanNode 要扫描的 Tablets 信息对 InPredicate 进行裁剪。 + +默认为 true。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 52fa0db5d3b03f..89b8f9fbcad027 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1396,4 +1396,12 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int max_dynamic_partition_num = 500; + + /** + * Whether to enable the OlapScanNodeConjunctsPrunner. If the conjuncts of an OlapScanNode contain + * an InPredicate on the distribution key, the InPredicate will be pruned based on the tablets that + * the OlapScanNode is going to scan. + */ + @ConfField(mutable = true, masterOnly = false) + public static boolean enable_olap_scan_node_conjuncts_prunner = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6f814653dd4bec..601be5588d9101 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -63,6 +63,7 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.thrift.TScanRangeParams; import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; @@ -140,10 +141,13 @@ public class OlapScanNode extends ScanNode { // a bucket seq may map to many tablets, and each tablet has a TScanRangeLocations. public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations= ArrayListMultimap.create(); + private OlapScanNodeConjunctsPrunner olapScanNodeConjunctsPrunner; + // Constructs node to scan given data files of table 'tbl'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName); olapTable = (OlapTable) desc.getTable(); + olapScanNodeConjunctsPrunner = new OlapScanNodeConjunctsPrunner(desc); } public void setIsPreAggregation(boolean isPreAggregation, String reason) { @@ -556,6 +560,10 @@ private void computeTabletInfo() throws UserException { totalTabletsNum += selectedTable.getTablets().size(); selectedTabletsNum += tablets.size(); addScanRangeLocations(partition, selectedTable, tablets, localBeId); + + olapScanNodeConjunctsPrunner.addSelectedTablets(partition.getId(), + partition.getDistributionInfo(), allTabletIds, + new ArrayList<>(tabletIds != null ? tabletIds : allTabletIds)); } } @@ -805,4 +813,23 @@ public DataPartition constructInputPartitionByDistributionInfo() { } return DataPartition.hashPartitioned(dataDistributeExprs); } + + @Override + protected List<Expr> pruneConjuncts(List<Expr> conjuncts, List<TScanRangeParams> scanRangeParams) { + if (!Config.enable_olap_scan_node_conjuncts_prunner) { + return conjuncts; + } + + if (scanRangeParams == null) { + return conjuncts; + } + + boolean allIsPaloScanRange = scanRangeParams.stream() + .allMatch(scanRangeParam -> scanRangeParam.getScanRange().isSetPaloScanRange()); + Preconditions.checkState(allIsPaloScanRange, "Invalid scan range for OlapScanNode."); + Set<Long> tabletIds = scanRangeParams.stream() + .map(s -> s.getScanRange().palo_scan_range.getTabletId()) + .collect(Collectors.toSet()); + return olapScanNodeConjunctsPrunner.pruneConjuncts(conjuncts, tabletIds); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNodeConjunctsPrunner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNodeConjunctsPrunner.java new file mode 100644 index 00000000000000..8a67eeb84f580f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNodeConjunctsPrunner.java @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.planner; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.common.Config; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/* + * Prunes conjuncts according to the tablets to be scanned by the OlapScanNode. It only supports pruning InPredicate. + * + * For example: + * table_a's distribution column is k1, bucket num is 2. tablet_0 contains data with k1 in (0, 2), + * tablet_1 contains data with k1 in (1, 3). + * When executing the following query: 'select * from table_a where k1 in (0, 1, 2, 3)'. + * Suppose there are two OlapScanNode instances in the execution plan, and scan_node0 and scan_node1 are responsible + * for scanning tablet_0 and tablet_1 respectively. Then each scan_node's conjuncts are 'k1 in (0, 1, 2, 3)'. + * However, considering which tablets each OlapScanNode actually scans, it is possible to prune the + * InPredicate of scan_node0 to 'k1 in (0, 2)' and the InPredicate of scan_node1 to 'k1 in (1, 3)'. + * + * By pruning the InPredicate, we can reduce the number of ScanKeys in the OlapScanNode and improve the performance of data scanning. + */ +public class OlapScanNodeConjunctsPrunner { + private final static Logger LOG = LogManager.getLogger(OlapScanNodeConjunctsPrunner.class); + + private TupleDescriptor tupleDescriptor; + + private Map<Long, Integer> tabletBucket = new HashMap<>(); + private Map<Long, List<Long>> selectedTablets = new HashMap<>(); + private Map<Long, DistributionInfo> partitionDis = new HashMap<>(); + + public OlapScanNodeConjunctsPrunner(TupleDescriptor tupleDescriptor) { + this.tupleDescriptor = tupleDescriptor; + } + + public void addSelectedTablets(long partitionId, DistributionInfo distributionInfo, List<Long> tabletsInOrder, + List<Long> selectedTablets) { + partitionDis.put(partitionId, distributionInfo); + for (int i = 0; i < tabletsInOrder.size(); i++) { + tabletBucket.put(tabletsInOrder.get(i), i); + } + this.selectedTablets.put(partitionId, selectedTablets); + } + + public List<Expr> pruneConjuncts(List<Expr> conjuncts, Set<Long> tabletIds) { + Preconditions.checkState(selectedTablets.size() > 0); + + if (!partitionDis.values().stream().allMatch(dis -> dis.getType() == DistributionInfoType.HASH)) { + return conjuncts; + } + + List<List<Column>> distributionColumns = partitionDis.values().stream() + .map(dis -> (HashDistributionInfo) dis) + .map(hashDis -> hashDis.getDistributionColumns()) + .collect(Collectors.toList()); + + // Sanity check: all partitions' DistributionInfo must have the same distribution columns.
+ boolean hasDifferentDistributionColumns = false; + for (int i = 1; i < distributionColumns.size(); i++) { + if (!distributionColumns.get(i - 1).equals(distributionColumns.get(i))) { + hasDifferentDistributionColumns = true; + break; + } + } + Preconditions.checkState(!hasDifferentDistributionColumns, + "Invalid state: partitions have different distribution columns."); + + List<Column> columns = distributionColumns.get(0); + // Only handle the case of a single distribution column. + if (columns.size() > 1) { + return conjuncts; + } + + Column distributionColumn = columns.get(0); + SlotDescriptor disSlotDesc = tupleDescriptor.getColumnSlot(distributionColumn.getName()); + if (disSlotDesc == null) { + // There is no conjunct bound to the distribution column. + return conjuncts; + } + + Map<Long, Set<Integer>> candidateBuckets = new HashMap<>(); + partitionDis.entrySet().forEach(entry -> { + selectedTablets.get(entry.getKey()).forEach(tabletId -> { + if (tabletIds.contains(tabletId)) { + candidateBuckets.computeIfAbsent(entry.getKey(), key -> new HashSet<>()) + .add(tabletBucket.get(tabletId)); + } + }); + }); + + List<Expr> rets = Lists.newArrayList(); + for (Expr expr : conjuncts) { + // Only InPredicate can be pruned. + if (!(expr instanceof InPredicate)) { + rets.add(expr); + continue; + } + + InPredicate inPredicate = (InPredicate) expr; + // InPredicate not bound to the distribution column. + if (!inPredicate.isBound(disSlotDesc.getId())) { + rets.add(expr); + continue; + } + + if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) { + rets.add(expr); + continue; + } + + if (!(inPredicate.getChild(0) instanceof SlotRef)) { + rets.add(expr); + continue; + } + + PartitionKey partitionKey = new PartitionKey(); + List<Boolean> contained = new ArrayList<>(inPredicate.getInElementNum()); + for (int i = 0; i < inPredicate.getInElementNum(); i++) { + contained.add(false); + } + + for (int i = 0; i < inPredicate.getInElementNum(); i++) { + LiteralExpr element = (LiteralExpr) inPredicate.getChild(1 + i); + partitionKey.pushColumn(element, distributionColumn.getDataType()); + long hash = partitionKey.getHashValue(); + for (Map.Entry<Long, Set<Integer>> candidate : candidateBuckets.entrySet()) { + DistributionInfo dis = partitionDis.get(candidate.getKey()); + if (candidate.getValue().contains((int) ((hash & 0xffffffff) % dis.getBucketNum()))) { + contained.set(i, true); + break; + } + } + partitionKey.popColumn(); + } + + Preconditions.checkState(contained.stream().anyMatch(c -> c), + "Invalid state: InPredicate's elements would be empty after pruning."); + if (contained.stream().allMatch(c -> c)) { + rets.add(expr); + } else { + InPredicate newIn = (InPredicate) inPredicate.clone(); + newIn.clearChildren(); + newIn.addChild(inPredicate.getChild(0)); + + for (int i = 0; i < inPredicate.getInElementNum(); i++) { + if (contained.get(i)) { + newIn.addChild(inPredicate.getChild(1 + i)); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("InPredicate pruned: original size: {}, new size: {}", + inPredicate.getInElementNum(), newIn.getInElementNum()); + List<String> originalElements = inPredicate.getChildren().subList(1, inPredicate.getInElementNum() + 1) + .stream() + .map(in -> (LiteralExpr) in) + .map(li -> li.getStringValue()) + .collect(Collectors.toList()); + List<String> newElements = newIn.getChildren().subList(1, newIn.getInElementNum() + 1).stream() + .map(in -> (LiteralExpr) in) + .map(li -> li.getStringValue()) + .collect(Collectors.toList()); + LOG.debug("InPredicate pruned: original: {}, new: {}", originalElements, newElements); + } + + rets.add(newIn); + } + }
+ + return rets; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index f714132b7dd241..c04d33c109fa6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -27,6 +27,7 @@ import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TResultSinkType; +import org.apache.doris.thrift.TScanRangeParams; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -37,6 +38,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -219,9 +221,13 @@ public int getParallelExecNum() { } public TPlanFragment toThrift() { + return toThrift(null); + } + + public TPlanFragment toThrift(Map<Integer, List<TScanRangeParams>> scanRanges) { TPlanFragment result = new TPlanFragment(); if (planRoot != null) { - result.setPlan(planRoot.treeToThrift()); + result.setPlan(planRoot.treeToThrift(scanRanges)); } if (outputExprs != null) { result.setOutputExprs(Expr.treesToThrift(outputExprs)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index dd7f6363984b5e..cf0032ae7772c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -29,6 +29,7 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlan; import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TScanRangeParams; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -41,6 +42,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -393,13 +395,17 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { // Convert this plan node, including all children, to its Thrift representation. public TPlan treeToThrift() { + return treeToThrift(null); + } + + public TPlan treeToThrift(Map<Integer, List<TScanRangeParams>> scanRanges) { TPlan result = new TPlan(); - treeToThriftHelper(result); + treeToThriftHelper(result, scanRanges); return result; } // Append a flattened version of this plan node, including all children, to 'container'. - private void treeToThriftHelper(TPlan container) { + private void treeToThriftHelper(TPlan container, Map<Integer, List<TScanRangeParams>> scanRanges) { TPlanNode msg = new TPlanNode(); msg.node_id = id.asInt(); msg.num_children = children.size(); @@ -408,7 +414,7 @@ private void treeToThriftHelper(TPlan container) { msg.addToRowTuples(tid.asInt()); msg.addToNullableTuples(nullableTupleIds.contains(tid)); } - for (Expr e : conjuncts) { + for (Expr e : pruneConjuncts(conjuncts, scanRanges == null ? null : scanRanges.get(id.asInt()))) { msg.addToConjuncts(e.treeToThrift()); } msg.compact_data = compactData; @@ -420,11 +426,15 @@ private void treeToThriftHelper(TPlan container) { } else { msg.num_children = children.size(); for (PlanNode child: children) { - child.treeToThriftHelper(container); + child.treeToThriftHelper(container, scanRanges); } } } + protected List<Expr> pruneConjuncts(List<Expr> conjuncts, List<TScanRangeParams> scanRangeParams) { + return conjuncts; + } + /** * Computes internal state, including planner-relevant statistics. * Call this once on the root of the plan tree before calling toThrift().
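The changes above add a `pruneConjuncts` hook with a no-op default in `PlanNode`, override it in `OlapScanNode`, and thread each fragment instance's per-node scan ranges into `treeToThrift` (wired up from the Coordinator below), so each instance serializes only the IN-list values that can land in the tablets it scans. The following is a minimal, self-contained sketch of that bucket rule only, not the Doris implementation: the hypothetical `hashOf` stands in for `PartitionKey.getHashValue()`, plain longs stand in for the `LiteralExpr` children of the `InPredicate`, and `Math.floorMod` plays the role of the patch's `(hash & 0xffffffff) % bucketNum`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.ToLongFunction;

public class InListPruneSketch {
    // Keep only the IN-list values whose hash bucket belongs to the buckets this
    // scan instance actually reads; callers reuse the original predicate unchanged
    // when every value survives (mirroring the allMatch branch in the patch).
    static List<Long> pruneInList(List<Long> inValues, Set<Integer> myBuckets,
                                  int bucketNum, ToLongFunction<Long> hashOf) {
        List<Long> kept = new ArrayList<>();
        for (Long value : inValues) {
            long hash = hashOf.applyAsLong(value);                    // stand-in for PartitionKey.getHashValue()
            int bucket = (int) Math.floorMod(hash, (long) bucketNum); // bucket = hash modulo bucket count
            if (myBuckets.contains(bucket)) {
                kept.add(value);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // 4 buckets, identity "hash" for readability: an instance that scans only
        // buckets 0 and 2 keeps just the IN-list values 0 and 2.
        System.out.println(pruneInList(List.of(0L, 1L, 2L, 3L), Set.of(0, 2), 4, v -> v)); // [0, 2]
    }
}
```

With an identity hash and four buckets, an instance that scans buckets 0 and 2 reduces `IN (0, 1, 2, 3)` to `IN (0, 2)`, which is the same effect as the `table_a` example in the class comment above.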
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index cb93a094b20e92..c5248f77984c45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1926,7 +1926,7 @@ List<TExecPlanFragmentParams> toThrift(int backendNum) { final FInstanceExecParam instanceExecParam = instanceExecParams.get(i); TExecPlanFragmentParams params = new TExecPlanFragmentParams(); params.setProtocolVersion(PaloInternalServiceVersion.V1); - params.setFragment(fragment.toThrift()); + params.setFragment(fragment.toThrift(instanceExecParam.perNodeScanRanges)); params.setDescTbl(descTable); params.setParams(new TPlanFragmentExecParams()); params.setResourceInfo(tResourceInfo); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeConjunctsPrunnerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeConjunctsPrunnerTest.java new file mode 100644 index 00000000000000..4303babbe0d4b8 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeConjunctsPrunnerTest.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.planner; + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Type; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class OlapScanNodeConjunctsPrunnerTest { + + @Test + public void testPruneConjuncts() { + TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(1)); + SlotDescriptor slot1 = new SlotDescriptor(new SlotId(1), tupleDescriptor); + slot1.setType(Type.BIGINT); + slot1.setColumn(new Column("slot1", PrimitiveType.INT)); + tupleDescriptor.addSlot(slot1); + SlotRef slotRef1 = new SlotRef(slot1); + SlotDescriptor slot2 = new SlotDescriptor(new SlotId(2), tupleDescriptor); + slot2.setType(Type.BIGINT); + slot2.setColumn(new Column("slot2", PrimitiveType.INT)); + tupleDescriptor.addSlot(slot2); + SlotRef slotRef2 = new SlotRef(slot2); + SlotDescriptor slot3 = new SlotDescriptor(new SlotId(3), tupleDescriptor); + slot3.setType(Type.BIGINT); + slot3.setColumn(new Column("slot3", PrimitiveType.INT)); + tupleDescriptor.addSlot(slot3); + SlotRef slotRef3 = new SlotRef(slot3); + + OlapScanNodeConjunctsPrunner prunner = new OlapScanNodeConjunctsPrunner(tupleDescriptor); + + List<Column> distributionColumns = Arrays.asList(new Column("slot1", PrimitiveType.INT)); + + List<Long> inElements = new ArrayList<>(); + List<Expr> inExprs = new ArrayList<>(); + for (long i = 0; i < 1000; i++) { + inElements.add(i); + inExprs.add(new IntLiteral(i)); + } + + Set<Integer> targetBucket = new HashSet<Integer>(){{add(8); add(21);}}; + Set<Long> targetPrunnedElements = new HashSet<>(); + Set<Long> targetTablets = new HashSet<>(); + + // Build selected tablet info + buildSelectedTabletsInfo(prunner, distributionColumns, inElements, 1, 20, + targetBucket, targetPrunnedElements, targetTablets); + buildSelectedTabletsInfo(prunner, distributionColumns, inElements, 2, 50, + targetBucket, targetPrunnedElements, targetTablets); + + // Case: "where col_x in (...)" should be pruned. + List<Expr> conjuncts1 = Arrays.asList(new InPredicate(slotRef1, inExprs, false)); + List<Expr> rets1 = prunner.pruneConjuncts(conjuncts1, targetTablets); + Assert.assertNotEquals(conjuncts1, rets1); + InPredicate prunedIn = (InPredicate) rets1.get(0); + Assert.assertEquals(targetPrunnedElements.size(), prunedIn.getInElementNum()); + for (int i = 0; i < prunedIn.getInElementNum(); i++) { + long element = ((IntLiteral) prunedIn.getChild(1 + i)).getValue(); + Assert.assertTrue(targetPrunnedElements.contains(element)); + } + + // Case: the following conjuncts should not be pruned. + List<Expr> conjuncts2 = new ArrayList<>(); + // case 1. NOT IN predicate + conjuncts2.clear(); + conjuncts2.add(new InPredicate(slotRef1, inExprs, true)); + Assert.assertEquals(conjuncts2, prunner.pruneConjuncts(conjuncts2, targetTablets)); + + // case 2. child(0) of the in predicate is a constant expression + conjuncts2.clear(); + conjuncts2.add(new InPredicate(new IntLiteral(1), inExprs, false)); + Assert.assertEquals(conjuncts2, prunner.pruneConjuncts(conjuncts2, targetTablets)); + + // case 3. InPredicate that is not bound to the distribution column + conjuncts2.clear(); + conjuncts2.add(new InPredicate(slotRef2, inExprs, false)); + Assert.assertEquals(conjuncts2, prunner.pruneConjuncts(conjuncts2, targetTablets)); + + // case 4. conjuncts other than InPredicate + conjuncts2.clear(); + conjuncts2.add(new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef1, new IntLiteral(1))); + Assert.assertEquals(conjuncts2, prunner.pruneConjuncts(conjuncts2, targetTablets)); + } + + private void buildSelectedTabletsInfo(OlapScanNodeConjunctsPrunner prunner, + List<Column> distributionColumns, + List<Long> inElements, + int partitionId, + int bucketNum, + Set<Integer> targetBucket, + Set<Long> targetPrunnedElements, + Set<Long> targetTablets) { + HashDistributionInfo hashDis = new HashDistributionInfo(bucketNum, distributionColumns); + List<Long> tabletsInOrder = new ArrayList<>(hashDis.getBucketNum()); + for (int i = 0; i < hashDis.getBucketNum(); i++) { + tabletsInOrder.add(partitionId * 10000 + (long) i); + } + List<Long> selectedTablets = new ArrayList<>(); + for (long inElement : inElements) { + PartitionKey partitionKey = new PartitionKey(); + partitionKey.pushColumn(new IntLiteral(inElement), PrimitiveType.INT); + int bucket = (int) ((partitionKey.getHashValue() & 0xffffffff) % hashDis.getBucketNum()); + selectedTablets.add(tabletsInOrder.get(bucket)); + if (targetBucket.contains(bucket)) { + targetPrunnedElements.add(inElement); + targetTablets.add(tabletsInOrder.get(bucket)); + } + } + prunner.addSelectedTablets(partitionId, hashDis, tabletsInOrder, selectedTablets); + } +}
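Since `enable_olap_scan_node_conjuncts_prunner` is declared `mutable = true` and not master-only, it should be adjustable per FE at runtime. A hedged usage sketch follows, assuming the standard `ADMIN SET FRONTEND CONFIG` statement applies to this new option and reusing the `table_a`/`k1` example from the class comment:

```sql
-- Toggle the pruning at runtime (mutable, non-master-only FE config).
ADMIN SET FRONTEND CONFIG ("enable_olap_scan_node_conjuncts_prunner" = "false");
ADMIN SET FRONTEND CONFIG ("enable_olap_scan_node_conjuncts_prunner" = "true");

-- With the option enabled, each scan instance of a query like this keeps only
-- the IN values that can fall into the tablets it scans, e.g. 'k1 in (0, 2)'.
SELECT * FROM table_a WHERE k1 IN (0, 1, 2, 3);
```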