diff --git a/fe/src/main/java/org/apache/doris/analysis/InPredicate.java b/fe/src/main/java/org/apache/doris/analysis/InPredicate.java
index 6765ff8e9ebf44..1963aa75c947cd 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InPredicate.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InPredicate.java
@@ -29,9 +29,9 @@
 import org.apache.doris.thrift.TExprOpcode;
 import org.apache.doris.thrift.TInPredicate;
 
-import com.google.common.collect.Lists;
 import com.google.common.base.Preconditions;
-import org.apache.logging.log4j.LogManager;
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -97,6 +97,11 @@ protected InPredicate(InPredicate other) {
         isNotIn = other.isNotIn();
     }
 
+    public int getInElementNum() {
+        // the first child is the compare expr
+        return getChildren().size() - 1;
+    }
+
     @Override
     public Expr clone() {
         return new InPredicate(this);
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index 39858e2a3eecf0..7cacc810629a4a 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -857,5 +857,16 @@ public class Config extends ConfigBase {
      * exception will be thrown to user client directly without load label.
      */
    @ConfField(mutable = true, masterOnly = true)
    public static boolean using_old_load_usage_pattern = false;
+
+    /*
+     * This will limit the max recursion depth of the hash distribution pruner.
+     * e.g.: where a in (5 elements) and b in (4 elements) and c in (3 elements) and d in (2 elements).
+     * a/b/c/d are distribution columns, so the recursion depth will be 5 * 4 * 3 * 2 = 120, larger than 100,
+     * so the distribution pruner will not work and will just return all buckets.
+     *
+     * Increasing the depth can support distribution pruning for more elements, but may cost more CPU.
+     */
+    @ConfField(mutable = true, masterOnly = false)
+    public static int max_distribution_pruner_recursion_depth = 100;
 }
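To make the depth arithmetic in the config comment above concrete, here is a minimal, self-contained sketch (plain Java, not part of the patch; the class and method names are hypothetical). It only shows that the depth is the product of the IN-list sizes of the distribution columns and how it compares against the limit:

// Hypothetical helper illustrating the depth check described in the config comment; not part of the patch.
public class DistributionPruneDepthExample {
    // The depth is the product of the IN-list sizes of the distribution columns.
    static long depth(int... inElementCounts) {
        long depth = 1;
        for (int count : inElementCounts) {
            depth *= count;
        }
        return depth;
    }

    public static void main(String[] args) {
        long d = depth(5, 4, 3, 2); // a in (5), b in (4), c in (3), d in (2)
        System.out.println(d);      // 120
        // With the default max_distribution_pruner_recursion_depth = 100, 120 > 100,
        // so the pruner falls back to returning all buckets.
        System.out.println(d > 100);
    }
}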
diff --git a/fe/src/main/java/org/apache/doris/planner/HashDistributionPruner.java b/fe/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
index 5746cec1b88f83..8024675afe078c 100644
--- a/fe/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
+++ b/fe/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.common.Config;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -34,43 +35,55 @@
 import java.util.Map;
 import java.util.Set;
 
+/*
+ * Prune the buckets by the distribution columns' predicates, recursively.
+ * It only supports binary equal predicates and in predicates combined with AND.
+ * For example:
+ *     where a = 1 and b in (2, 3, 4) and c in (5, 6, 7)
+ * a/b/c are distribution columns.
+ *
+ * The config 'max_distribution_pruner_recursion_depth' limits the max recursion depth of pruning.
+ * The recursion depth is the product of the element numbers of all predicates.
+ * The above example's depth is 9 (= 1 * 3 * 3).
+ *
+ * If the depth is larger than 'max_distribution_pruner_recursion_depth', all buckets will be returned without pruning.
+ */
 public class HashDistributionPruner implements DistributionPruner {
     private static final Logger LOG = LogManager.getLogger(HashDistributionPruner.class);
 
     // partition list, sort by the hash code
-    private List<Long> partitionList;
+    private List<Long> bucketsList;
     // partition columns
-    private List<Column> partitionColumns;
+    private List<Column> distributionColumns;
     // partition column filters
-    private Map<String, PartitionColumnFilter> partitionColumnFilters;
+    private Map<String, PartitionColumnFilter> distributionColumnFilters;
     private int hashMod;
 
-    HashDistributionPruner(List<Long> partitions, List<Column> columns,
+    HashDistributionPruner(List<Long> bucketsList, List<Column> columns,
                            Map<String, PartitionColumnFilter> filters, int hashMod) {
-        this.partitionList = partitions;
-        this.partitionColumns = columns;
-        this.partitionColumnFilters = filters;
+        this.bucketsList = bucketsList;
+        this.distributionColumns = columns;
+        this.distributionColumnFilters = filters;
         this.hashMod = hashMod;
     }
 
     // columnId: which column to compute
     // hashKey: the key which to compute hash value
     public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) {
-        if (columnId == partitionColumns.size()) {
+        if (columnId == distributionColumns.size()) {
             // compute Hash Key
             long hashValue = hashKey.getHashValue();
-            return Lists.newArrayList(
-                partitionList.get((int) ((hashValue & 0xffffffff) % hashMod)));
+            return Lists.newArrayList(bucketsList.get((int) ((hashValue & 0xffffffff) % hashMod)));
         }
-        Column keyColumn = partitionColumns.get(columnId);
-        PartitionColumnFilter filter = partitionColumnFilters.get(keyColumn.getName());
+        Column keyColumn = distributionColumns.get(columnId);
+        PartitionColumnFilter filter = distributionColumnFilters.get(keyColumn.getName());
         if (null == filter) {
             // no filter in this column, no partition Key
             // return all subPartition
-            return Lists.newArrayList(partitionList);
+            return Lists.newArrayList(bucketsList);
         }
         InPredicate inPredicate = filter.getInPredicate();
-        if (null == inPredicate || inPredicate.getChildren().size() * complex > 100) {
+        if (null == inPredicate || inPredicate.getInElementNum() * complex > Config.max_distribution_pruner_recursion_depth) {
             // equal one value
             if (filter.lowerBoundInclusive && filter.upperBoundInclusive
                 && filter.lowerBound != null && filter.upperBound != null
@@ -81,18 +94,19 @@ public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) {
                 return result;
             }
             // return all SubPartition
-            return Lists.newArrayList(partitionList);
+            return Lists.newArrayList(bucketsList);
         }
         if (null != inPredicate) {
-            if (! (inPredicate.getChild(0) instanceof SlotRef)) {
+            if (!(inPredicate.getChild(0) instanceof SlotRef)) {
                 // return all SubPartition
-                return Lists.newArrayList(partitionList);
+                return Lists.newArrayList(bucketsList);
             }
         }
         Set<Long> resultSet = Sets.newHashSet();
+        int inElementNum = inPredicate.getInElementNum();
+        int newComplex = inElementNum * complex;
         int childrenNum = inPredicate.getChildren().size();
-        int newComplex = inPredicate.getChildren().size() * complex;
         for (int i = 1; i < childrenNum; ++i) {
             LiteralExpr expr = (LiteralExpr) inPredicate.getChild(i);
             hashKey.pushColumn(expr, keyColumn.getDataType());
@@ -101,7 +115,7 @@ public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) {
                 resultSet.add(subPartitionId);
             }
             hashKey.popColumn();
-            if (resultSet.size() >= partitionList.size()) {
+            if (resultSet.size() >= bucketsList.size()) {
                 break;
             }
         }
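For readers unfamiliar with the recursion above, the following is a simplified, self-contained model of what prune() enumerates (not the patch's code: it uses plain string candidate lists and a toy hash combination instead of PartitionKey.getHashValue(), so the bucket ids will not match Doris's). It only illustrates the structure: walk the columns, try each allowed value, recurse, and map the final hash onto the bucket list:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of the recursive enumeration in HashDistributionPruner.prune():
// one candidate value list per distribution column, a toy hash instead of
// PartitionKey.getHashValue(), and a (hash % hashMod) bucket mapping at the leaves.
public class PrunerSketch {
    static Set<Integer> prune(List<List<String>> candidates, int col, long hash, int hashMod) {
        Set<Integer> buckets = new HashSet<>();
        if (col == candidates.size()) {
            // all distribution columns bound: map the composed hash to one bucket
            buckets.add((int) ((hash & 0xffffffffL) % hashMod));
            return buckets;
        }
        for (String value : candidates.get(col)) {
            // toy hash combination; Doris combines column values inside PartitionKey
            buckets.addAll(prune(candidates, col + 1, hash * 31 + value.hashCode(), hashMod));
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<List<String>> candidates = new ArrayList<>();
        candidates.add(List.of("1"));            // a = 1
        candidates.add(List.of("2", "3", "4"));  // b in (2, 3, 4)
        candidates.add(List.of("5", "6", "7"));  // c in (5, 6, 7)
        // depth = 1 * 3 * 3 = 9 combinations, so at most 9 of the 32 buckets survive
        System.out.println(prune(candidates, 0, 17, 32));
    }
}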
diff --git a/fe/src/test/java/org/apache/doris/planner/HashDistributionPrunerTest.java b/fe/src/test/java/org/apache/doris/planner/HashDistributionPrunerTest.java
new file mode 100644
index 00000000000000..4222b8e051d346
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/planner/HashDistributionPrunerTest.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.PrimitiveType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/*
+ * Author: Chenmingyu
+ * Date: Aug 27, 2019
+ */
+
+public class HashDistributionPrunerTest {
+
+    @Test
+    public void test() {
+        List<Long> tabletIds = Lists.newArrayListWithExpectedSize(300);
+        for (long i = 0; i < 300; i++) {
+            tabletIds.add(i);
+        }
+
+        // distribution columns
+        Column dealDate = new Column("dealDate", PrimitiveType.DATE, false);
+        Column mainBrandId = new Column("main_brand_id", PrimitiveType.CHAR, false);
+        Column itemThirdCateId = new Column("item_third_cate_id", PrimitiveType.CHAR, false);
+        Column channel = new Column("channel", PrimitiveType.CHAR, false);
+        Column shopType = new Column("shop_type", PrimitiveType.CHAR, false);
+        List<Column> columns = Lists.newArrayList(dealDate, mainBrandId, itemThirdCateId, channel, shopType);
+
+        // filters
+        PartitionColumnFilter dealDatefilter = new PartitionColumnFilter();
+        dealDatefilter.setLowerBound(new StringLiteral("2019-08-22"), true);
+        dealDatefilter.setUpperBound(new StringLiteral("2019-08-22"), true);
+
+        PartitionColumnFilter mainBrandFilter = new PartitionColumnFilter();
+        List<Expr> inList = Lists.newArrayList();
+        inList.add(new StringLiteral("1323"));
+        inList.add(new StringLiteral("2528"));
+        inList.add(new StringLiteral("9610"));
+        inList.add(new StringLiteral("3893"));
+        inList.add(new StringLiteral("6121"));
+        mainBrandFilter.setInPredicate(new InPredicate(new SlotRef(null, "main_brand_id"), inList, false));
+
+        PartitionColumnFilter itemThirdFilter = new PartitionColumnFilter();
+        List<Expr> inList2 = Lists.newArrayList();
+        inList2.add(new StringLiteral("9719"));
+        inList2.add(new StringLiteral("11163"));
+        itemThirdFilter.setInPredicate(new InPredicate(new SlotRef(null, "item_third_cate_id"), inList2, false));
+
+        PartitionColumnFilter channelFilter = new PartitionColumnFilter();
+        List<Expr> inList3 = Lists.newArrayList();
+        inList3.add(new StringLiteral("1"));
+        inList3.add(new StringLiteral("3"));
+        channelFilter.setInPredicate(new InPredicate(new SlotRef(null, "channel"), inList3, false));
+
+        PartitionColumnFilter shopTypeFilter = new PartitionColumnFilter();
+        List<Expr> inList4 = Lists.newArrayList();
+        inList4.add(new StringLiteral("2"));
+        shopTypeFilter.setInPredicate(new InPredicate(new SlotRef(null, "shop_type"), inList4, false));
+
+        Map<String, PartitionColumnFilter> filters = Maps.newHashMap();
+        filters.put("dealDate", dealDatefilter);
+        filters.put("main_brand_id", mainBrandFilter);
+        filters.put("item_third_cate_id", itemThirdFilter);
+        filters.put("channel", channelFilter);
+        filters.put("shop_type", shopTypeFilter);
+
+        HashDistributionPruner pruner = new HashDistributionPruner(tabletIds, columns, filters, tabletIds.size());
+
+        Collection<Long> results = pruner.prune();
+        // 20 = 1 * 5 * 2 * 2 * 1 (element num of each filter)
+        Assert.assertEquals(20, results.size());
+
+        filters.get("shop_type").getInPredicate().addChild(new StringLiteral("4"));
+        results = pruner.prune();
+        // 40 = 1 * 5 * 2 * 2 * 2 (element num of each filter)
+        // only 39 because there is a hash conflict
+        Assert.assertEquals(39, results.size());
+
+        filters.get("shop_type").getInPredicate().addChild(new StringLiteral("5"));
+        filters.get("shop_type").getInPredicate().addChild(new StringLiteral("6"));
+        filters.get("shop_type").getInPredicate().addChild(new StringLiteral("7"));
+        filters.get("shop_type").getInPredicate().addChild(new StringLiteral("8"));
+        results = pruner.prune();
+        // 120 = 1 * 5 * 2 * 2 * 6 (element num of each filter) > 100, so pruning is skipped and all buckets are returned
+        Assert.assertEquals(300, results.size());
+
+        // check hash conflict
+        inList4.add(new StringLiteral("4"));
+        PartitionKey hashKey = new PartitionKey();
+        Set<Long> tablets = Sets.newHashSet();
+        hashKey.pushColumn(new StringLiteral("2019-08-22"), PrimitiveType.DATE);
+        for (Expr inLiteral : inList) {
+            hashKey.pushColumn((StringLiteral) inLiteral, PrimitiveType.CHAR);
+            for (Expr inLiteral2 : inList2) {
+                hashKey.pushColumn((StringLiteral) inLiteral2, PrimitiveType.CHAR);
+                for (Expr inLiteral3 : inList3) {
+                    hashKey.pushColumn((StringLiteral) inLiteral3, PrimitiveType.CHAR);
+                    for (Expr inLiteral4 : inList4) {
+                        hashKey.pushColumn((StringLiteral) inLiteral4, PrimitiveType.CHAR);
+                        long hashValue = hashKey.getHashValue();
+                        tablets.add(tabletIds.get((int) ((hashValue & 0xffffffff) % tabletIds.size())));
+                        hashKey.popColumn();
+                    }
+                    hashKey.popColumn();
+                }
+                hashKey.popColumn();
+            }
+            hashKey.popColumn();
+        }
+
+        Assert.assertEquals(39, tablets.size());
+    }
+
+}
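As a side note on the arithmetic the test relies on: each fully bound key is mapped to a bucket by taking the composed hash modulo the bucket count. A minimal, self-contained illustration with made-up inputs (plain Java, not Doris code; the hash value is a stand-in for PartitionKey.getHashValue()):

public class BucketIndexExample {
    public static void main(String[] args) {
        long hashValue = 5000000123L;  // stand-in for PartitionKey.getHashValue()
        int hashMod = 300;             // tabletIds.size() in the test above
        // same expression as in HashDistributionPruner
        int bucket = (int) ((hashValue & 0xffffffff) % hashMod);
        System.out.println(bucket);    // 23 for this input
    }
}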