9 changes: 7 additions & 2 deletions fe/src/main/java/org/apache/doris/analysis/InPredicate.java
@@ -29,9 +29,9 @@
import org.apache.doris.thrift.TExprOpcode;
import org.apache.doris.thrift.TInPredicate;

-import com.google.common.collect.Lists;
-import org.apache.logging.log4j.LogManager;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -97,6 +97,11 @@ protected InPredicate(InPredicate other) {
isNotIn = other.isNotIn();
}

public int getInElementNum() {
// the first child is the compare expr
return getChildren().size() - 1;
}

@Override
public Expr clone() {
return new InPredicate(this);
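For context on the new helper: child 0 of an InPredicate is the expression being compared, and children 1..n are the IN-list literals, so the element count is `getChildren().size() - 1`. A minimal standalone sketch of that layout (illustrative names only, not Doris code):

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch (not Doris code): an IN predicate such as "b in (2, 3, 4)"
// stores the compared expression as child 0 and the literals as children 1..n.
public class InElementCountSketch {
    public static void main(String[] args) {
        List<String> children = Arrays.asList("SlotRef(b)", "2", "3", "4");
        int inElementNum = children.size() - 1; // child 0 is the compare expr
        System.out.println(inElementNum); // prints 3
    }
}
```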
11 changes: 11 additions & 0 deletions fe/src/main/java/org/apache/doris/common/Config.java
@@ -857,5 +857,16 @@ public class Config extends ConfigBase {
* exception will be thrown to user client directly without load label.
*/
@ConfField(mutable = true, masterOnly = true) public static boolean using_old_load_usage_pattern = false;

/*
 * This limits the max recursion depth of the hash distribution pruner.
 * eg: where a in (5 elements) and b in (4 elements) and c in (3 elements) and d in (2 elements).
 * a/b/c/d are distribution columns, so the recursion depth will be 5 * 4 * 3 * 2 = 120, larger than 100,
 * so the distribution pruner will not work and will just return all buckets.
 *
 * Increasing the depth can support distribution pruning for more elements, but may cost more CPU.
 */
@ConfField(mutable = true, masterOnly = false)
public static int max_distribution_pruner_recursion_depth = 100;
}
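
To make the depth arithmetic concrete: the pruner multiplies the IN-list sizes of the distribution columns and compares the product against this config. A standalone sketch of that check (constants and names are illustrative, not Doris APIs):

```java
// Minimal sketch: the pruning "depth" is the product of the IN-list element
// counts across all distribution columns; if it exceeds the configured limit,
// pruning is skipped and all buckets are returned.
public class PruneDepthSketch {
    static final int MAX_DEPTH = 100; // mirrors the default of max_distribution_pruner_recursion_depth

    public static void main(String[] args) {
        int[] inElementCounts = {5, 4, 3, 2}; // a in (...), b in (...), c in (...), d in (...)
        long depth = 1;
        for (int n : inElementCounts) {
            depth *= n;
        }
        System.out.println(depth); // 120
        System.out.println(depth > MAX_DEPTH ? "return all buckets" : "prune");
    }
}
```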

fe/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
@@ -22,6 +22,7 @@
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.common.Config;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -34,43 +35,55 @@
import java.util.Map;
import java.util.Set;

/*
 * Prune the distribution by the distribution columns' predicates, recursively.
 * It only supports binary equal predicates and in predicates combined with AND.
 * For example:
 * where a = 1 and b in (2,3,4) and c in (5,6,7)
 * a/b/c are distribution columns
 *
 * The config 'max_distribution_pruner_recursion_depth' limits the max recursion depth of pruning.
 * The recursion depth is calculated as the product of the element numbers of all predicates.
 * The above example's depth is 9 (= 1 * 3 * 3).
 *
 * If the depth is larger than 'max_distribution_pruner_recursion_depth', all buckets are returned without pruning.
 */
public class HashDistributionPruner implements DistributionPruner {
private static final Logger LOG = LogManager.getLogger(HashDistributionPruner.class);

// partition list, sort by the hash code
-private List<Long> partitionList;
+private List<Long> bucketsList;
// partition columns
-private List<Column> partitionColumns;
+private List<Column> distributionColumns;
(Contributor review comment: keep one space)
// partition column filters
-private Map<String, PartitionColumnFilter> partitionColumnFilters;
+private Map<String, PartitionColumnFilter> distributionColumnFilters;
private int hashMod;

-HashDistributionPruner(List<Long> partitions, List<Column> columns,
+HashDistributionPruner(List<Long> bucketsList, List<Column> columns,
Map<String, PartitionColumnFilter> filters, int hashMod) {
-this.partitionList = partitions;
-this.partitionColumns = columns;
-this.partitionColumnFilters = filters;
+this.bucketsList = bucketsList;
+this.distributionColumns = columns;
+this.distributionColumnFilters = filters;
this.hashMod = hashMod;
}

// columnId: which column to compute
// hashKey: the key used to compute the hash value
public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) {
-if (columnId == partitionColumns.size()) {
+if (columnId == distributionColumns.size()) {
// compute Hash Key
long hashValue = hashKey.getHashValue();
-return Lists.newArrayList(
-partitionList.get((int) ((hashValue & 0xffffffff) % hashMod)));
+return Lists.newArrayList(bucketsList.get((int) ((hashValue & 0xffffffff) % hashMod)));
}
-Column keyColumn = partitionColumns.get(columnId);
-PartitionColumnFilter filter = partitionColumnFilters.get(keyColumn.getName());
+Column keyColumn = distributionColumns.get(columnId);
+PartitionColumnFilter filter = distributionColumnFilters.get(keyColumn.getName());
if (null == filter) {
// no filter in this column, no partition Key
// return all subPartition
-return Lists.newArrayList(partitionList);
+return Lists.newArrayList(bucketsList);
}
InPredicate inPredicate = filter.getInPredicate();
-if (null == inPredicate || inPredicate.getChildren().size() * complex > 100) {
+if (null == inPredicate || inPredicate.getInElementNum() * complex > Config.max_distribution_pruner_recursion_depth) {
// equal one value
if (filter.lowerBoundInclusive && filter.upperBoundInclusive
&& filter.lowerBound != null && filter.upperBound != null
@@ -81,18 +94,19 @@ public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) {
return result;
}
// return all SubPartition
-return Lists.newArrayList(partitionList);
+return Lists.newArrayList(bucketsList);
}

if (null != inPredicate) {
-if (! (inPredicate.getChild(0) instanceof SlotRef)) {
+if (!(inPredicate.getChild(0) instanceof SlotRef)) {
// return all SubPartition
-return Lists.newArrayList(partitionList);
+return Lists.newArrayList(bucketsList);
}
}
Set<Long> resultSet = Sets.newHashSet();
+int inElementNum = inPredicate.getInElementNum();
+int newComplex = inElementNum * complex;
int childrenNum = inPredicate.getChildren().size();
-int newComplex = inPredicate.getChildren().size() * complex;
for (int i = 1; i < childrenNum; ++i) {
LiteralExpr expr = (LiteralExpr) inPredicate.getChild(i);
hashKey.pushColumn(expr, keyColumn.getDataType());
Expand All @@ -101,7 +115,7 @@ public Collection<Long> prune(int columnId, PartitionKey hashKey, int complex) {
resultSet.add(subPartitionId);
}
hashKey.popColumn();
-if (resultSet.size() >= partitionList.size()) {
+if (resultSet.size() >= bucketsList.size()) {
break;
}
}
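One detail worth noting in the bucket-selection line: the hash is masked to its low 32 bits before the modulo so the resulting index stays non-negative. A condensed sketch of that arithmetic (with an explicit long mask; not the Doris implementation):

```java
import java.util.Arrays;
import java.util.List;

// Condensed sketch of the bucket-selection step: mask the 64-bit hash down to
// its unsigned low 32 bits, then map it onto the bucket list with a modulo.
public class BucketSelectSketch {
    public static void main(String[] args) {
        List<Long> bucketsList = Arrays.asList(100L, 101L, 102L, 103L);
        int hashMod = bucketsList.size();
        long hashValue = -7L; // a hash whose low 32 bits look negative as an int
        long masked = hashValue & 0xffffffffL; // 4294967289, always non-negative
        long bucketId = bucketsList.get((int) (masked % hashMod));
        System.out.println(bucketId); // prints 101
    }
}
```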
fe/src/test/java/org/apache/doris/planner/HashDistributionPrunerTest.java
@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PrimitiveType;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/*
* Author: Chenmingyu
* Date: Aug 27, 2019
*/

public class HashDistributionPrunerTest {

@Test
public void test() {
List<Long> tabletIds = Lists.newArrayListWithExpectedSize(300);
for (long i = 0; i < 300; i++) {
tabletIds.add(i);
}

// distribution columns
Column dealDate = new Column("dealDate", PrimitiveType.DATE, false);
Column mainBrandId = new Column("main_brand_id", PrimitiveType.CHAR, false);
Column itemThirdCateId = new Column("item_third_cate_id", PrimitiveType.CHAR, false);
Column channel = new Column("channel", PrimitiveType.CHAR, false);
Column shopType = new Column("shop_type", PrimitiveType.CHAR, false);
List<Column> columns = Lists.newArrayList(dealDate, mainBrandId, itemThirdCateId, channel, shopType);

// filters
PartitionColumnFilter dealDateFilter = new PartitionColumnFilter();
dealDateFilter.setLowerBound(new StringLiteral("2019-08-22"), true);
dealDateFilter.setUpperBound(new StringLiteral("2019-08-22"), true);

PartitionColumnFilter mainBrandFilter = new PartitionColumnFilter();
List<Expr> inList = Lists.newArrayList();
inList.add(new StringLiteral("1323"));
inList.add(new StringLiteral("2528"));
inList.add(new StringLiteral("9610"));
inList.add(new StringLiteral("3893"));
inList.add(new StringLiteral("6121"));
mainBrandFilter.setInPredicate(new InPredicate(new SlotRef(null, "main_brand_id"), inList, false));

PartitionColumnFilter itemThirdFilter = new PartitionColumnFilter();
List<Expr> inList2 = Lists.newArrayList();
inList2.add(new StringLiteral("9719"));
inList2.add(new StringLiteral("11163"));
itemThirdFilter.setInPredicate(new InPredicate(new SlotRef(null, "item_third_cate_id"), inList2, false));

PartitionColumnFilter channelFilter = new PartitionColumnFilter();
List<Expr> inList3 = Lists.newArrayList();
inList3.add(new StringLiteral("1"));
inList3.add(new StringLiteral("3"));
channelFilter.setInPredicate(new InPredicate(new SlotRef(null, "channel"), inList3, false));

PartitionColumnFilter shopTypeFilter = new PartitionColumnFilter();
List<Expr> inList4 = Lists.newArrayList();
inList4.add(new StringLiteral("2"));
shopTypeFilter.setInPredicate(new InPredicate(new SlotRef(null, "shop_type"), inList4, false));

Map<String, PartitionColumnFilter> filters = Maps.newHashMap();
filters.put("dealDate", dealDatefilter);
filters.put("main_brand_id", mainBrandFilter);
filters.put("item_third_cate_id", itemThirdFilter);
filters.put("channel", channelFilter);
filters.put("shop_type", shopTypeFilter);

HashDistributionPruner pruner = new HashDistributionPruner(tabletIds, columns, filters, tabletIds.size());

Collection<Long> results = pruner.prune();
// 20 = 1 * 5 * 2 * 2 * 1 (element num of each filter)
Assert.assertEquals(20, results.size());

filters.get("shop_type").getInPredicate().addChild(new StringLiteral("4"));
results = pruner.prune();
// 40 = 1 * 5 * 2 * 2 * 2 (element num of each filter)
// only 39 because there is a hash conflict
Assert.assertEquals(39, results.size());

filters.get("shop_type").getInPredicate().addChild(new StringLiteral("5"));
filters.get("shop_type").getInPredicate().addChild(new StringLiteral("6"));
filters.get("shop_type").getInPredicate().addChild(new StringLiteral("7"));
filters.get("shop_type").getInPredicate().addChild(new StringLiteral("8"));
results = pruner.prune();
// 120 = 1 * 5 * 2 * 2 * 6 (element num of each filter) > 100, so pruning is skipped and all 300 buckets are returned
Assert.assertEquals(300, results.size());

// check hash conflict
inList4.add(new StringLiteral("4"));
PartitionKey hashKey = new PartitionKey();
Set<Long> tablets = Sets.newHashSet();
hashKey.pushColumn(new StringLiteral("2019-08-22"), PrimitiveType.DATE);
for (Expr inLiteral : inList) {
hashKey.pushColumn((StringLiteral) inLiteral, PrimitiveType.CHAR);
for (Expr inLiteral2 : inList2) {
hashKey.pushColumn((StringLiteral) inLiteral2, PrimitiveType.CHAR);
for (Expr inLiteral3 : inList3) {
hashKey.pushColumn((StringLiteral) inLiteral3, PrimitiveType.CHAR);
for (Expr inLiteral4 : inList4) {
hashKey.pushColumn((StringLiteral) inLiteral4, PrimitiveType.CHAR);
long hashValue = hashKey.getHashValue();
tablets.add(tabletIds.get((int) ((hashValue & 0xffffffff) % tabletIds.size())));
hashKey.popColumn();
}
hashKey.popColumn();
}
hashKey.popColumn();
}
hashKey.popColumn();
}

Assert.assertEquals(39, tablets.size());
}

}
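
The nested loops in the test enumerate the full cross product of the IN-lists (dealDate is fixed, so 5 * 2 * 2 * 2 = 40 combinations) and count distinct buckets; one hash conflict is why 40 combinations map to only 39 tablets. A condensed sketch of the same distinct-count idea, using a toy string hash rather than Doris's PartitionKey hashing:

```java
import java.util.HashSet;
import java.util.Set;

// Condensed sketch of the test's cross-product check: map every key
// combination to a bucket and count distinct buckets; collisions make the
// distinct count smaller than the number of combinations.
public class HashConflictSketch {
    public static void main(String[] args) {
        String[] brands = {"1323", "2528", "9610", "3893", "6121"};
        String[] cates = {"9719", "11163"};
        String[] channels = {"1", "3"};
        String[] shopTypes = {"2", "4"};
        int bucketNum = 300;

        Set<Integer> buckets = new HashSet<>();
        int combinations = 0;
        for (String b : brands) {
            for (String c : cates) {
                for (String ch : channels) {
                    for (String s : shopTypes) {
                        combinations++;
                        int hash = (b + "|" + c + "|" + ch + "|" + s).hashCode(); // toy hash
                        buckets.add(Math.floorMod(hash, bucketNum));
                    }
                }
            }
        }
        System.out.println(combinations + " combinations -> " + buckets.size() + " distinct buckets");
    }
}
```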