47 changes: 38 additions & 9 deletions be/src/exec/tablet_info.cpp
@@ -150,7 +150,6 @@ OlapTablePartitionParam::OlapTablePartitionParam(
std::shared_ptr<OlapTableSchemaParam> schema,
const TOlapTablePartitionParam& t_param)
: _schema(schema), _t_param(t_param),
_partition_slot_desc(nullptr),
_mem_tracker(new MemTracker()),
_mem_pool(new MemPool(_mem_tracker.get())) {
}
@@ -170,11 +169,23 @@ Status OlapTablePartitionParam::init() {
ss << "partition column not found, column=" << _t_param.partition_column;
return Status::InternalError(ss.str());
}
_partition_slot_desc = it->second;
_partition_slot_descs.push_back(it->second);
} else {
DCHECK(_t_param.__isset.partition_columns);
for (auto& part_col : _t_param.partition_columns) {
auto it = slots_map.find(part_col);
if (it == std::end(slots_map)) {
std::stringstream ss;
ss << "partition column not found, column=" << part_col;
return Status::InternalError(ss.str());
}
_partition_slot_descs.push_back(it->second);
}
}

_partitions_map.reset(
new std::map<Tuple*, OlapTablePartition*, OlapTablePartKeyComparator>(
OlapTablePartKeyComparator(_partition_slot_desc)));
OlapTablePartKeyComparator(_partition_slot_descs)));
if (_t_param.__isset.distributed_columns) {
for (auto& col : _t_param.distributed_columns) {
auto it = slots_map.find(col);
@@ -191,12 +202,22 @@ Status OlapTablePartitionParam::init() {
const TOlapTablePartition& t_part = _t_param.partitions[i];
OlapTablePartition* part = _obj_pool.add(new OlapTablePartition());
part->id = t_part.id;

if (t_part.__isset.start_key) {
RETURN_IF_ERROR(_create_partition_key(t_part.start_key, &part->start_key));
// deprecated, use start_keys instead
std::vector<TExprNode> exprs = { t_part.start_key };
RETURN_IF_ERROR(_create_partition_keys(exprs, &part->start_key));
} else if (t_part.__isset.start_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key));
}
if (t_part.__isset.end_key) {
RETURN_IF_ERROR(_create_partition_key(t_part.end_key, &part->end_key));
// deprecated, use end_keys instead
std::vector<TExprNode> exprs = { t_part.end_key };
RETURN_IF_ERROR(_create_partition_keys(exprs, &part->end_key));
} else if (t_part.__isset.end_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key));
}

part->num_buckets = t_part.num_buckets;
auto num_indexes = _schema->indexes().size();
if (t_part.indexes.size() != num_indexes) {
@@ -242,10 +263,19 @@ bool OlapTablePartitionParam::find_tablet(Tuple* tuple,
return false;
}

Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple** part_key) {
Status OlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs, Tuple** part_key) {
Tuple* tuple = (Tuple*)_mem_pool->allocate(_schema->tuple_desc()->byte_size());
void* slot = tuple->get_slot(_partition_slot_desc->tuple_offset());
tuple->set_not_null(_partition_slot_desc->null_indicator_offset());
for (int i = 0; i < t_exprs.size(); i++) {
const TExprNode& t_expr = t_exprs[i];
RETURN_IF_ERROR(_create_partition_key(t_expr, tuple, _partition_slot_descs[i]));
}
*part_key = tuple;
return Status::OK();
}

Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc) {
void* slot = tuple->get_slot(slot_desc->tuple_offset());
tuple->set_not_null(slot_desc->null_indicator_offset());
switch (t_expr.node_type) {
case TExprNodeType::DATE_LITERAL: {
if (!reinterpret_cast<DateTimeValue*>(slot)->from_date_str(
Expand Down Expand Up @@ -293,7 +323,6 @@ Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, T
return Status::InternalError(ss.str());
}
}
*part_key = tuple;
return Status::OK();
}

34 changes: 22 additions & 12 deletions be/src/exec/tablet_info.h
@@ -104,7 +104,8 @@ struct OlapTablePartition {

class OlapTablePartKeyComparator {
public:
OlapTablePartKeyComparator(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) { }
OlapTablePartKeyComparator(const std::vector<SlotDescriptor*>& slot_descs) :
_slot_descs(slot_descs) { }
// return true if lhs < rhs
// 'nullptr' is max value, but 'null' is min value
bool operator()(const Tuple* lhs, const Tuple* rhs) const {
@@ -114,16 +115,23 @@ class OlapTablePartKeyComparator {
return true;
}

bool lhs_null = lhs->is_null(_slot_desc->null_indicator_offset());
bool rhs_null = rhs->is_null(_slot_desc->null_indicator_offset());
if (lhs_null || rhs_null) { return !rhs_null; }

auto lhs_value = lhs->get_slot(_slot_desc->tuple_offset());
auto rhs_value = rhs->get_slot(_slot_desc->tuple_offset());
return RawValue::lt(lhs_value, rhs_value, _slot_desc->type());
for (auto slot_desc : _slot_descs) {
bool lhs_null = lhs->is_null(slot_desc->null_indicator_offset());
bool rhs_null = rhs->is_null(slot_desc->null_indicator_offset());
if (lhs_null && rhs_null) { continue; }
if (lhs_null || rhs_null) { return !rhs_null; }

auto lhs_value = lhs->get_slot(slot_desc->tuple_offset());
auto rhs_value = rhs->get_slot(slot_desc->tuple_offset());

int res = RawValue::compare(lhs_value, rhs_value, slot_desc->type());
if (res != 0) { return res < 0; }
}
// equal, return false
return false;
}
private:
SlotDescriptor* _slot_desc;
std::vector<SlotDescriptor*> _slot_descs;
};

// store an olap table's tablet information
@@ -150,7 +158,9 @@ class OlapTablePartitionParam {
}
std::string debug_string() const;
private:
Status _create_partition_key(const TExprNode& t_expr, Tuple** part_key);
Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, Tuple** part_key);

Status _create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc);

uint32_t _compute_dist_hash(Tuple* key) const;

@@ -160,15 +170,15 @@
// start_key is nullptr means the lower bound is boundless
return true;
}
OlapTablePartKeyComparator comparator(_partition_slot_desc);
OlapTablePartKeyComparator comparator(_partition_slot_descs);
return !comparator(key, part->start_key);
}
private:
// this partition only valid in this schema
std::shared_ptr<OlapTableSchemaParam> _schema;
TOlapTablePartitionParam _t_param;

SlotDescriptor* _partition_slot_desc;
std::vector<SlotDescriptor*> _partition_slot_descs;
std::vector<SlotDescriptor*> _distributed_slot_descs;

ObjectPool _obj_pool;
40 changes: 38 additions & 2 deletions docs/documentation/cn/getting-started/data-partition.md
@@ -86,7 +86,7 @@ Doris supports two levels of data partitioning. The first level is Partition, which only supports Range

1. Partition

* Only one Partition column can be specified, and it must be a KEY column.
* One or more Partition columns can be specified, and partition columns must be KEY columns. The usage of multi-column partitioning is described in the **Multi-column Partitioning** section below.
* Partition bounds are left-closed and right-open. For example, in the example above, to store all of February's data in p201702, the partition value should be "2017-03-01", i.e. the range is [2017-02-01, 2017-03-01).
* Regardless of the partition column's type, partition values must be written in double quotes.
* Partition columns are usually time columns, which makes it easy to manage old and new data.
@@ -156,7 +156,7 @@ Doris supports two levels of data partitioning. The first level is Partition, which only supports Range

* If Partition is used, the `DISTRIBUTED ...` clause describes how data is divided **within each partition**. If Partition is not used, it describes how the data of the entire table is divided.
* Multiple bucketing columns may be used, but they must be Key columns. Bucketing columns may be the same as, or different from, the Partition columns.
* The choice of bucketing columns is a trade-off between **query throughput** and **query latency**:
* The choice of bucketing columns is a trade-off between **query throughput** and **query concurrency** (a short sketch follows this list):

1. If multiple bucketing columns are chosen, data is distributed more evenly. However, if a query's conditions do not include equality predicates on all bucketing columns, the query scans every bucket, so overall throughput increases but the latency of a single query also increases. This approach suits high-throughput, low-concurrency query scenarios.
2. If only one or a few bucketing columns are chosen, a point query can hit just one bucket. This approach suits high-concurrency point-query scenarios.
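
To make the trade-off concrete, here is a minimal sketch (the column names `siteid` and `username` and the bucket count are assumed for illustration and are not part of this change):

```
-- Many bucketing columns: more even distribution, but a query without
-- equality predicates on all of them scans every bucket.
DISTRIBUTED BY HASH(`siteid`, `username`) BUCKETS 10

-- A single bucketing column: a point query on `siteid` touches one bucket,
-- which suits high-concurrency point-query workloads.
DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
```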
@@ -174,6 +174,42 @@ Doris supports two levels of data partitioning. The first level is Partition, which only supports Range
* Some examples: assume 10 BE nodes, each with a single disk. For a table totaling 500MB, consider 4-8 tablets. 5GB: 8-16 tablets. 50GB: 32 tablets. 500GB: partitioning is recommended, with each partition around 50GB and 16-32 tablets per partition. 5TB: partitioning is recommended, with each partition around 50GB and 16-32 tablets per partition.

> Note: a table's data volume can be checked with the `show data` command; divide the result by the number of replicas to get the table's data volume.

#### Multi-column Partitioning

Doris supports specifying multiple columns as partition columns. Example:

```
PARTITION BY RANGE(`date`, `id`)
(
PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)
```

In the example above, `date` (DATE type) and `id` (INT type) are specified as partition columns. The example yields the following partitions:

```
* p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") )
* p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") )
* p201703_all: [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE))
```

Note that in the last partition the user only specified a partition value for the `date` column, so the partition value for the `id` column defaults to `MIN_VALUE`. When data is loaded, the partition column values are compared in order to determine the target partition. For example:

```
* Data --> Partition
* 2017-01-01, 200 --> p201701_1000
* 2017-01-01, 2000 --> p201701_1000
* 2017-02-01, 100 --> p201701_1000
* 2017-02-01, 2000 --> p201702_2000
* 2017-02-15, 5000 --> p201702_2000
* 2017-03-01, 2000 --> p201703_all
* 2017-03-10, 1 --> p201703_all
* 2017-04-01, 1000 --> cannot be loaded
* 2017-05-01, 1000 --> cannot be loaded
```
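
For completeness, a minimal CREATE TABLE sketch using the partition spec above (the table name `example_tbl`, the `cost` value column, the bucket count, and the replication setting are assumed for illustration):

```
CREATE TABLE `example_tbl`
(
    `date` DATE,
    `id`   INT,
    `cost` BIGINT SUM DEFAULT "0"
)
AGGREGATE KEY(`date`, `id`)
PARTITION BY RANGE(`date`, `id`)
(
    PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
    PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
    PARTITION `p201703_all`  VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "3");
```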

### PROPERTIES

7 changes: 4 additions & 3 deletions docs/help/Contents/Data Definition/ddl_stmt.md
@@ -139,10 +139,10 @@
4. partition_desc
1) Range partitioning
Syntax:
PARTITION BY RANGE (k1)
PARTITION BY RANGE (k1, k2, ...)
(
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1")
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value2")
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...)
PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...)
...
)
Notes:
@@ -152,6 +152,7 @@
TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME
3) Partitions are left-closed, right-open intervals; the left boundary of the first partition is the minimum value.
4) NULL values are stored only in the partition that contains the minimum value. Once the partition containing the minimum value is dropped, NULL values can no longer be loaded.
5) One or more columns can be specified as partition columns. If partition values are omitted, the minimum value is filled in by default (see the example below).
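For example (a hypothetical illustration; k1, k2 and the partition names are placeholders, not part of this change):
PARTITION BY RANGE (k1, k2)
(
PARTITION p1 VALUES LESS THAN ("2017-02-01", "100"),
PARTITION p2 VALUES LESS THAN ("2017-03-01")
)
Here the omitted k2 value in p2 is filled with the minimum value.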

Note:
1) Partitioning is generally used for managing data along the time dimension
@@ -287,10 +287,6 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
}

RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc;
if (rangePartitionDesc.getPartitionColNames().size() != 1) {
throw new AnalysisException("Only allow partitioned by one column");
}

rangePartitionDesc.analyze(columnDefs, properties);
}

@@ -173,9 +173,9 @@ public PartitionInfo toPartitionInfo(List<Column> schema, Map<String, Long> part
* VALUE LESS THEN (80)
*
* key range is:
* ( {MIN, MIN, MIN}, {10, 100, 1000} )
* [ {10, 100, 500}, {50, 500, ? } )
* [ {50, 500, ? }, {80, ?, ? } )
* ( {MIN, MIN, MIN}, {10, 100, 1000} )
* [ {10, 100, 1000}, {50, 500, MIN } )
* [ {50, 500, MIN }, {80, MIN, MIN } )
*/
RangePartitionInfo rangePartitionInfo = new RangePartitionInfo(partitionColumns);
for (SingleRangePartitionDesc desc : singleRangePartitionDescs) {
6 changes: 6 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -38,6 +38,8 @@ public class PartitionInfo implements Writable {
protected Map<Long, DataProperty> idToDataProperty;
// partition id -> replication num
protected Map<Long, Short> idToReplicationNum;
// true if this partition uses multiple partition columns
protected boolean isMultiColumnPartition = false;

public PartitionInfo() {
// for persist
@@ -87,6 +89,10 @@ public static PartitionInfo read(DataInput in) throws IOException {
return partitionInfo;
}

public boolean isMultiColumnPartition() {
return isMultiColumnPartition;
}

public String toSql(OlapTable table, List<Long> partitionId) {
return "";
}
18 changes: 8 additions & 10 deletions fe/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -26,6 +26,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

Expand Down Expand Up @@ -75,8 +76,7 @@ public static PartitionKey createPartitionKey(List<String> keys, List<Column> co
// fill the vacancy with MIN
for (; i < columns.size(); ++i) {
Type type = Type.fromPrimitiveType(columns.get(i).getDataType());
partitionKey.keys.add(
LiteralExpr.createInfinity(type, false));
partitionKey.keys.add(LiteralExpr.createInfinity(type, false));
partitionKey.types.add(columns.get(i).getDataType());
}

@@ -127,8 +127,8 @@ public boolean isMaxValue() {
return true;
}

@Override
// compare with other PartitionKey. used for partition prune
@Override
public int compareTo(PartitionKey other) {
int this_key_len = this.keys.size();
int other_key_len = other.keys.size();
@@ -184,10 +184,10 @@ public String toSql() {
value = dateLiteral.toSql();
}
}
if (keys.size() - 1 == i) {
strBuilder.append("(").append(value).append(")");
} else {
strBuilder.append("(").append(value).append("), ");
strBuilder.append(value);

if (keys.size() - 1 != i) {
strBuilder.append(", ");
}
i++;
}
@@ -198,9 +198,7 @@
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("types: [");
for (PrimitiveType type : types) {
builder.append(type.toString());
}
builder.append(Joiner.on(", ").join(types));
builder.append("]; ");

builder.append("keys: [");
@@ -73,6 +73,7 @@ public RangePartitionInfo(List<Column> partitionColumns) {
super(PartitionType.RANGE);
this.partitionColumns = partitionColumns;
this.idToRange = new HashMap<Long, Range<PartitionKey>>();
this.isMultiColumnPartition = partitionColumns.size() > 1;
}

public List<Column> getPartitionColumns() {
@@ -367,6 +368,8 @@ public void readFields(DataInput in) throws IOException {
partitionColumns.add(column);
}

this.isMultiColumnPartition = partitionColumns.size() > 1;

counter = in.readInt();
for (int i = 0; i < counter; i++) {
long partitionId = in.readLong();
@@ -405,20 +408,20 @@ public String toSql(OlapTable table, List<Long> partitionId) {
// first partition
if (!range.lowerEndpoint().isMinValue()) {
sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx)
.append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql());
.append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")");
sb.append(",\n");
}
} else {
Preconditions.checkNotNull(lastRange);
if (!lastRange.upperEndpoint().equals(range.lowerEndpoint())) {
sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx)
.append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql());
.append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")");
sb.append(",\n");
}
}

sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN ");
sb.append(range.upperEndpoint().toSql());
sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN (");
sb.append(range.upperEndpoint().toSql()).append(")");

if (partitionId != null) {
partitionId.add(entry.getKey());