diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index c4fc5b3b5cc996..2ad37491f705aa 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -150,7 +150,6 @@ OlapTablePartitionParam::OlapTablePartitionParam( std::shared_ptr schema, const TOlapTablePartitionParam& t_param) : _schema(schema), _t_param(t_param), - _partition_slot_desc(nullptr), _mem_tracker(new MemTracker()), _mem_pool(new MemPool(_mem_tracker.get())) { } @@ -170,11 +169,23 @@ Status OlapTablePartitionParam::init() { ss << "partition column not found, column=" << _t_param.partition_column; return Status::InternalError(ss.str()); } - _partition_slot_desc = it->second; + _partition_slot_descs.push_back(it->second); + } else { + DCHECK(_t_param.__isset.partition_columns); + for (auto& part_col : _t_param.partition_columns) { + auto it = slots_map.find(part_col); + if (it == std::end(slots_map)) { + std::stringstream ss; + ss << "partition column not found, column=" << part_col; + return Status::InternalError(ss.str()); + } + _partition_slot_descs.push_back(it->second); + } } + _partitions_map.reset( new std::map( - OlapTablePartKeyComparator(_partition_slot_desc))); + OlapTablePartKeyComparator(_partition_slot_descs))); if (_t_param.__isset.distributed_columns) { for (auto& col : _t_param.distributed_columns) { auto it = slots_map.find(col); @@ -191,12 +202,22 @@ Status OlapTablePartitionParam::init() { const TOlapTablePartition& t_part = _t_param.partitions[i]; OlapTablePartition* part = _obj_pool.add(new OlapTablePartition()); part->id = t_part.id; + if (t_part.__isset.start_key) { - RETURN_IF_ERROR(_create_partition_key(t_part.start_key, &part->start_key)); + // deprecated, use start_keys instead + std::vector exprs = { t_part.start_key }; + RETURN_IF_ERROR(_create_partition_keys(exprs, &part->start_key)); + } else if (t_part.__isset.start_keys) { + RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key)); } if (t_part.__isset.end_key) { - RETURN_IF_ERROR(_create_partition_key(t_part.end_key, &part->end_key)); + // deprecated, use end_keys instead + std::vector exprs = { t_part.end_key }; + RETURN_IF_ERROR(_create_partition_keys(exprs, &part->end_key)); + } else if (t_part.__isset.end_keys) { + RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key)); } + part->num_buckets = t_part.num_buckets; auto num_indexes = _schema->indexes().size(); if (t_part.indexes.size() != num_indexes) { @@ -242,10 +263,19 @@ bool OlapTablePartitionParam::find_tablet(Tuple* tuple, return false; } -Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple** part_key) { +Status OlapTablePartitionParam::_create_partition_keys(const std::vector& t_exprs, Tuple** part_key) { Tuple* tuple = (Tuple*)_mem_pool->allocate(_schema->tuple_desc()->byte_size()); - void* slot = tuple->get_slot(_partition_slot_desc->tuple_offset()); - tuple->set_not_null(_partition_slot_desc->null_indicator_offset()); + for (int i = 0; i < t_exprs.size(); i++) { + const TExprNode& t_expr = t_exprs[i]; + RETURN_IF_ERROR(_create_partition_key(t_expr, tuple, _partition_slot_descs[i])); + } + *part_key = tuple; + return Status::OK(); +} + +Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc) { + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + tuple->set_not_null(slot_desc->null_indicator_offset()); switch (t_expr.node_type) { case TExprNodeType::DATE_LITERAL: { if (!reinterpret_cast(slot)->from_date_str( @@ -293,7 +323,6 @@ Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, T return Status::InternalError(ss.str()); } } - *part_key = tuple; return Status::OK(); } diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 6ba2b99e2ec00e..ffef971f8b9dc0 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -104,7 +104,8 @@ struct OlapTablePartition { class OlapTablePartKeyComparator { public: - OlapTablePartKeyComparator(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) { } + OlapTablePartKeyComparator(const std::vector& slot_descs) : + _slot_descs(slot_descs) { } // return true if lhs < rhs // 'nullptr' is max value, but 'null' is min value bool operator()(const Tuple* lhs, const Tuple* rhs) const { @@ -114,16 +115,23 @@ class OlapTablePartKeyComparator { return true; } - bool lhs_null = lhs->is_null(_slot_desc->null_indicator_offset()); - bool rhs_null = rhs->is_null(_slot_desc->null_indicator_offset()); - if (lhs_null || rhs_null) { return !rhs_null; } - - auto lhs_value = lhs->get_slot(_slot_desc->tuple_offset()); - auto rhs_value = rhs->get_slot(_slot_desc->tuple_offset()); - return RawValue::lt(lhs_value, rhs_value, _slot_desc->type()); + for (auto slot_desc : _slot_descs) { + bool lhs_null = lhs->is_null(slot_desc->null_indicator_offset()); + bool rhs_null = rhs->is_null(slot_desc->null_indicator_offset()); + if (lhs_null && rhs_null) { continue; } + if (lhs_null || rhs_null) { return !rhs_null; } + + auto lhs_value = lhs->get_slot(slot_desc->tuple_offset()); + auto rhs_value = rhs->get_slot(slot_desc->tuple_offset()); + + int res = RawValue::compare(lhs_value, rhs_value, slot_desc->type()); + if (res != 0) { return res < 0; } + } + // equal, return false + return false; } private: - SlotDescriptor* _slot_desc; + std::vector _slot_descs; }; // store an olap table's tablet information @@ -150,7 +158,9 @@ class OlapTablePartitionParam { } std::string debug_string() const; private: - Status _create_partition_key(const TExprNode& t_expr, Tuple** part_key); + Status _create_partition_keys(const std::vector& t_exprs, Tuple** part_key); + + Status _create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc); uint32_t _compute_dist_hash(Tuple* key) const; @@ -160,7 +170,7 @@ class OlapTablePartitionParam { // start_key is nullptr means the lower bound is boundless return true; } - OlapTablePartKeyComparator comparator(_partition_slot_desc); + OlapTablePartKeyComparator comparator(_partition_slot_descs); return !comparator(key, part->start_key); } private: @@ -168,7 +178,7 @@ class OlapTablePartitionParam { std::shared_ptr _schema; TOlapTablePartitionParam _t_param; - SlotDescriptor* _partition_slot_desc; + std::vector _partition_slot_descs; std::vector _distributed_slot_descs; ObjectPool _obj_pool; diff --git a/docs/documentation/cn/getting-started/data-partition.md b/docs/documentation/cn/getting-started/data-partition.md index bb4f103cda9891..0224af16257505 100644 --- a/docs/documentation/cn/getting-started/data-partition.md +++ b/docs/documentation/cn/getting-started/data-partition.md @@ -86,7 +86,7 @@ Doris 支持两层的数据划分。第一层是 Partition,仅支持 Range 的 1. Partition - * Partition 列只能指定一列。且必须为 KEY 列。 + * Partition 列可以指定一列或多列。分区类必须为 KEY 列。多列分区的使用方式在后面 **多列分区** 小结介绍。 * Partition 的区间界限是左闭右开。比如如上示例,如果想在 p201702 存储所有2月份的数据,则分区值需要输入 "2017-03-01",即范围为:[2017-02-01, 2017-03-01)。 * 不论分区列是什么类型,在写分区值时,都需要加双引号。 * 分区列通常为时间列,以方便的管理新旧数据。 @@ -156,7 +156,7 @@ Doris 支持两层的数据划分。第一层是 Partition,仅支持 Range 的 * 如果使用了 Partition,则 `DISTRIBUTED ...` 语句描述的是数据在**各个分区内**的划分规则。如果不使用 Partition,则描述的是对整个表的数据的划分规则。 * 分桶列可以是多列,但必须为 Key 列。分桶列可以和 Partition 列相同或不同。 - * 分桶列的选择,是在 **查询吞吐** 和 **查询延迟** 之间的一种权衡: + * 分桶列的选择,是在 **查询吞吐** 和 **查询并发** 之间的一种权衡: 1. 如果选择多个分桶列,则数据分布更均匀。但如果查询条件不包含所有分桶列的等值条件的话,一个查询会扫描所有分桶。这样查询的吞吐会增加,但是单个查询的延迟也会增加。这个方式适合大吞吐低并发的查询场景。 2. 如果仅选择一个或少数分桶列,则点查询可以仅查询一个分桶。这种方式适合高并发的点查询场景。 @@ -174,6 +174,42 @@ Doris 支持两层的数据划分。第一层是 Partition,仅支持 Range 的 * 举一些例子:假设在有10台BE,每台BE一块磁盘的情况下。如果一个表总大小为 500MB,则可以考虑4-8个分片。5GB:8-16个。50GB:32个。500GB:建议分区,每个分区大小在 50GB 左右,每个分区16-32个分片。5TB:建议分区,每个分区大小在 50GB 左右,每个分区16-32个分片。 > 注:表的数据量可以通过 `show data` 命令查看,结果除以副本数,即表的数据量。 + +#### 多列分区 + +Doris 支持指定多列作为分区列,示例如下: + +``` +PARTITION BY RANGE(`date`, `id`) +( + PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"), + PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"), + PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01") +) +``` + +在以上示例中,我们指定 `date`(DATE 类型) 和 `id`(INT 类型) 作为分区列。以上示例最终得到的分区如下: + +``` +* p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") ) +* p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") ) +* p201703_all: [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE)) +``` + +注意,最后一个分区用户缺省只指定了 `date` 列的分区值,所以 `id` 列的分区值会默认填充 `MIN_VALUE`。当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下: + +``` +* 数据 --> 分区 +* 2017-01-01, 200 --> p201701_1000 +* 2017-01-01, 2000 --> p201701_1000 +* 2017-02-01, 100 --> p201701_1000 +* 2017-02-01, 2000 --> p201702_2000 +* 2017-02-15, 5000 --> p201702_2000 +* 2017-03-01, 2000 --> p201703_all +* 2017-03-10, 1 --> p201703_all +* 2017-04-01, 1000 --> 无法导入 +* 2017-05-01, 1000 --> 无法导入 +``` ### PROPERTIES diff --git a/docs/help/Contents/Data Definition/ddl_stmt.md b/docs/help/Contents/Data Definition/ddl_stmt.md index e723d24348cc1f..147511f1e329f1 100644 --- a/docs/help/Contents/Data Definition/ddl_stmt.md +++ b/docs/help/Contents/Data Definition/ddl_stmt.md @@ -139,10 +139,10 @@ 4. partition_desc 1) Range 分区 语法: - PARTITION BY RANGE (k1) + PARTITION BY RANGE (k1, k2, ...) ( - PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1") - PARTITION partition_name VALUES LESS THAN MAXVALUE|("value2") + PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...) + PARTITION partition_name VALUES LESS THAN MAXVALUE|("value1", "value2", ...) ... ) 说明: @@ -152,6 +152,7 @@ TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME 3) 分区为左闭右开区间,首个分区的左边界为做最小值 4) NULL 值只会存放在包含最小值的分区中。当包含最小值的分区被删除后,NULL 值将无法导入。 + 5) 可以指定一列或多列作为分区列。如果分区值缺省,则会默认填充最小值。 注意: 1) 分区一般用于时间维度的数据管理 diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 041f33df9a523a..bb43b9b4823748 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -287,10 +287,6 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { } RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc; - if (rangePartitionDesc.getPartitionColNames().size() != 1) { - throw new AnalysisException("Only allow partitioned by one column"); - } - rangePartitionDesc.analyze(columnDefs, properties); } diff --git a/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java b/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java index 5360f9a12f9a8a..3e539e4da9b970 100644 --- a/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java +++ b/fe/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java @@ -173,9 +173,9 @@ public PartitionInfo toPartitionInfo(List schema, Map part * VALUE LESS THEN (80) * * key range is: - * ( {MIN, MIN, MIN}, {10, 100, 1000} ) - * [ {10, 100, 500}, {50, 500, ? } ) - * [ {50, 500, ? }, {80, ?, ? } ) + * ( {MIN, MIN, MIN}, {10, 100, 1000} ) + * [ {10, 100, 1000}, {50, 500, MIN } ) + * [ {50, 500, MIN }, {80, MIN, MIN } ) */ RangePartitionInfo rangePartitionInfo = new RangePartitionInfo(partitionColumns); for (SingleRangePartitionDesc desc : singleRangePartitionDescs) { diff --git a/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 3028ce8ade77e0..b0d98b7e10b420 100644 --- a/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -38,6 +38,8 @@ public class PartitionInfo implements Writable { protected Map idToDataProperty; // partition id -> replication num protected Map idToReplicationNum; + // true if the partition has multi partition columns + protected boolean isMultiColumnPartition = false; public PartitionInfo() { // for persist @@ -87,6 +89,10 @@ public static PartitionInfo read(DataInput in) throws IOException { return partitionInfo; } + public boolean isMultiColumnPartition() { + return isMultiColumnPartition; + } + public String toSql(OlapTable table, List partitionId) { return ""; } diff --git a/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java index 0feb5554114aca..f34633ec1cb77c 100644 --- a/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -26,6 +26,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -75,8 +76,7 @@ public static PartitionKey createPartitionKey(List keys, List co // fill the vacancy with MIN for (; i < columns.size(); ++i) { Type type = Type.fromPrimitiveType(columns.get(i).getDataType()); - partitionKey.keys.add( - LiteralExpr.createInfinity(type, false)); + partitionKey.keys.add(LiteralExpr.createInfinity(type, false)); partitionKey.types.add(columns.get(i).getDataType()); } @@ -127,8 +127,8 @@ public boolean isMaxValue() { return true; } - @Override // compare with other PartitionKey. used for partition prune + @Override public int compareTo(PartitionKey other) { int this_key_len = this.keys.size(); int other_key_len = other.keys.size(); @@ -184,10 +184,10 @@ public String toSql() { value = dateLiteral.toSql(); } } - if (keys.size() - 1 == i) { - strBuilder.append("(").append(value).append(")"); - } else { - strBuilder.append("(").append(value).append("), "); + strBuilder.append(value); + + if (keys.size() - 1 != i) { + strBuilder.append(", "); } i++; } @@ -198,9 +198,7 @@ public String toSql() { public String toString() { StringBuilder builder = new StringBuilder(); builder.append("types: ["); - for (PrimitiveType type : types) { - builder.append(type.toString()); - } + builder.append(Joiner.on(", ").join(types)); builder.append("]; "); builder.append("keys: ["); diff --git a/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java b/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java index 118cb22ea6ef5b..404ed8c84e93dc 100644 --- a/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java +++ b/fe/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java @@ -73,6 +73,7 @@ public RangePartitionInfo(List partitionColumns) { super(PartitionType.RANGE); this.partitionColumns = partitionColumns; this.idToRange = new HashMap>(); + this.isMultiColumnPartition = partitionColumns.size() > 1; } public List getPartitionColumns() { @@ -367,6 +368,8 @@ public void readFields(DataInput in) throws IOException { partitionColumns.add(column); } + this.isMultiColumnPartition = partitionColumns.size() > 1; + counter = in.readInt(); for (int i = 0; i < counter; i++) { long partitionId = in.readLong(); @@ -405,20 +408,20 @@ public String toSql(OlapTable table, List partitionId) { // first partition if (!range.lowerEndpoint().isMinValue()) { sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx) - .append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql()); + .append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")"); sb.append(",\n"); } } else { Preconditions.checkNotNull(lastRange); if (!lastRange.upperEndpoint().equals(range.lowerEndpoint())) { sb.append("PARTITION ").append(FeNameFormat.FORBIDDEN_PARTITION_NAME).append(idx) - .append(" VALUES LESS THAN ").append(range.lowerEndpoint().toSql()); + .append(" VALUES LESS THAN (").append(range.lowerEndpoint().toSql()).append(")"); sb.append(",\n"); } } - sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN "); - sb.append(range.upperEndpoint().toSql()); + sb.append("PARTITION ").append(partitionName).append(" VALUES LESS THAN ("); + sb.append(range.upperEndpoint().toSql()).append(")"); if (partitionId != null) { partitionId.add(entry.getKey()); diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 4a290c117b73cd..536e8dd14a4974 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -17,14 +17,6 @@ package org.apache.doris.load; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import org.apache.commons.lang.StringUtils; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.ColumnSeparator; @@ -98,6 +90,16 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.Gson; + +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -519,7 +521,7 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType, Map>> tableToPartitionSources = Maps.newHashMap(); for (DataDescription dataDescription : dataDescriptions) { // create source - checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag()); + checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag(), etlJobType); job.addTableName(dataDescription.getTableName()); } for (Entry>> tableEntry : tableToPartitionSources.entrySet()) { @@ -648,7 +650,7 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType, public static void checkAndCreateSource(Database db, DataDescription dataDescription, Map>> tableToPartitionSources, - boolean deleteFlag) + boolean deleteFlag, EtlJobType jobType) throws DdlException { Source source = new Source(dataDescription.getFilePaths()); long tableId = -1; @@ -668,6 +670,11 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip throw new DdlException("Table [" + tableName + "] is not olap table"); } + if (((OlapTable) table).getPartitionInfo().isMultiColumnPartition() && jobType == EtlJobType.HADOOP) { + throw new DdlException("Load by hadoop cluster does not support table with multi partition columns." + + " Table: " + table.getName() + ". Try using broker load. See 'help broker load;'"); + } + // check partition if (dataDescription.getPartitionNames() != null && !dataDescription.getPartitionNames().isEmpty() && diff --git a/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java b/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java index a211185502ae19..b611467fb0f198 100644 --- a/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java +++ b/fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java @@ -34,6 +34,7 @@ /** * PullLoadSourceInfo */ +@Deprecated public class PullLoadSourceInfo implements Writable { private static final Logger LOG = LogManager.getLogger(PullLoadSourceInfo.class); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 649c594dc87353..585fdae91d6633 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -101,7 +101,7 @@ public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { throw new DdlException("Database[" + dbName + "] does not exist"); } // check data source info - LoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions()); + LoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions(), EtlJobType.BROKER); // create job try { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 0792682d6120a1..41f7f35ab07ef4 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -259,13 +259,14 @@ protected void setJobProperties(Map properties) throws DdlExcept } } - protected static void checkDataSourceInfo(Database db, List dataDescriptions) throws DdlException { + protected static void checkDataSourceInfo(Database db, List dataDescriptions, + EtlJobType jobType) throws DdlException { for (DataDescription dataDescription : dataDescriptions) { // loadInfo is a temporary param for the method of checkAndCreateSource. // >> Map>> loadInfo = Maps.newHashMap(); // only support broker load now - Load.checkAndCreateSource(db, dataDescription, loadInfo, false); + Load.checkAndCreateSource(db, dataDescription, loadInfo, false, jobType); } } diff --git a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java index e817a28e7d3368..8e8f25ba8f0def 100644 --- a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java +++ b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java @@ -31,8 +31,8 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Type; import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.thrift.TAggregationType; @@ -51,6 +51,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,6 +60,7 @@ import java.util.Set; // This class used to split data read from file to batch +@Deprecated public class DataSplitSink extends DataSink { private static final Logger LOG = LogManager.getLogger(Planner.class); diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java index aa42f3652507a1..f69754afe6c9e5 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -217,12 +217,11 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) thr switch (partType) { case RANGE: { RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo(); - // range partition's only has one column - Preconditions.checkArgument(rangePartitionInfo.getPartitionColumns().size() == 1, - "number columns of range partition is not 1, number_columns=" - + rangePartitionInfo.getPartitionColumns().size()); - partitionParam.setPartition_column(rangePartitionInfo.getPartitionColumns().get(0).getName()); + for (Column partCol : rangePartitionInfo.getPartitionColumns()) { + partitionParam.addToPartition_columns(partCol.getName()); + } + int partColNum = rangePartitionInfo.getPartitionColumns().size(); DistributionInfo selectedDistInfo = null; for (Partition partition : table.getPartitions()) { if (partitionSet != null && !partitionSet.contains(partition.getName())) { @@ -231,12 +230,19 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) thr TOlapTablePartition tPartition = new TOlapTablePartition(); tPartition.setId(partition.getId()); Range range = rangePartitionInfo.getRange(partition.getId()); + // set start keys if (range.hasLowerBound() && !range.lowerEndpoint().isMinValue()) { - tPartition.setStart_key(range.lowerEndpoint().getKeys().get(0).treeToThrift().getNodes().get(0)); + for (int i = 0; i < partColNum; i++) { + tPartition.addToStart_keys(range.lowerEndpoint().getKeys().get(i).treeToThrift().getNodes().get(0)); + } } + // set end keys if (range.hasUpperBound() && !range.upperEndpoint().isMaxValue()) { - tPartition.setEnd_key(range.upperEndpoint().getKeys().get(0).treeToThrift().getNodes().get(0)); + for (int i = 0; i < partColNum; i++) { + tPartition.addToEnd_keys(range.upperEndpoint().getKeys().get(i).treeToThrift().getNodes().get(0)); + } } + for (MaterializedIndex index : partition.getMaterializedIndices()) { tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList( index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); diff --git a/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java b/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java index 245f1e6f0e75ad..d91ae0886860d6 100644 --- a/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java +++ b/fe/src/main/java/org/apache/doris/planner/RangePartitionPruner.java @@ -94,6 +94,8 @@ private Collection prune(RangeMap rangeMap, if (filter.lowerBoundInclusive && filter.upperBoundInclusive && filter.lowerBound != null && filter.upperBound != null && 0 == filter.lowerBound.compareLiteral(filter.upperBound)) { + + // eg: [10, 10], [null, null] if (filter.lowerBound instanceof NullLiteral && filter.upperBound instanceof NullLiteral) { // replace Null with min value LiteralExpr minKeyValue = LiteralExpr.createInfinity( @@ -109,6 +111,7 @@ private Collection prune(RangeMap rangeMap, maxKey.popColumn(); return result; } + // no in predicate BoundType lowerType = filter.lowerBoundInclusive ? BoundType.CLOSED : BoundType.OPEN; BoundType upperType = filter.upperBoundInclusive ? BoundType.CLOSED : BoundType.OPEN; diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index e1faaf089e20ed..9ad683fcf933a0 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -111,7 +111,7 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt, String origStmt) th } else { if (Config.disable_hadoop_load) { throw new DdlException("Load job by hadoop cluster is disabled." - + " Try use broker load. See 'help broker load;'"); + + " Try using broker load. See 'help broker load;'"); } jobType = EtlJobType.HADOOP; } diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java index 76d817bbb0560b..24648c61f7bd18 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadEtlTask.java @@ -26,11 +26,13 @@ import com.google.common.collect.Maps; -import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Map; + // Used to process pull load etl task +@Deprecated public class PullLoadEtlTask extends LoadEtlTask { private static final Logger LOG = LogManager.getLogger(PullLoadEtlTask.class); private PullLoadJobMgr mgr; diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJob.java b/fe/src/main/java/org/apache/doris/task/PullLoadJob.java index e77a77706dec32..fa2e520a9e4cc8 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadJob.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadJob.java @@ -20,6 +20,7 @@ import org.apache.doris.load.LoadJob; import com.google.common.collect.Sets; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,6 +28,7 @@ import java.util.Set; // One pull load job +@Deprecated public class PullLoadJob { private static final Logger LOG = LogManager.getLogger(PullLoadTask.class); diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java index f3e4865ef95b3a..385007b2e9a60a 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java @@ -33,6 +33,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; +@Deprecated public class PullLoadJobMgr { private static final Logger LOG = LogManager.getLogger(PullLoadJobMgr.class); diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java index f820169d46d5c0..66532127f4e50b 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java @@ -40,6 +40,7 @@ import java.util.Map; // Making a pull load job to some tasks +@Deprecated public class PullLoadPendingTask extends LoadPendingTask { private static final Logger LOG = LogManager.getLogger(PullLoadPendingTask.class); diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadTask.java index e312e5af0fda4c..3ac0ebf96dd3aa 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadTask.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadTask.java @@ -42,6 +42,7 @@ import java.util.UUID; // A pull load task is used to process one table of this pull load job. +@Deprecated public class PullLoadTask { private static final Logger LOG = LogManager.getLogger(PullLoadTask.class); // Input parameter diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java b/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java index d07b6a197c7b3f..c9b65736815b4b 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java @@ -27,8 +27,8 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.UserException; import org.apache.doris.common.NotImplementedException; +import org.apache.doris.common.UserException; import org.apache.doris.planner.BrokerScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSplitSink; @@ -49,6 +49,7 @@ import java.util.List; // Planner used to generate a plan for pull load ETL work +@Deprecated public class PullLoadTaskPlanner { private static final Logger LOG = LogManager.getLogger(PullLoadTaskPlanner.class); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index d99cd8685035a2..e114aef9fb506e 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -141,7 +141,7 @@ public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Mock public void checkAndCreateSource(Database db, DataDescription dataDescription, Map>> tableToPartitionSources, - boolean deleteFlag) { + boolean deleteFlag, EtlJobType jobType) { } }; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index b8ffcb035bd91a..2d10dbd8775657 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -118,6 +118,7 @@ struct TOlapTableIndexTablets { // its a closed-open range struct TOlapTablePartition { 1: required i64 id + // deprecated, use 'start_keys' and 'end_keys' instead 2: optional Exprs.TExprNode start_key 3: optional Exprs.TExprNode end_key @@ -125,6 +126,9 @@ struct TOlapTablePartition { 4: required i32 num_buckets 5: required list indexes + + 6: optional list start_keys + 7: optional list end_keys } struct TOlapTablePartitionParam { @@ -133,6 +137,7 @@ struct TOlapTablePartitionParam { 3: required i64 version // used to split a logical table to multiple paritions + // deprecated, use 'partition_columns' instead 4: optional string partition_column // used to split a partition to multiple tablets @@ -140,6 +145,8 @@ struct TOlapTablePartitionParam { // partitions 6: required list partitions + + 7: optional list partition_columns } struct TOlapTableIndexSchema {