From b12d13fe8b3160906618eba8199577eb12fe0cc7 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Wed, 13 Mar 2024 10:01:01 +0800 Subject: [PATCH 1/6] 1 --- be/src/exec/tablet_info.cpp | 69 ++++++++++ be/src/exec/tablet_info.h | 14 +- be/src/vec/sink/vrow_distribution.cpp | 78 ++++++++++- be/src/vec/sink/vrow_distribution.h | 8 +- .../org/apache/doris/nereids/DorisParser.g4 | 6 +- fe/fe-core/src/main/cup/sql_parser.cup | 4 + .../apache/doris/analysis/AnalyzeTblStmt.java | 2 +- .../analysis/InsertOverwriteTableStmt.java | 7 + .../doris/analysis/NativeInsertStmt.java | 6 + .../apache/doris/analysis/PartitionNames.java | 20 +-- .../org/apache/doris/catalog/OlapTable.java | 6 + .../insertoverwrite/InsertOverwriteUtil.java | 1 + .../nereids/analyzer/UnboundTableSink.java | 34 ++++- .../analyzer/UnboundTableSinkCreator.java | 17 +++ .../nereids/parser/LogicalPlanBuilder.java | 45 +++++-- .../insert/InsertIntoTableCommand.java | 1 + .../insert/InsertOverwriteTableCommand.java | 23 +++- .../insert/OlapInsertCommandContext.java | 14 ++ .../commands/insert/OlapInsertExecutor.java | 2 + .../apache/doris/planner/OlapTableSink.java | 4 + .../org/apache/doris/qe/StmtExecutor.java | 27 +++- .../doris/service/FrontendServiceImpl.java | 123 ++++++++++++++++++ gensrc/thrift/Descriptors.thrift | 2 + gensrc/thrift/FrontendService.thrift | 19 +++ 24 files changed, 489 insertions(+), 43 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index af065f332a00e4..21e2eec28f0477 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -47,6 +47,7 @@ #include "vec/columns/column_nullable.h" #include "vec/common/assert_cast.h" #include "vec/common/string_ref.h" +#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vliteral.h" #include "vec/runtime/vdatetime_value.h" @@ -313,6 +314,10 @@ VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptrroot(); } } + + if (t_param.__isset.enable_auto_detect_overwrite && t_param.enable_auto_detect_overwrite) { + _is_auto_detect_overwrite = true; + } } VOlapTablePartitionParam::~VOlapTablePartitionParam() { @@ -612,4 +617,68 @@ Status VOlapTablePartitionParam::add_partitions( return Status::OK(); } +Status VOlapTablePartitionParam::replace_partitions( + const std::vector& origin_partitions, + const std::vector& new_partitions) { + // replace partitions + std::set origin_ids; + for (const auto* part : origin_partitions) { + origin_ids.insert(part->id); + } + for (auto it = _partitions.begin(); it != _partitions.end(); it++) { + if (origin_ids.contains((*it)->id)) { + it = _partitions.erase(it); + } + } + //TODO: erase old value from map + + for (const auto& t_part : new_partitions) { + auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); + part->id = t_part.id; + part->is_mutable = t_part.is_mutable; + + part->num_buckets = t_part.num_buckets; + auto num_indexes = _schema->indexes().size(); + if (t_part.indexes.size() != num_indexes) { + return Status::InternalError( + "number of partition's index is not equal with schema's" + ", num_part_indexes={}, num_schema_indexes={}", + t_part.indexes.size(), num_indexes); + } + part->indexes = t_part.indexes; + std::sort(part->indexes.begin(), part->indexes.end(), + [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) { + return lhs.index_id < rhs.index_id; + }); + // check index + for (int j = 0; j < num_indexes; ++j) { + if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) { + std::stringstream ss; + ss << "partition's index is not equal with schema's" + << ", part_index=" << part->indexes[j].index_id + << ", schema_index=" << _schema->indexes()[j]->index_id; + return Status::InternalError( + "partition's index is not equal with schema's" + ", part_index={}, schema_index={}", + part->indexes[j].index_id, _schema->indexes()[j]->index_id); + } + } + + // replace partitions + _partitions.emplace_back(part); + + // after _creating_partiton_keys + if (_is_in_partition) { + for (auto& in_key : part->in_keys) { + _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); + } + } else { + _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false}, + part); + } + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index a157b57948721f..345efeddf8587e 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -246,9 +246,14 @@ class VOlapTablePartitionParam { bool is_projection_partition() const { return _is_auto_partition; } bool is_auto_partition() const { return _is_auto_partition; } + bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; } + std::vector get_partition_keys() const { return _partition_slot_locs; } Status add_partitions(const std::vector& partitions); + // no need to del/reinsert partition keys, but change the link. + Status replace_partitions(const std::vector& origin_partitions, + const std::vector& new_partitions); vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; } vectorized::VExprSPtrs get_partition_function() { return _partition_function; } @@ -293,11 +298,12 @@ class VOlapTablePartitionParam { // only works when using list partition, the resource is owned by _partitions VOlapTablePartition* _default_partition = nullptr; - // for auto partition, now only support 1 column. TODO: use vector to save them when we support multi column auto-partition. bool _is_auto_partition = false; vectorized::VExprContextSPtrs _part_func_ctx = {nullptr}; vectorized::VExprSPtrs _partition_function = {nullptr}; TPartitionType::type _part_type; // support list or range + // "insert overwrite partition(*)", detect which partitions by BE + bool _is_auto_detect_overwrite = false; }; // indicate where's the tablet and all its replications (node-wise) @@ -360,13 +366,13 @@ class DorisNodesInfo { public: DorisNodesInfo() = default; DorisNodesInfo(const TPaloNodesInfo& t_nodes) { - for (auto& node : t_nodes.nodes) { + for (const auto& node : t_nodes.nodes) { _nodes.emplace(node.id, node); } } void setNodes(const TPaloNodesInfo& t_nodes) { _nodes.clear(); - for (auto& node : t_nodes.nodes) { + for (const auto& node : t_nodes.nodes) { _nodes.emplace(node.id, node); } } @@ -380,7 +386,7 @@ class DorisNodesInfo { void add_nodes(const std::vector& t_nodes) { for (const auto& node : t_nodes) { - auto node_info = find_node(node.id); + const auto* node_info = find_node(node.id); if (node_info == nullptr) { _nodes.emplace(node.id, node); } diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 358f8d3783ede8..f58f534c33d3a0 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -21,6 +21,8 @@ #include #include +#include +#include #include #include "common/logging.h" @@ -109,6 +111,54 @@ Status VRowDistribution::automatic_create_partition() { return status; } +static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { + TCreatePartitionResult result; + result.status = arg.status; + result.nodes = std::move(arg.nodes); + result.partitions = std::move(arg.partitions); + result.tablets = std::move(arg.tablets); + return result; +} + +// use _partitions and replace them +Status VRowDistribution::_replace_overwriting_partition() { + SCOPED_TIMER(_add_partition_request_timer); + TReplacePartitionRequest request; + TReplacePartitionResult result; + request.__set_txn_id(_txn_id); + request.__set_db_id(_vpartition->db_id()); + request.__set_table_id(_vpartition->table_id()); + + std::vector partition_ids(_partitions.size()); + std::transform(_partitions.begin(), _partitions.end(), partition_ids.begin(), + [](VOlapTablePartition* partition) { return partition->id; }); + request.__set_partition_ids(partition_ids); + + string be_endpoint = BackendOptions::get_be_endpoint(); + request.__set_be_endpoint(be_endpoint); + + LOG(WARNING) << "auto detect replace partition request: " << request; + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + int time_out = _state->execution_timeout() * 1000; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->replacePartition(result, request); + }, + time_out)); + + Status status(Status::create(result.status)); + LOG(WARNING) << "auto detect replace partition result: " << result; + if (result.status.status_code == TStatusCode::OK) { + RETURN_IF_ERROR(_vpartition->replace_partitions(_partitions, result.partitions)); + // reuse the function as the args' structure are same. it add nodes/locations and incremental_open + auto result_as_create = cast_as_create_result(result); + RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); + } + + return status; +} + void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx, std::vector& tablet_ids) { tablet_ids.reserve(block->rows()); @@ -284,6 +334,27 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition( return Status::OK(); } +Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_ids) { + auto num_rows = block->rows(); + + bool stop_processing = false; + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; + } + } + RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); + + DCHECK(!_partitions.empty()); // otherwise it will be error return + RETURN_IF_ERROR(_replace_overwriting_partition()); + + return Status::OK(); +} + void VRowDistribution::_reset_row_part_tablet_ids( std::vector& row_part_tablet_ids, int64_t rows) { row_part_tablet_ids.resize(_schema->indexes().size()); @@ -345,7 +416,12 @@ Status VRowDistribution::generate_rows_distribution( _vpartition->set_transformed_slots(partition_cols_idx); } - if (_vpartition->is_auto_partition() && !_deal_batched) { + if (_vpartition->is_auto_detect_overwrite()) { + // when overwrite, no auto create partition allowed. + LOG(WARNING) << "is_auto_detect_overwriting!!!"; + RETURN_IF_ERROR(_generate_rows_distribution_for_auto_overwrite( + block.get(), has_filtered_rows, row_part_tablet_ids)); + } else if (_vpartition->is_auto_partition() && !_deal_batched) { RETURN_IF_ERROR(_generate_rows_distribution_for_auto_partition( block.get(), partition_cols_idx, has_filtered_rows, row_part_tablet_ids, rows_stat_val)); diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 12acda73ed2033..208f5e7ca4e9fb 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -153,6 +153,11 @@ class VRowDistribution { vectorized::Block* block, bool has_filtered_rows, std::vector& row_part_tablet_ids); + Status _generate_rows_distribution_for_auto_overwrite( + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_ids); + Status _replace_overwriting_partition(); + void _reset_row_part_tablet_ids(std::vector& row_part_tablet_ids, int64_t rows); @@ -177,11 +182,12 @@ class VRowDistribution { OlapTableLocationParam* _location = nullptr; // int64_t _number_output_rows = 0; const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr; + // generally it's writer's on_partitions_created CreatePartitionCallback _create_partition_callback = nullptr; void* _caller = nullptr; std::shared_ptr _schema; - // reuse for find_tablet. + // reuse for find_tablet. save partitions found by find_tablets std::vector _partitions; std::vector _skip; std::vector _tablet_indexes; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 921059637a596c..e16fa2d092d926 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -124,9 +124,9 @@ constraint partitionSpec : TEMPORARY? (PARTITION | PARTITIONS) partitions=identifierList | TEMPORARY? PARTITION partition=errorCapturingIdentifier - // TODO: support analyze external table partition spec https://github.com/apache/doris/pull/24154 - // | PARTITIONS LEFT_PAREN ASTERISK RIGHT_PAREN - // | PARTITIONS WITH RECENT + | (PARTITION | PARTITIONS) LEFT_PAREN ASTERISK RIGHT_PAREN // for auto detect partition in overwriting + // TODO: support analyze external table partition spec https://github.com/apache/doris/pull/24154 + // | PARTITIONS WITH RECENT ; partitionTable diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 22dd017b68a41e..a2b4b78dea9bb9 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -6071,6 +6071,10 @@ partition_names ::= {: RESULT = new PartitionNames(true); :} + | KW_PARTITION LPAREN STAR RPAREN + {: + RESULT = new PartitionNames(true); + :} | KW_PARTITIONS KW_WITH KW_RECENT INTEGER_LITERAL:count {: RESULT = new PartitionNames(count); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index c6c95da9de0ed8..52967b01c83890 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -259,7 +259,7 @@ public boolean isAllPartitions() { if (partitionNames == null) { return false; } - return partitionNames.isAllPartitions(); + return partitionNames.isAutoReplace(); } public long getPartitionCount() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java index 17cca1cecc5e06..e0c4619fcfc105 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java @@ -73,6 +73,13 @@ public List getPartitionNames() { return target.getPartitionNames().getPartitionNames(); } + /* + * auto detect which partitions to replace. enable by partition(*) grammer + */ + public boolean isAutoReplace() { + return target.getPartitionNames().isAutoReplace(); + } + @Override public void analyze(Analyzer analyzer) throws UserException { target.getTblName().analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 3f36746444127e..af4e0b8c3c412a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -163,6 +163,7 @@ public class NativeInsertStmt extends InsertStmt { boolean hasEmptyTargetColumns = false; private boolean allowAutoPartition = true; + private boolean autoReplacePartition = false; enum InsertType { NATIVE_INSERT("insert_"), @@ -317,6 +318,11 @@ public boolean isTransactionBegin() { return isTransactionBegin; } + public NativeInsertStmt withAutoReplaceEnabled() { + this.autoReplacePartition = true; + return this; + } + protected void preCheckAnalyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java index ca26a2978e0e54..0a2928df1a8265 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java @@ -48,7 +48,7 @@ public class PartitionNames implements ParseNode, Writable { // true if these partitions are temp partitions @SerializedName(value = "isTemp") private final boolean isTemp; - private final boolean allPartitions; + private final boolean autoReplace; private final long count; // Default partition count to collect statistic for external table. private static final long DEFAULT_PARTITION_COUNT = 100; @@ -56,28 +56,28 @@ public class PartitionNames implements ParseNode, Writable { public PartitionNames(boolean isTemp, List partitionNames) { this.partitionNames = partitionNames; this.isTemp = isTemp; - this.allPartitions = false; + this.autoReplace = false; this.count = 0; } public PartitionNames(PartitionNames other) { this.partitionNames = Lists.newArrayList(other.partitionNames); this.isTemp = other.isTemp; - this.allPartitions = other.allPartitions; + this.autoReplace = other.autoReplace; this.count = 0; } - public PartitionNames(boolean allPartitions) { + public PartitionNames(boolean autoReplace) { this.partitionNames = null; this.isTemp = false; - this.allPartitions = allPartitions; + this.autoReplace = autoReplace; this.count = 0; } public PartitionNames(long partitionCount) { this.partitionNames = null; this.isTemp = false; - this.allPartitions = false; + this.autoReplace = false; this.count = partitionCount; } @@ -89,8 +89,8 @@ public boolean isTemp() { return isTemp; } - public boolean isAllPartitions() { - return allPartitions; + public boolean isAutoReplace() { + return autoReplace; } public long getCount() { @@ -99,10 +99,10 @@ public long getCount() { @Override public void analyze(Analyzer analyzer) throws AnalysisException { - if (allPartitions && count > 0) { + if (autoReplace && count > 0) { throw new AnalysisException("All partition and partition count couldn't be set at the same time."); } - if (allPartitions || count > 0) { + if (autoReplace || count > 0) { return; } if (partitionNames == null || partitionNames.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 463b3b1f19d8e9..51c8f7b3f1bf1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1106,6 +1106,12 @@ public Set getPartitionNames() { return Sets.newHashSet(nameToPartition.keySet()); } + public List getPartitionNamesByIds(List partitionIds) { + return partitionIds.stream().map(id -> { + return idToPartition.get(id).getName(); + }).collect(Collectors.toList()); + } + public List getPartitionIds() { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index af906d2653e7d9..7d0acf6d47b4de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 2a4416686cc9c3..7e0a6899c93638 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -33,9 +33,11 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import com.clearspring.analytics.util.Lists; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -50,6 +52,7 @@ public class UnboundTableSink extends UnboundLogicalSin private final List partitions; private final boolean isPartialUpdate; private final DMLCommandType dmlCommandType; + private boolean autoDetectPartition = false; public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { @@ -74,10 +77,31 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List hints, + boolean isAutoDetectPartition, boolean isPartialUpdate, DMLCommandType dmlCommandType, + Optional groupExpression, Optional logicalProperties, + CHILD_TYPE child) { + super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child); + this.hints = Utils.copyRequiredList(hints); + this.temporaryPartition = false; + this.partitions = Lists.newArrayList(); + this.autoDetectPartition = isAutoDetectPartition; + this.isPartialUpdate = isPartialUpdate; + this.dmlCommandType = dmlCommandType; + } + public boolean isTemporaryPartition() { return temporaryPartition; } + public boolean isAutoDetectPartition() { + return autoDetectPartition; + } + public List getPartitions() { return partitions; } @@ -93,8 +117,8 @@ public boolean isPartialUpdate() { @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); - return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, - dmlCommandType, groupExpression, Optional.empty(), children.get(0)); + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, + isPartialUpdate, dmlCommandType, groupExpression, Optional.empty(), children.get(0)); } @Override @@ -134,14 +158,14 @@ public int hashCode() { @Override public Plan withGroupExpression(Optional groupExpression) { - return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, - dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, + isPartialUpdate, dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, isPartialUpdate, dmlCommandType, groupExpression, logicalProperties, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 335d2f58035d62..16855a25f1fb5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -71,4 +71,21 @@ public static LogicalSink createUnboundTableSink(List na } throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } + + /** + * create unbound sink for DML plan with auto detect overwrite partition enable + */ + public static LogicalSink createUnboundTableSink(List nameParts, + List colNames, List hints, + boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) { + String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); + CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (curCatalog instanceof InternalCatalog) { + return new UnboundTableSink<>(nameParts, colNames, hints, false, + isPartialUpdate, dmlCommandType, Optional.empty(), + Optional.empty(), plan); + } + throw new RuntimeException( + "Auto overwrite data to " + curCatalog.getClass().getSimpleName() + " is not supported."); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index bb7f316defb936..18644d45de3f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -527,17 +527,34 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { Optional labelName = ctx.labelName == null ? Optional.empty() : Optional.of(ctx.labelName.getText()); List colNames = ctx.cols == null ? ImmutableList.of() : visitIdentifierList(ctx.cols); // TODO visit partitionSpecCtx - Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); LogicalPlan plan = visitQuery(ctx.query()); - LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSink( - tableName.build(), - colNames, - ImmutableList.of(), - partitionSpec.first, - partitionSpec.second, - ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), - DMLCommandType.INSERT, - plan); + // partitionSpec may be NULL. means auto detect partition. only available when + // IOT + Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); + UnboundTableSink sink; + if (partitionSpec.second == null) { // auto detect partition + if (!isOverwrite) { + throw new ParseException("Only support wildcard in overwrite partition", ctx); + } + sink = UnboundTableSinkCreator.createUnboundTableSink( + tableName.build(), + colNames, + ImmutableList.of(), + true, + ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), + DMLCommandType.INSERT, + plan); + } else { // normal partition + sink = UnboundTableSinkCreator.createUnboundTableSink( + tableName.build(), + colNames, + ImmutableList.of(), + partitionSpec.first, + partitionSpec.second, + ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), + DMLCommandType.INSERT, + plan); + } LogicalPlan command; if (isOverwrite) { command = new InsertOverwriteTableCommand(sink, labelName); @@ -567,7 +584,9 @@ public Pair> visitPartitionSpec(PartitionSpecContext ctx) boolean temporaryPartition = false; if (ctx != null) { temporaryPartition = ctx.TEMPORARY() != null; - if (ctx.partition != null) { + if (ctx.ASTERISK() != null) { + partitions = null; + } else if (ctx.partition != null) { partitions = ImmutableList.of(ctx.partition.getText()); } else { partitions = visitIdentifierList(ctx.partitions); @@ -849,6 +868,10 @@ public LogicalPlan visitUpdate(UpdateContext ctx) { public LogicalPlan visitDelete(DeleteContext ctx) { List tableName = visitMultipartIdentifier(ctx.tableName); Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); + // TODO: now dont support delete auto detect partition. + if (partitionSpec == null) { + throw new ParseException("Now don't support auto detect partitions in deleting", ctx); + } LogicalPlan query = withTableAlias(LogicalPlanBuilderAssistant.withCheckPolicy( new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableName, partitionSpec.second, partitionSpec.first)), ctx.tableAlias()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 3e4ed3afd78d9d..8bf454f0980d2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -162,6 +162,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor throw new AnalysisException("group commit is not supported in Nereids now"); } OlapTable olapTable = (OlapTable) targetTableIf; + // the insertCtx contains some variables to adjust SinkNode insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx); boolean isEnableMemtableOnSinkNode = olapTable.getTableProperty().getUseSchemaLightChange() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 788871c744e384..d0292a14201593 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -87,6 +87,10 @@ public void setLabelName(Optional labelName) { this.labelName = labelName; } + public boolean isAutoDetectOverwrite() { + return ((UnboundTableSink) this.logicalQuery).isAutoDetectPartition(); + } + @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { if (!ctx.getSessionVariable().isEnableNereidsDML()) { @@ -138,9 +142,15 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { long taskId = Env.getCurrentEnv().getInsertOverwriteManager() .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); try { - InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); - insertInto(ctx, executor, tempPartitionNames); - InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); + if (isAutoDetectOverwrite()) { + // when inserting, BE will call to replace partition by FrontendService, FE do + // the real add&replacement and return replace result. so there's no need to do anything else + insertInto(ctx, executor, tempPartitionNames); + } else { + InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + insertInto(ctx, executor, tempPartitionNames); + InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); + } Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); } catch (Exception e) { LOG.warn("insert into overwrite failed"); @@ -160,6 +170,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { */ private void insertInto(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames) throws Exception { + // copy sink tot replace by tempPartitions UnboundLogicalSink copySink; InsertCommandContext insertCtx; if (logicalQuery instanceof UnboundTableSink) { @@ -173,9 +184,9 @@ private void insertInto(ConnectContext ctx, StmtExecutor executor, List sink.isPartialUpdate(), sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); - // for overwrite situation, we disable auto create partition. - insertCtx = new OlapInsertCommandContext(); - ((OlapInsertCommandContext) insertCtx).setAllowAutoPartition(false); + // 1. for overwrite situation, we disable auto create partition. + // 2. we save and pass overwrite auto detect by insertCtx + insertCtx = new OlapInsertCommandContext(false); } else if (logicalQuery instanceof UnboundHiveTableSink) { UnboundHiveTableSink sink = (UnboundHiveTableSink) logicalQuery; copySink = (UnboundLogicalSink) UnboundTableSinkCreator.createUnboundTableSink( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java index 23dd8d13d9182f..f07390d2a11da3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java @@ -22,11 +22,25 @@ */ public class OlapInsertCommandContext extends InsertCommandContext { private boolean allowAutoPartition = true; + private boolean autoDetectOverwrite = false; + + public OlapInsertCommandContext() { + + } + + public OlapInsertCommandContext(boolean allowAutoPartition, boolean autoDetectOverwrite) { + this.allowAutoPartition = allowAutoPartition; + this.autoDetectOverwrite = autoDetectOverwrite; + } public boolean isAllowAutoPartition() { return allowAutoPartition; } + public boolean isAutoDetectOverwrite() { + return autoDetectOverwrite; + } + public void setAllowAutoPartition(boolean allowAutoPartition) { this.allowAutoPartition = allowAutoPartition; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 0598b0d46f6b2c..b0bffc2d4b99f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -127,10 +127,12 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys false, isStrictMode, timeout); + // complete and set commands both modify thrift struct olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); if (!olapInsertCtx.isAllowAutoPartition()) { olapTableSink.setAutoPartition(false); } + olapTableSink.setAutoDetectOverwite(olapInsertCtx.isAutoDetectOverwrite()); // update // set schema and partition info for tablet id shuffle exchange diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 19c6e7e1d5a752..958186307addc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -190,6 +190,10 @@ public void setAutoPartition(boolean var) { tDataSink.getOlapTableSink().getPartition().setEnableAutomaticPartition(var); } + public void setAutoDetectOverwite(boolean var) { + tDataSink.getOlapTableSink().getPartition().setEnableAutoDetectOverwrite(var); + } + // must called after tupleDescriptor is computed public void complete(Analyzer analyzer) throws UserException { for (Long partitionId : partitionIds) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 72073e1e8478e4..0b03a49a5cf939 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2794,7 +2794,10 @@ private void handleIotStmt() { ConnectContext.get().setSkipAuth(true); try { InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt; - if (iotStmt.getPartitionNames().size() == 0) { + if (iotStmt.isAutoReplace()) { + // insert overwrite table auto detect which partitions need to replace + handleAutoOverwritePartition(iotStmt); + } else if (iotStmt.getPartitionNames().size() == 0) { // insert overwrite table handleOverwriteTable(iotStmt); } else { @@ -2943,6 +2946,28 @@ private void handleOverwritePartition(InsertOverwriteTableStmt iotStmt) { } } + /* + * we use a anti-AutoPartition-like function to find partitions to replace. + */ + private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) { + // register query in replaceManager + + // + TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl()); + try { + parsedStmt = new NativeInsertStmt(targetTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()), + iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true).withAutoReplaceEnabled(); + parsedStmt.setUserInfo(context.getCurrentUserIdentity()); + execute(); + } catch (Exception e) { + LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); + handleIotRollback(targetTableName); + return; + } + + } + private void handleIotRollback(TableName table) { // insert error drop the tmp table DropTableStmt dropTableStmt = new DropTableStmt(true, table, true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 04b7ea43997d59..2b175ad5f5a21c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -72,6 +72,7 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.load.routineload.ErrorReason; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadJob.JobState; @@ -201,6 +202,8 @@ import org.apache.doris.thrift.TQueryStatsResult; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReplicaInfo; +import org.apache.doris.thrift.TReplacePartitionRequest; +import org.apache.doris.thrift.TReplacePartitionResult; import org.apache.doris.thrift.TReportCommitTxnResultRequest; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; @@ -3552,6 +3555,126 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t return result; } + @Override + public TReplacePartitionResult replacePartition(TReplacePartitionRequest request) throws TException { + LOG.info("Receive create partition request: {}", request); + long dbId = request.getDbId(); + long tableId = request.getTableId(); + List partitionIds = request.partition_ids; + TReplacePartitionResult result = new TReplacePartitionResult(); + TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR); + if (!Env.getCurrentEnv().isMaster()) { + errorStatus.setStatusCode(TStatusCode.NOT_MASTER); + errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG); + LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG); + return result; + } + + Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId); + if (db == null) { + errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId))); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + + Table table = db.getTable(tableId).get(); + if (table == null) { + errorStatus.setErrorMsgs( + (Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId)))); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + + if (!(table instanceof OlapTable)) { + errorStatus.setErrorMsgs( + Lists.newArrayList(String.format("dbId=%d tableId=%d is not olap table", dbId, tableId))); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + + OlapTable olapTable = (OlapTable) table; + List partitionNames = olapTable.getPartitionNamesByIds(partitionIds); + List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); + try { + InsertOverwriteUtil.addTempPartitions(olapTable, partitionNames, tempPartitionNames); + InsertOverwriteUtil.replacePartition(olapTable, partitionNames, tempPartitionNames); + } catch (DdlException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + + // build partition & tablets + List partitions = Lists.newArrayList(); + List tablets = Lists.newArrayList(); + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + for (String partitionName : partitionNames) { + Partition partition = table.getPartition(partitionName); + TOlapTablePartition tPartition = new TOlapTablePartition(); + tPartition.setId(partition.getId()); + int partColNum = partitionInfo.getPartitionColumns().size(); + // set partition keys + OlapTableSink.setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum); + for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { + tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList( + index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); + tPartition.setNumBuckets(index.getTablets().size()); + } + tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); + partitions.add(tPartition); + // tablet + int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + + 1; + for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { + for (Tablet tablet : index.getTablets()) { + // we should ensure the replica backend is alive + // otherwise, there will be a 'unknown node id, id=xxx' error for stream load + // BE id -> path hash + Multimap bePathsMap; + try { + if (Config.isCloudMode() && request.isSetBeEndpoint()) { + bePathsMap = ((CloudTablet) tablet) + .getNormalReplicaBackendPathMapCloud(request.be_endpoint); + } else { + bePathsMap = tablet.getNormalReplicaBackendPathMap(); + } + } catch (UserException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + if (bePathsMap.keySet().size() < quorum) { + LOG.warn("auto go quorum exception"); + } + tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet()))); + } + } + } + result.setPartitions(partitions); + result.setTablets(tablets); + + // build nodes + List nodeInfos = Lists.newArrayList(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + for (Long id : systemInfoService.getAllBackendIds(false)) { + Backend backend = systemInfoService.getBackend(id); + nodeInfos.add(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort())); + } + result.setNodes(nodeInfos); + + // successfully return + result.setStatus(new TStatus(TStatusCode.OK)); + if (LOG.isDebugEnabled()) { + LOG.debug("send replace partition result: {}", result); + } + return result; + } + public TGetMetaResult getMeta(TGetMetaRequest request) throws TException { String clientAddr = getClientAddrAsString(); if (LOG.isDebugEnabled()) { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 76c8b1b7b068ec..fba55177edeefe 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -205,6 +205,8 @@ struct TOlapTablePartitionParam { 8: optional list partition_function_exprs 9: optional bool enable_automatic_partition 10: optional Partitions.TPartitionType partition_type + // insert overwrite partition(*) + 11: optional bool enable_auto_detect_overwrite } struct TOlapTableIndex { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c7bdbb4fc0ed03..f4a9e8f617815c 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1283,6 +1283,23 @@ struct TCreatePartitionResult { 4: optional list nodes } +// these two for auto detect replacing partition +struct TReplacePartitionRequest { + 1: optional i64 txn_id + 2: optional i64 db_id + 3: optional i64 table_id + 4: optional list partition_ids // partition to replace. + // be_endpoint = : to distinguish a particular BE + 5: optional string be_endpoint +} + +struct TReplacePartitionResult { + 1: optional Status.TStatus status + 2: optional list partitions + 3: optional list tablets + 4: optional list nodes +} + struct TGetMetaReplica { 1: optional i64 id } @@ -1498,6 +1515,8 @@ service FrontendService { TAutoIncrementRangeResult getAutoIncrementRange(1: TAutoIncrementRangeRequest request) TCreatePartitionResult createPartition(1: TCreatePartitionRequest request) + // insert overwrite partition(*) + TReplacePartitionResult replacePartition(1: TReplacePartitionRequest request) TGetMetaResult getMeta(1: TGetMetaRequest request) From 40fa862dc2837cf2105debffc944423851f57946 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Wed, 13 Mar 2024 23:05:04 +0800 Subject: [PATCH 2/6] done --- be/src/exec/tablet_info.cpp | 72 ++++++++------- be/src/exec/tablet_info.h | 8 +- be/src/vec/sink/vrow_distribution.cpp | 59 ++++++++----- be/src/vec/sink/vrow_distribution.h | 13 +++ be/src/vec/sink/writer/vtablet_writer_v2.cpp | 16 ++-- .../InsertOverwriteManager.java | 87 +++++++++++++++++++ .../insertoverwrite/InsertOverwriteUtil.java | 3 +- .../insert/InsertOverwriteTableCommand.java | 51 ++++++++--- .../insert/OlapInsertCommandContext.java | 14 ++- .../commands/insert/OlapInsertExecutor.java | 7 +- .../apache/doris/planner/OlapTableSink.java | 4 + .../doris/service/FrontendServiceImpl.java | 60 +++++++++++-- gensrc/thrift/Descriptors.thrift | 1 + gensrc/thrift/FrontendService.thrift | 2 +- 14 files changed, 305 insertions(+), 92 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 21e2eec28f0477..c865e9d79ddfe9 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -23,9 +23,10 @@ #include #include #include -#include #include +#include +#include #include #include #include @@ -40,14 +41,10 @@ #include "runtime/primitive_type.h" #include "runtime/raw_value.h" #include "runtime/types.h" -#include "util/hash_util.hpp" #include "util/string_parser.hpp" #include "util/string_util.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" -#include "vec/common/assert_cast.h" -#include "vec/common/string_ref.h" -#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vliteral.h" #include "vec/runtime/vdatetime_value.h" @@ -317,6 +314,8 @@ VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptrfirst->get_by_position(pos).column).mutate(); + //TODO: use assert_cast before insert_data switch (t_expr.node_type) { case TExprNodeType::DATE_LITERAL: { if (TypeDescriptor::from_thrift(t_expr.type).is_date_v2_type()) { @@ -592,10 +592,6 @@ Status VOlapTablePartitionParam::add_partitions( // check index for (int j = 0; j < num_indexes; ++j) { if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) { - std::stringstream ss; - ss << "partition's index is not equal with schema's" - << ", part_index=" << part->indexes[j].index_id - << ", schema_index=" << _schema->indexes()[j]->index_id; return Status::InternalError( "partition's index is not equal with schema's" ", part_index={}, schema_index={}", @@ -618,25 +614,35 @@ Status VOlapTablePartitionParam::add_partitions( } Status VOlapTablePartitionParam::replace_partitions( - const std::vector& origin_partitions, + std::vector& old_partition_ids, const std::vector& new_partitions) { - // replace partitions - std::set origin_ids; - for (const auto* part : origin_partitions) { - origin_ids.insert(part->id); - } - for (auto it = _partitions.begin(); it != _partitions.end(); it++) { - if (origin_ids.contains((*it)->id)) { - it = _partitions.erase(it); - } - } - //TODO: erase old value from map + // remove old replaced partitions + DCHECK(old_partition_ids.size() == new_partitions.size()); - for (const auto& t_part : new_partitions) { + // init and add new partitions. insert into _partitions + for (int i = 0; i < new_partitions.size(); i++) { + const auto& t_part = new_partitions[i]; + auto& old_part = _partitions[i]; auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); part->id = t_part.id; part->is_mutable = t_part.is_mutable; + /// just substitute directly. no need to remove and reinsert keys. + // range partition + if (t_part.__isset.start_keys) { + part->start_key = std::move(old_part->start_key); + } + if (t_part.__isset.end_keys) { + part->end_key = std::move(old_part->end_key); + } + // list partition + if (t_part.__isset.in_keys) { + part->in_keys = std::move(old_part->in_keys); + if (t_part.__isset.is_default_partition && t_part.is_default_partition) { + _default_partition = part; + } + } + part->num_buckets = t_part.num_buckets; auto num_indexes = _schema->indexes().size(); if (t_part.indexes.size() != num_indexes) { @@ -653,10 +659,6 @@ Status VOlapTablePartitionParam::replace_partitions( // check index for (int j = 0; j < num_indexes; ++j) { if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) { - std::stringstream ss; - ss << "partition's index is not equal with schema's" - << ", part_index=" << part->indexes[j].index_id - << ", schema_index=" << _schema->indexes()[j]->index_id; return Status::InternalError( "partition's index is not equal with schema's" ", part_index={}, schema_index={}", @@ -664,17 +666,23 @@ Status VOlapTablePartitionParam::replace_partitions( } } - // replace partitions + // add new partitions with new id. _partitions.emplace_back(part); - - // after _creating_partiton_keys + // replace items in _partition_maps if (_is_in_partition) { for (auto& in_key : part->in_keys) { - _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); + (*_partitions_map)[std::tuple {in_key.first, in_key.second, false}] = part; } } else { - _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false}, - part); + (*_partitions_map)[std::tuple {part->end_key.first, part->end_key.second, false}] = + part; + } + } + // remove old partitions by id + std::ranges::sort(old_partition_ids); + for (auto it = _partitions.begin(); it != _partitions.end(); it++) { + if (std::ranges::binary_search(old_partition_ids, (*it)->id)) { + it = _partitions.erase(it); } } diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 345efeddf8587e..fb982538592946 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -119,7 +119,7 @@ using OlapTableIndexTablets = TOlapTableIndexTablets; using BlockRow = std::pair; using BlockRowWithIndicator = - std::tuple; // [block, column, is_transformed] + std::tuple; // [block, row, is_transformed] struct VOlapTablePartition { int64_t id = 0; @@ -247,12 +247,13 @@ class VOlapTablePartitionParam { bool is_auto_partition() const { return _is_auto_partition; } bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; } + int64_t get_overwrite_group_id() const { return _overwrite_group_id; } std::vector get_partition_keys() const { return _partition_slot_locs; } Status add_partitions(const std::vector& partitions); - // no need to del/reinsert partition keys, but change the link. - Status replace_partitions(const std::vector& origin_partitions, + // no need to del/reinsert partition keys, but change the link. reset the _partitions items + Status replace_partitions(std::vector& old_partition_ids, const std::vector& new_partitions); vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; } @@ -304,6 +305,7 @@ class VOlapTablePartitionParam { TPartitionType::type _part_type; // support list or range // "insert overwrite partition(*)", detect which partitions by BE bool _is_auto_detect_overwrite = false; + int64_t _overwrite_group_id = 0; }; // indicate where's the tablet and all its replications (node-wise) diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index f58f534c33d3a0..053d0ef567b9ac 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -90,7 +90,7 @@ Status VRowDistribution::automatic_create_partition() { request.__set_partitionValues(_partitions_need_create); request.__set_be_endpoint(be_endpoint); - VLOG(1) << "automatic partition rpc begin request " << request; + VLOG_NOTICE << "automatic partition rpc begin request " << request; TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; int time_out = _state->execution_timeout() * 1000; RETURN_IF_ERROR(ThriftRpcHelper::rpc( @@ -101,7 +101,7 @@ Status VRowDistribution::automatic_create_partition() { time_out)); Status status(Status::create(result.status)); - VLOG(1) << "automatic partition rpc end response " << result; + VLOG_NOTICE << "automatic partition rpc end response " << result; if (result.status.status_code == TStatusCode::OK) { // add new created partitions RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); @@ -111,6 +111,7 @@ Status VRowDistribution::automatic_create_partition() { return status; } +// for reuse the same create callback of create-partition static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { TCreatePartitionResult result; result.status = arg.status; @@ -125,19 +126,28 @@ Status VRowDistribution::_replace_overwriting_partition() { SCOPED_TIMER(_add_partition_request_timer); TReplacePartitionRequest request; TReplacePartitionResult result; - request.__set_txn_id(_txn_id); + request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id()); request.__set_db_id(_vpartition->db_id()); request.__set_table_id(_vpartition->table_id()); - std::vector partition_ids(_partitions.size()); - std::transform(_partitions.begin(), _partitions.end(), partition_ids.begin(), - [](VOlapTablePartition* partition) { return partition->id; }); - request.__set_partition_ids(partition_ids); + std::vector request_part_ids; + request_part_ids.reserve(_partitions.size()); + for (const auto& part : _partitions) { + if (!_new_partition_ids.contains(part->id)) { + request_part_ids.push_back(part->id); + } // otherwise means replaced already. + } + // de-duplicate. there's no check in FE + auto deduper = std::set(request_part_ids.begin(), request_part_ids.end()); + request_part_ids.assign(deduper.begin(), deduper.end()); + + + request.__set_partition_ids(request_part_ids); string be_endpoint = BackendOptions::get_be_endpoint(); request.__set_be_endpoint(be_endpoint); - LOG(WARNING) << "auto detect replace partition request: " << request; + VLOG_NOTICE << "auto detect replace partition request: " << request; TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; int time_out = _state->execution_timeout() * 1000; RETURN_IF_ERROR(ThriftRpcHelper::rpc( @@ -148,9 +158,14 @@ Status VRowDistribution::_replace_overwriting_partition() { time_out)); Status status(Status::create(result.status)); - LOG(WARNING) << "auto detect replace partition result: " << result; + VLOG_NOTICE << "auto detect replace partition result: " << result; if (result.status.status_code == TStatusCode::OK) { - RETURN_IF_ERROR(_vpartition->replace_partitions(_partitions, result.partitions)); + // record new partitions + for (const auto& part : result.partitions) { + _new_partition_ids.insert(part.id); + } + // replace data in _partitions + RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); // reuse the function as the args' structure are same. it add nodes/locations and incremental_open auto result_as_create = cast_as_create_result(result); RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); @@ -340,6 +355,12 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( auto num_rows = block->rows(); bool stop_processing = false; + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + RETURN_IF_ERROR(_replace_overwriting_partition()); + + // regenerate locations for new partitions & tablets + _reset_find_tablets(num_rows); RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, _tablet_indexes, stop_processing, _skip)); if (has_filtered_rows) { @@ -348,10 +369,6 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( } } RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); - - DCHECK(!_partitions.empty()); // otherwise it will be error return - RETURN_IF_ERROR(_replace_overwriting_partition()); - return Status::OK(); } @@ -393,11 +410,7 @@ Status VRowDistribution::generate_rows_distribution( auto num_rows = block->rows(); _tablet_finder->filter_bitmap().Reset(num_rows); - - //reuse vars for find_tablets - _partitions.assign(num_rows, nullptr); - _skip.assign(num_rows, false); - _tablet_indexes.assign(num_rows, 0); + _reset_find_tablets(num_rows); // if there's projection of partition calc, we need to calc it first. auto [part_ctxs, part_funcs] = _get_partition_function(); @@ -418,7 +431,6 @@ Status VRowDistribution::generate_rows_distribution( if (_vpartition->is_auto_detect_overwrite()) { // when overwrite, no auto create partition allowed. - LOG(WARNING) << "is_auto_detect_overwriting!!!"; RETURN_IF_ERROR(_generate_rows_distribution_for_auto_overwrite( block.get(), has_filtered_rows, row_part_tablet_ids)); } else if (_vpartition->is_auto_partition() && !_deal_batched) { @@ -435,4 +447,11 @@ Status VRowDistribution::generate_rows_distribution( return Status::OK(); } +// reuse vars for find_tablets +void VRowDistribution::_reset_find_tablets(int64_t rows) { + _partitions.assign(rows, nullptr); + _skip.assign(rows, false); + _tablet_indexes.assign(rows, 0); +} + } // namespace doris::vectorized diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 208f5e7ca4e9fb..f4e1a1dfdb2e8e 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -18,6 +18,7 @@ #pragma once // IWYU pragma: no_include +#include #include #include #include @@ -50,6 +51,15 @@ class RowPartTabletIds { std::vector row_ids; std::vector partition_ids; std::vector tablet_ids; + + std::string debug_string() const { + std::string value; + value.reserve(row_ids.size() * 15); + for (int i = 0; i < row_ids.size(); i++) { + value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i])); + } + return value; + } }; // void* for caller @@ -160,6 +170,7 @@ class VRowDistribution { void _reset_row_part_tablet_ids(std::vector& row_part_tablet_ids, int64_t rows); + void _reset_find_tablets(int64_t rows); RuntimeState* _state = nullptr; int _batch_size = 0; @@ -193,6 +204,8 @@ class VRowDistribution { std::vector _tablet_indexes; std::vector _tablet_ids; std::vector _missing_map; // indice of missing values in partition_col + // for auto detect overwrite partition + std::set _new_partition_ids; // if contains, not to replace it again. }; } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 68506ca161e015..6e81344a1c2741 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -24,13 +24,13 @@ #include #include -#include #include #include #include #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/logging.h" #include "common/object_pool.h" #include "common/signal_handler.h" #include "common/status.h" @@ -40,17 +40,14 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" -#include "service/brpc.h" -#include "util/brpc_client_cache.h" #include "util/debug_points.h" #include "util/defer_op.h" #include "util/doris_metrics.h" -#include "util/threadpool.h" -#include "util/thrift_util.h" #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/sink/delta_writer_v2_pool.h" -#include "vec/sink/load_stream_stub.h" +// NOLINTNEXTLINE(unused-includes) +#include "vec/sink/load_stream_stub.h" // IWYU pragma: keep #include "vec/sink/load_stream_stub_pool.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -107,6 +104,8 @@ Status VTabletWriterV2::_incremental_open_streams( } _indexes_from_node[node].emplace_back(tablet); known_indexes.insert(index.index_id); + VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id + << ")"; } } } @@ -207,7 +206,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { "output_tuple_slot_num {} should be equal to output_expr_num {}", _output_tuple_desc->slots().size() + 1, _vec_output_expr_ctxs.size()); }); - if (_vec_output_expr_ctxs.size() > 0 && + if (!_vec_output_expr_ctxs.empty() && _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) { LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, " << "output_tuple_slot_num " << _output_tuple_desc->slots().size() @@ -298,7 +297,7 @@ Status VTabletWriterV2::_build_tablet_node_mapping() { for (const auto& partition : _vpartition->get_partitions()) { for (const auto& index : partition->indexes) { for (const auto& tablet_id : index.tablets) { - auto tablet_location = _location->find_tablet(tablet_id); + auto* tablet_location = _location->find_tablet(tablet_id); DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", { tablet_location = nullptr; }); if (tablet_location == nullptr) { @@ -359,6 +358,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_partition_id(partition_id); tablet.set_index_id(index_id); tablet.set_tablet_id(tablet_id); + VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index)); RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index 9835efa531b2ef..32d52d7e7af100 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -26,6 +26,7 @@ import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; @@ -34,10 +35,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantLock; public class InsertOverwriteManager extends MasterDaemon implements Writable { private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class); @@ -47,6 +50,16 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { @SerializedName(value = "tasks") private Map tasks = Maps.newConcurrentMap(); + // > + // for iot auto detect tasks. a txn will make many task by different rpc + @SerializedName(value = "taskGroups") + private Map> taskGroups = Maps.newConcurrentMap(); + // for one task group, there may be different requests about changing a partition to new. + // but we only change one time and save the relations in partitionPairs. they're protected by taskLocks + private Map taskLocks = Maps.newConcurrentMap(); + // > + private Map> partitionPairs = Maps.newConcurrentMap(); + public InsertOverwriteManager() { super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000); } @@ -69,12 +82,85 @@ public long registerTask(long dbId, long tableId, List tempPartitionName return taskId; } + /** + * register insert overwrite task group for auto detect partition. + * it may have many tasks by FrontendService rpc deal. + * all of them will be involved in one txn.(success or fallback) + * + * @return group id, like a transaction id. + */ + public long preRegisterTask() { + long groupId = Env.getCurrentEnv().getNextId(); + taskGroups.put(groupId, new ArrayList()); + taskLocks.put(groupId, new ReentrantLock()); + partitionPairs.put(groupId, Maps.newConcurrentMap()); + return groupId; + } + + /** + * for iot auto detect. register task first. then put in group. + */ + public void registerTaskInGroup(long groupId, long taskId) { + LOG.info("register task " + taskId + " in group " + groupId); + taskGroups.get(groupId).add(taskId); + } + + public List tryReplacePartitionIds(long groupId, List oldPartitionIds) { + Map relations = partitionPairs.get(groupId); + List newIds = new ArrayList(); + for (Long id : oldPartitionIds) { + if (relations.containsKey(id)) { + // if we replaced it. then return new one. + newIds.add(relations.get(id)); + } else { + // otherwise itself. we will deal it soon. + newIds.add(id); + } + } + return newIds; + } + + public void recordPartitionPairs(long groupId, List oldIds, List newIds) { + Map relations = partitionPairs.get(groupId); + Preconditions.checkArgument(oldIds.size() == newIds.size()); + for (int i = 0; i < oldIds.size(); i++) { + relations.put(oldIds.get(i), newIds.get(i)); + } + } + + public ReentrantLock getLock(long groupId) { + return taskLocks.get(groupId); + } + + public void taskGroupFail(long groupId) { + LOG.info("insert overwrite auto detect partition task group [" + groupId + "] failed"); + for (Long taskId : taskGroups.get(groupId)) { + taskFail(taskId); + } + cleanTaskGroup(groupId); + } + + public void taskGroupSuccess(long groupId) { + LOG.info("insert overwrite auto detect partition task group [" + groupId + "] succeed"); + for (Long taskId : taskGroups.get(groupId)) { + taskSuccess(taskId); + } + cleanTaskGroup(groupId); + } + + private void cleanTaskGroup(long groupId) { + partitionPairs.remove(groupId); + taskLocks.remove(groupId); + taskGroups.remove(groupId); + } + /** * when insert overwrite fail, try drop temp partition * * @param taskId */ public void taskFail(long taskId) { + LOG.info("insert overwrite task [" + taskId + "] failed"); boolean rollback = rollback(taskId); if (rollback) { removeTask(taskId); @@ -89,6 +175,7 @@ public void taskFail(long taskId) { * @param taskId */ public void taskSuccess(long taskId) { + LOG.info("insert overwrite task [" + taskId + "] succeed"); removeTask(taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index 7d0acf6d47b4de..54f9895ab2c977 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -84,7 +83,7 @@ public static void replacePartition(OlapTable olapTable, List partitionN } /** - * generate temp partitionName + * generate temp partitionName. must keep same order. * * @param partitionNames * @return diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index d0292a14201593..0741982c968fc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -138,23 +138,32 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { if (CollectionUtils.isEmpty(partitionNames)) { partitionNames = Lists.newArrayList(targetTable.getPartitionNames()); } - List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - long taskId = Env.getCurrentEnv().getInsertOverwriteManager() - .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + + long taskId = 0; try { if (isAutoDetectOverwrite()) { - // when inserting, BE will call to replace partition by FrontendService, FE do - // the real add&replacement and return replace result. so there's no need to do anything else - insertInto(ctx, executor, tempPartitionNames); + // taskId here is a group id. it contains all replace tasks made and registered in rpc process. + taskId = Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask(); + // When inserting, BE will call to replace partition by FrontendService. FE do the real + // add&replacement and return replace result. So there's no need to do anything else. + insertInto(ctx, executor, taskId); + Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId); } else { + List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); + taskId = Env.getCurrentEnv().getInsertOverwriteManager() + .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); insertInto(ctx, executor, tempPartitionNames); InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); + Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); } - Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); } catch (Exception e) { LOG.warn("insert into overwrite failed"); - Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + if (isAutoDetectOverwrite()) { + Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId); + } else { + Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + } throw e; } finally { ConnectContext.get().setSkipAuth(false); @@ -162,10 +171,10 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } /** - * insert into select + * insert into select. for sepecified temp partitions * - * @param ctx ctx - * @param executor executor + * @param ctx ctx + * @param executor executor * @param tempPartitionNames tempPartitionNames */ private void insertInto(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames) @@ -213,6 +222,26 @@ private void insertInto(ConnectContext ctx, StmtExecutor executor, List } } + /** + * insert into auto detect partition. + * + * @param ctx ctx + * @param executor executor + */ + private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception { + UnboundTableSink sink = (UnboundTableSink) logicalQuery; + // 1. for overwrite situation, we disable auto create partition. + // 2. we save and pass overwrite auto detect by insertCtx + OlapInsertCommandContext insertCtx = new OlapInsertCommandContext(false, sink.isAutoDetectPartition(), groupId); + InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(sink, labelName, Optional.of(insertCtx)); + insertCommand.run(ctx, executor); + if (ctx.getState().getStateType() == MysqlStateType.ERR) { + String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); + LOG.warn("InsertInto state error:{}", errMsg); + throw new UserException(errMsg); + } + } + @Override public Plan getExplainPlan(ConnectContext ctx) { return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java index f07390d2a11da3..bebade142d97e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java @@ -21,16 +21,18 @@ * For Olap Table */ public class OlapInsertCommandContext extends InsertCommandContext { - private boolean allowAutoPartition = true; + private boolean allowAutoPartition; private boolean autoDetectOverwrite = false; + private long overwriteGroupId = 0; - public OlapInsertCommandContext() { - + public OlapInsertCommandContext(boolean allowAutoPartition) { + this.allowAutoPartition = allowAutoPartition; } - public OlapInsertCommandContext(boolean allowAutoPartition, boolean autoDetectOverwrite) { + public OlapInsertCommandContext(boolean allowAutoPartition, boolean autoDetectOverwrite, long overwriteGroupId) { this.allowAutoPartition = allowAutoPartition; this.autoDetectOverwrite = autoDetectOverwrite; + this.overwriteGroupId = overwriteGroupId; } public boolean isAllowAutoPartition() { @@ -41,6 +43,10 @@ public boolean isAutoDetectOverwrite() { return autoDetectOverwrite; } + public long getOverwriteGroupId() { + return overwriteGroupId; + } + public void setAllowAutoPartition(boolean allowAutoPartition) { this.allowAutoPartition = allowAutoPartition; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index b0bffc2d4b99f9..270800b6afec19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -113,7 +113,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys OlapTableSink olapTableSink = (OlapTableSink) sink; PhysicalOlapTableSink physicalOlapTableSink = (PhysicalOlapTableSink) physicalSink; OlapInsertCommandContext olapInsertCtx = (OlapInsertCommandContext) insertCtx.orElse( - new OlapInsertCommandContext()); + new OlapInsertCommandContext(true)); boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict() && physicalOlapTableSink.isPartialUpdate() @@ -132,7 +132,10 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys if (!olapInsertCtx.isAllowAutoPartition()) { olapTableSink.setAutoPartition(false); } - olapTableSink.setAutoDetectOverwite(olapInsertCtx.isAutoDetectOverwrite()); + if (olapInsertCtx.isAutoDetectOverwrite()) { + olapTableSink.setAutoDetectOverwite(true); + olapTableSink.setOverwriteGroupId(olapInsertCtx.getOverwriteGroupId()); + } // update // set schema and partition info for tablet id shuffle exchange diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 958186307addc9..aa3c0b0d555e50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -194,6 +194,10 @@ public void setAutoDetectOverwite(boolean var) { tDataSink.getOlapTableSink().getPartition().setEnableAutoDetectOverwrite(var); } + public void setOverwriteGroupId(long var) { + tDataSink.getOlapTableSink().getPartition().setOverwriteGroupId(var); + } + // must called after tupleDescriptor is computed public void complete(Analyzer analyzer) throws UserException { for (Long partitionId : partitionIds) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 2b175ad5f5a21c..7a6a6ae7d9bbce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -72,6 +72,7 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.insertoverwrite.InsertOverwriteManager; import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.load.routineload.ErrorReason; import org.apache.doris.load.routineload.RoutineLoadJob; @@ -201,9 +202,9 @@ import org.apache.doris.thrift.TPrivilegeType; import org.apache.doris.thrift.TQueryStatsResult; import org.apache.doris.thrift.TQueryType; -import org.apache.doris.thrift.TReplicaInfo; import org.apache.doris.thrift.TReplacePartitionRequest; import org.apache.doris.thrift.TReplacePartitionResult; +import org.apache.doris.thrift.TReplicaInfo; import org.apache.doris.thrift.TReportCommitTxnResultRequest; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; @@ -270,7 +271,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import java.util.stream.IntStream; // Frontend service used to serve all request for this frontend through // thrift protocol @@ -3560,7 +3563,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request LOG.info("Receive create partition request: {}", request); long dbId = request.getDbId(); long tableId = request.getTableId(); - List partitionIds = request.partition_ids; + List partitionIds = request.getPartitionIds(); + long taskGroupId = request.getOverwriteGroupId(); TReplacePartitionResult result = new TReplacePartitionResult(); TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR); if (!Env.getCurrentEnv().isMaster()) { @@ -3596,28 +3600,66 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request } OlapTable olapTable = (OlapTable) table; - List partitionNames = olapTable.getPartitionNamesByIds(partitionIds); - List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); + InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + ReentrantLock taskLock = overwriteManager.getLock(taskGroupId); + List allReqPartNames; // all request partitions try { - InsertOverwriteUtil.addTempPartitions(olapTable, partitionNames, tempPartitionNames); - InsertOverwriteUtil.replacePartition(olapTable, partitionNames, tempPartitionNames); + taskLock.lock(); + // we dont lock the table. other thread in this txn will be controled by taskLock. + // in this txn if we have already replaced. dont do it again, but acquire the recorded new partition directly. + // if not by this txn, just let it fail naturally is ok. + List replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); + // here if replacedPartIds still have null. this will throw exception. + allReqPartNames = olapTable.getPartitionNamesByIds(replacedPartIds); + + List pendingPartitionIds = IntStream.range(0, partitionIds.size()) + .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced + .mapToObj(partitionIds::get) + .collect(Collectors.toList()); + // from here we ONLY deal the pending partitions. not include the dealed(by others). + + // below two must have same order inner. + List pendingPartitionNames = olapTable.getPartitionNamesByIds(pendingPartitionIds); + List tempPartitionNames = InsertOverwriteUtil + .generateTempPartitionNames(pendingPartitionNames); + + long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames); + overwriteManager.registerTaskInGroup(taskGroupId, taskId); + InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); + InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); + // now temp partitions are bumped up and use new names. we get their ids and record them. + List newPartitionIds = new ArrayList(); + for (String newPartName : pendingPartitionNames) { + newPartitionIds.add(olapTable.getPartition(newPartName).getId()); + } + overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds); + if (LOG.isDebugEnabled()) { + LOG.debug("partitoin replacement: "); + for (int i = 0; i < pendingPartitionIds.size(); i++) { + LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], "); + } + } } catch (DdlException ex) { errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); result.setStatus(errorStatus); LOG.warn("send create partition error status: {}", result); return result; + } finally { + taskLock.unlock(); } - // build partition & tablets + // build partition & tablets. now all partitions in allReqPartNames are replaced an recorded. + // so they won't be changed again. if other transaction changing it. just let it fail. List partitions = Lists.newArrayList(); List tablets = Lists.newArrayList(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - for (String partitionName : partitionNames) { + for (String partitionName : allReqPartNames) { Partition partition = table.getPartition(partitionName); TOlapTablePartition tPartition = new TOlapTablePartition(); tPartition.setId(partition.getId()); - int partColNum = partitionInfo.getPartitionColumns().size(); + // set partition keys + int partColNum = partitionInfo.getPartitionColumns().size(); OlapTableSink.setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum); for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList( diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index fba55177edeefe..895feb8b5aa2c4 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -207,6 +207,7 @@ struct TOlapTablePartitionParam { 10: optional Partitions.TPartitionType partition_type // insert overwrite partition(*) 11: optional bool enable_auto_detect_overwrite + 12: optional i64 overwrite_group_id } struct TOlapTableIndex { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f4a9e8f617815c..b1dcb4defd7d5f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1285,7 +1285,7 @@ struct TCreatePartitionResult { // these two for auto detect replacing partition struct TReplacePartitionRequest { - 1: optional i64 txn_id + 1: optional i64 overwrite_group_id 2: optional i64 db_id 3: optional i64 table_id 4: optional list partition_ids // partition to replace. From 5f86b31acdfd7aca4d5c1ac77814cf32e49fb621 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Fri, 15 Mar 2024 07:16:33 +0800 Subject: [PATCH 3/6] bugfix --- be/src/exec/tablet_info.cpp | 20 ++++----- be/src/vec/sink/vrow_distribution.cpp | 2 +- .../doris/service/FrontendServiceImpl.java | 41 ++++++++++--------- 3 files changed, 30 insertions(+), 33 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index c865e9d79ddfe9..bae9d19286cefa 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -629,18 +629,12 @@ Status VOlapTablePartitionParam::replace_partitions( /// just substitute directly. no need to remove and reinsert keys. // range partition - if (t_part.__isset.start_keys) { - part->start_key = std::move(old_part->start_key); - } - if (t_part.__isset.end_keys) { - part->end_key = std::move(old_part->end_key); - } + part->start_key = std::move(old_part->start_key); + part->end_key = std::move(old_part->end_key); // list partition - if (t_part.__isset.in_keys) { - part->in_keys = std::move(old_part->in_keys); - if (t_part.__isset.is_default_partition && t_part.is_default_partition) { - _default_partition = part; - } + part->in_keys = std::move(old_part->in_keys); + if (t_part.__isset.is_default_partition && t_part.is_default_partition) { + _default_partition = part; } part->num_buckets = t_part.num_buckets; @@ -680,9 +674,11 @@ Status VOlapTablePartitionParam::replace_partitions( } // remove old partitions by id std::ranges::sort(old_partition_ids); - for (auto it = _partitions.begin(); it != _partitions.end(); it++) { + for (auto it = _partitions.begin(); it != _partitions.end();) { if (std::ranges::binary_search(old_partition_ids, (*it)->id)) { it = _partitions.erase(it); + } else { + it++; } } diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 053d0ef567b9ac..ae1239aa57ed8c 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -409,7 +409,6 @@ Status VRowDistribution::generate_rows_distribution( } auto num_rows = block->rows(); - _tablet_finder->filter_bitmap().Reset(num_rows); _reset_find_tablets(num_rows); // if there's projection of partition calc, we need to calc it first. @@ -449,6 +448,7 @@ Status VRowDistribution::generate_rows_distribution( // reuse vars for find_tablets void VRowDistribution::_reset_find_tablets(int64_t rows) { + _tablet_finder->filter_bitmap().Reset(rows); _partitions.assign(rows, nullptr); _skip.assign(rows, false); _tablet_indexes.assign(rows, 0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7a6a6ae7d9bbce..da477ffe850a11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3617,26 +3617,27 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request .mapToObj(partitionIds::get) .collect(Collectors.toList()); // from here we ONLY deal the pending partitions. not include the dealed(by others). - - // below two must have same order inner. - List pendingPartitionNames = olapTable.getPartitionNamesByIds(pendingPartitionIds); - List tempPartitionNames = InsertOverwriteUtil - .generateTempPartitionNames(pendingPartitionNames); - - long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames); - overwriteManager.registerTaskInGroup(taskGroupId, taskId); - InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); - InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); - // now temp partitions are bumped up and use new names. we get their ids and record them. - List newPartitionIds = new ArrayList(); - for (String newPartName : pendingPartitionNames) { - newPartitionIds.add(olapTable.getPartition(newPartName).getId()); - } - overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds); - if (LOG.isDebugEnabled()) { - LOG.debug("partitoin replacement: "); - for (int i = 0; i < pendingPartitionIds.size(); i++) { - LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], "); + if (!pendingPartitionIds.isEmpty()) { + // below two must have same order inner. + List pendingPartitionNames = olapTable.getPartitionNamesByIds(pendingPartitionIds); + List tempPartitionNames = InsertOverwriteUtil + .generateTempPartitionNames(pendingPartitionNames); + + long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames); + overwriteManager.registerTaskInGroup(taskGroupId, taskId); + InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); + InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); + // now temp partitions are bumped up and use new names. we get their ids and record them. + List newPartitionIds = new ArrayList(); + for (String newPartName : pendingPartitionNames) { + newPartitionIds.add(olapTable.getPartition(newPartName).getId()); + } + overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds); + if (LOG.isDebugEnabled()) { + LOG.debug("partition replacement: "); + for (int i = 0; i < pendingPartitionIds.size(); i++) { + LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], "); + } } } } catch (DdlException ex) { From 926a625b5e848b3ef8e3f08d0ef7f99385d99c02 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Mon, 18 Mar 2024 05:40:22 +0800 Subject: [PATCH 4/6] done --- be/src/exec/tablet_info.cpp | 17 +++- be/src/exec/tablet_info.h | 8 +- be/src/vec/sink/vrow_distribution.cpp | 21 ++--- be/src/vec/sink/vrow_distribution.h | 5 +- be/src/vec/sink/vtablet_finder.cpp | 8 -- be/src/vec/sink/vtablet_finder.h | 3 +- be/src/vec/sink/writer/vtablet_writer.cpp | 3 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 13 ++++ .../Manipulation/INSERT-OVERWRITE.md | 70 +++++++++++++++-- .../Manipulation/INSERT-OVERWRITE.md | 69 +++++++++++++++-- .../doris/service/FrontendServiceImpl.java | 2 +- .../insert_overwrite_auto_detect.out | 77 +++++++++++++++++++ .../insert_overwrite_auto_detect.groovy | 75 ++++++++++++++++++ 13 files changed, 334 insertions(+), 37 deletions(-) create mode 100644 regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out create mode 100644 regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index bae9d19286cefa..061cc7b6681286 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -45,6 +45,8 @@ #include "util/string_util.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" +// NOLINTNEXTLINE(unused-includes) +#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vliteral.h" #include "vec/runtime/vdatetime_value.h" @@ -397,6 +399,7 @@ Status VOlapTablePartitionParam::_create_partition_keys(const std::vectorid = t_part.id; part_result->is_mutable = t_part.is_mutable; @@ -622,7 +625,18 @@ Status VOlapTablePartitionParam::replace_partitions( // init and add new partitions. insert into _partitions for (int i = 0; i < new_partitions.size(); i++) { const auto& t_part = new_partitions[i]; - auto& old_part = _partitions[i]; + // pair old_partition_ids and new_partitions one by one. TODO: sort to opt performance + VOlapTablePartition* old_part = nullptr; + auto old_part_id = old_partition_ids[i]; + if (auto it = std::find_if( + _partitions.begin(), _partitions.end(), + [=](const VOlapTablePartition* lhs) { return lhs->id == old_part_id; }); + it != _partitions.end()) { + old_part = *it; + } else { + return Status::InternalError("Cannot find old tablet {} in replacing", old_part_id); + } + auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); part->id = t_part.id; part->is_mutable = t_part.is_mutable; @@ -662,6 +676,7 @@ Status VOlapTablePartitionParam::replace_partitions( // add new partitions with new id. _partitions.emplace_back(part); + // replace items in _partition_maps if (_is_in_partition) { for (auto& in_key : part->in_keys) { diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index fb982538592946..887f6cf79554d9 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -40,7 +40,6 @@ #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vexpr_fwd.h" namespace doris { @@ -133,6 +132,7 @@ struct VOlapTablePartition { int64_t load_tablet_idx = -1; VOlapTablePartition(vectorized::Block* partition_block) + // the default value of partition bound is -1. : start_key {partition_block, -1}, end_key {partition_block, -1} {} }; @@ -172,6 +172,12 @@ class VOlapTablePartitionParam { VOlapTablePartition*& partition) const { auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true}) : _partitions_map->upper_bound(std::tuple {block, row, true}); + // enable if need + // LOG(WARNING) << "find row " << row << " of\n" + // << block->dump_data() << "in:\n" + // << _partition_block.dump_data() + // << "result line row: " << std::get<1>(it->first); + // for list partition it might result in default partition if (_is_in_partition) { partition = (it != _partitions_map->end()) ? it->second : _default_partition; diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index ae1239aa57ed8c..6880af08dc61e2 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include @@ -130,17 +129,21 @@ Status VRowDistribution::_replace_overwriting_partition() { request.__set_db_id(_vpartition->db_id()); request.__set_table_id(_vpartition->table_id()); - std::vector request_part_ids; - request_part_ids.reserve(_partitions.size()); + // only request for partitions not recorded for replacement + std::set id_deduper; for (const auto& part : _partitions) { - if (!_new_partition_ids.contains(part->id)) { - request_part_ids.push_back(part->id); - } // otherwise means replaced already. + if (_new_partition_ids.contains(part->id)) { + // this is a new partition. dont replace again. + } else { + // request for replacement + id_deduper.insert(part->id); + } + } + if (id_deduper.empty()) { + return Status::OK(); // no need to request } // de-duplicate. there's no check in FE - auto deduper = std::set(request_part_ids.begin(), request_part_ids.end()); - request_part_ids.assign(deduper.begin(), deduper.end()); - + std::vector request_part_ids(id_deduper.begin(), id_deduper.end()); request.__set_partition_ids(request_part_ids); diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index f4e1a1dfdb2e8e..19a6538cc12f20 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -25,17 +25,14 @@ #include #include -#include #include #include "common/status.h" #include "exec/tablet_info.h" #include "runtime/runtime_state.h" -#include "runtime/types.h" #include "util/runtime_profile.h" -#include "util/stopwatch.hpp" #include "vec/core/block.h" -#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp index 865a3066d62f8c..2e0d278fa4fe01 100644 --- a/be/src/vec/sink/vtablet_finder.cpp +++ b/be/src/vec/sink/vtablet_finder.cpp @@ -22,22 +22,14 @@ #include #include -#include #include -#include #include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" #include "exec/tablet_info.h" -#include "exprs/runtime_filter.h" -#include "gutil/integral_types.h" -#include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" -#include "vec/core/columns_with_type_and_name.h" -#include "vec/data_types/data_type.h" -#include "vec/functions/simple_function_factory.h" namespace doris::vectorized { Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows, diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h index 537e38d86c3d34..24f8e357e28976 100644 --- a/be/src/vec/sink/vtablet_finder.h +++ b/be/src/vec/sink/vtablet_finder.h @@ -17,8 +17,8 @@ #pragma once +#include #include -#include #include "common/status.h" #include "exec/tablet_info.h" @@ -53,6 +53,7 @@ class OlapTabletFinder { bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; } + // all partitions for multi find-processes of its relative writer. const vectorized::flat_hash_set& partition_ids() { return _partition_ids; } int64_t num_filtered_rows() const { return _num_filtered_rows; } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index c06303e6a9ed88..404ca882be3db5 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -677,9 +677,8 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { request->add_partition_ids(pid); } - request->set_write_single_replica(false); + request->set_write_single_replica(_parent->_write_single_replica); if (_parent->_write_single_replica) { - request->set_write_single_replica(true); for (auto& _slave_tablet_node : _slave_tablet_nodes) { PSlaveTabletNodes slave_tablet_nodes; for (auto node_id : _slave_tablet_node.second) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 6e81344a1c2741..1e5b0e2d0cfbc9 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -632,11 +633,23 @@ Status VTabletWriterV2::close(Status exec_status) { Status VTabletWriterV2::_close_load(const Streams& streams) { auto node_id = streams[0]->dst_id(); std::vector tablets_to_commit; + std::vector partition_ids; for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { + if (VLOG_DEBUG_IS_ON) { + partition_ids.push_back(tablet.partition_id()); + } tablets_to_commit.push_back(tablet); } } + if (VLOG_DEBUG_IS_ON) { + std::string msg("close load partitions: "); + msg.reserve(partition_ids.size() * 7); + for (auto v : partition_ids) { + msg.append(std::to_string(v) + ", "); + } + LOG(WARNING) << msg; + } for (const auto& stream : streams) { RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); } diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md index f94e37f5e01ca2..7293785ba536ef 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md @@ -33,11 +33,11 @@ INSERT OVERWRITE ### Description -The function of this statement is to overwrite a table or a partition of a table +The function of this statement is to overwrite a table or some partitions of a table ```sql INSERT OVERWRITE table table_name - [ PARTITION (p1, ...) ] + [ PARTITION (p1, ... | *) ] [ WITH LABEL label] [ (column [, ...]) ] [ [ hint [, ...] ] ] @@ -48,7 +48,10 @@ INSERT OVERWRITE table table_name > table_name: the destination table to overwrite. This table must exist. It can be of the form `db_name.table_name` > -> partitions: the table partition that needs to be overwritten must be one of the existing partitions in `table_name` separated by a comma +> partitions: the table partitions that needs to be overwritten. The following two formats are supported +> +>> 1. partition names. must be one of the existing partitions in `table_name` separated by a comma +>> 2. asterisk(*)。Enable [auto-detect-partition](#overwrite-auto-detect-partition). The write operation will automatically detect the partitions involved in the data and overwrite those partitions. > > label: specify a label for the Insert task > @@ -69,7 +72,7 @@ INSERT OVERWRITE table table_name Notice: 1. In the current version, the session variable `enable_insert_strict` is set to `true` by default. If some data that does not conform to the format of the target table is filtered out during the execution of the `INSERT OVERWRITE` statement, such as when overwriting a partition and not all partition conditions are satisfied, overwriting the target table will fail. -2. If the target table of the INSERT OVERWRITE is an [AUTO-PARTITION-table](../../../../advanced/partition/auto-partition), then new partitions can be created if PARTITION is not specified (that is, rewrite the whole table). If PARTITION for overwrite is specified, then the AUTO PARTITION table behaves as if it were a normal partitioned table during this process, and data that does not satisfy the existing partition conditions is filtered instead of creating a new partition. +2. If the target table of the INSERT OVERWRITE is an [AUTO-PARTITION-table](../../../../advanced/partition/auto-partition), then new partitions can be created if PARTITION is not specified (that is, rewrite the whole table). If PARTITION for overwrite is specified(Includes automatic detection and overwriting of partitions through the `partition(*)` syntax), then the AUTO PARTITION table behaves as if it were a normal partitioned table during this process, and data that does not satisfy the existing partition conditions is filtered instead of creating a new partition. 3. The `INSERT OVERWRITE` statement first creates a new table, inserts the data to be overwritten into the new table, and then atomically replaces the old table with the new table and modifies its name. Therefore, during the process of overwriting the table, the data in the old table can still be accessed normally until the overwriting is completed. ### Example @@ -138,6 +141,13 @@ PROPERTIES ( #### Overwrite Table Partition +When using INSERT OVERWRITE to rewrite partitions, we actually encapsulate the following three steps into a single transaction and execute it. If it fails halfway through, the operations that have been performed will be rolled back: +1. Assuming that partition `p1` is specified to be rewritten, first create an empty temporary partition `pTMP` with the same structure as the target partition to be rewritten. +2. Write data to `pTMP`. +3. replace `p1` with the `pTMP` atom + +The following is examples: + 1. Overwrite partitions `P1` and `P2` of the `test` table using the form of `VALUES`. ```sql @@ -175,6 +185,56 @@ PROPERTIES ( INSERT OVERWRITE table test PARTITION(p1,p2) WITH LABEL `label4` (c1, c2) SELECT * from test2; ``` + +#### Overwrite Auto Detect Partition + +When the PARTITION clause specified by the INSERT OVERWRITE command is `PARTITION(*)`, this overwrite will automatically detect the partition where the data is located. Example: + +```sql +mysql> create table test( + -> k0 int null + -> ) + -> partition by range (k0) + -> ( + -> PARTITION p10 values less than (10), + -> PARTITION p100 values less than (100), + -> PARTITION pMAX values less than (maxvalue) + -> ) + -> DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + -> properties("replication_num" = "1"); +Query OK, 0 rows affected (0.11 sec) + +mysql> insert into test values (1), (2), (15), (100), (200); +Query OK, 5 rows affected (0.29 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 1 | +| 2 | +| 15 | +| 100 | +| 200 | ++------+ +5 rows in set (0.23 sec) + +mysql> insert overwrite table test partition(*) values (3), (1234); +Query OK, 2 rows affected (0.24 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 3 | +| 15 | +| 1234 | ++------+ +3 rows in set (0.20 sec) +``` + +As you can see, all data in partitions `p10` and `pMAX`, where data 3 and 1234 are located, are overwritten, while partition `p100` remains unchanged. This operation can be interpreted as syntactic sugar for specifying a specific partition to be overwritten by the PARTITION clause during an INSERT OVERWRITE operation, which is implemented in the same way as [specify a partition to overwrite](#overwrite-table-partition). The `PARTITION(*)` syntax eliminates the need to manually fill in all the partition names when overwriting a large number of partitions. + ### Keywords - INSERT OVERWRITE, OVERWRITE + INSERT OVERWRITE, OVERWRITE, AUTO DETECT diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md index c506029155b19e..587a981c24b1a1 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md @@ -33,11 +33,11 @@ INSERT OVERWRITE ### Description -该语句的功能是重写表或表的某个分区 +该语句的功能是重写表或表的某些分区 ```sql INSERT OVERWRITE table table_name - [ PARTITION (p1, ...) ] + [ PARTITION (p1, ... | *) ] [ WITH LABEL label] [ (column [, ...]) ] [ [ hint [, ...] ] ] @@ -48,7 +48,10 @@ INSERT OVERWRITE table table_name > table_name: 需要重写的目的表。这个表必须存在。可以是 `db_name.table_name` 形式 > -> partitions: 需要重写的表分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔 +> partitions: 需要重写的目标分区,支持两种形式: +> +>> 1. 分区名。必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔。 +>> 2. 星号(*)。开启[自动检测分区](#overwrite-auto-detect-partition)功能。写入操作将会自动检测数据所涉及的分区,并覆写这些分区。 > > label: 为 Insert 任务指定一个 label > @@ -69,7 +72,7 @@ INSERT OVERWRITE table table_name 注意: 1. 在当前版本中,会话变量 `enable_insert_strict` 默认为 `true`,如果执行 `INSERT OVERWRITE` 语句时,对于有不符合目标表格式的数据被过滤掉的话会重写目标表失败(比如重写分区时,不满足所有分区条件的数据会被过滤)。 -2. 如果INSERT OVERWRITE的目标表是[AUTO-PARTITION表](../../../../advanced/partition/auto-partition),若未指定PARTITION(重写整表),那么可以创建新的分区。如果指定了覆写的PARTITION,那么在此过程中,AUTO PARTITION表表现得如同普通分区表一样,不满足现有分区条件的数据将被过滤,而非创建新的分区。 +2. 如果INSERT OVERWRITE的目标表是[AUTO-PARTITION表](../../../../advanced/partition/auto-partition),若未指定PARTITION(重写整表),那么可以创建新的分区。如果指定了覆写的PARTITION(包括通过 `partition(*)` 语法自动检测并覆盖分区),那么在此过程中,AUTO PARTITION表表现得如同普通分区表一样,不满足现有分区条件的数据将被过滤,而非创建新的分区。 3. INSERT OVERWRITE语句会首先创建一个新表,将需要重写的数据插入到新表中,最后原子性的用新表替换旧表并修改名称。因此,在重写表的过程中,旧表中的数据在重写完毕之前仍然可以正常访问。 ### Example @@ -139,6 +142,13 @@ PROPERTIES ( #### Overwrite Table Partition +使用 INSERT OVERWRITE 重写分区时,实际我们是将如下三步操作封装为一个事务并执行,如果中途失败,已进行的操作将会回滚: +1. 假设指定重写分区 p1,首先创建一个与重写的目标分区结构相同的空临时分区 `pTMP` +2. 向 `pTMP` 中写入数据 +3. 使用 `pTMP` 原子替换 `p1` 分区 + +举例如下: + 1. VALUES的形式重写`test`表分区`P1`和`p2` ```sql @@ -176,7 +186,56 @@ PROPERTIES ( INSERT OVERWRITE table test PARTITION(p1,p2) WITH LABEL `label4` (c1, c2) SELECT * from test2; ``` +#### Overwrite Auto Detect Partition + +当 INSERT OVERWRITE 命令指定的 PARTITION 子句为 `PARTITION(*)` 时,此次覆写将会自动检测分区数据所在的分区。例如: + +```sql +mysql> create table test( + -> k0 int null + -> ) + -> partition by range (k0) + -> ( + -> PARTITION p10 values less than (10), + -> PARTITION p100 values less than (100), + -> PARTITION pMAX values less than (maxvalue) + -> ) + -> DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + -> properties("replication_num" = "1"); +Query OK, 0 rows affected (0.11 sec) + +mysql> insert into test values (1), (2), (15), (100), (200); +Query OK, 5 rows affected (0.29 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 1 | +| 2 | +| 15 | +| 100 | +| 200 | ++------+ +5 rows in set (0.23 sec) + +mysql> insert overwrite table test partition(*) values (3), (1234); +Query OK, 2 rows affected (0.24 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 3 | +| 15 | +| 1234 | ++------+ +3 rows in set (0.20 sec) +``` + +可以看到,数据 3、1234 所在的分区 `p10` 和 `pMAX` 中的全部数据均被覆写,而 `p100` 分区未发生变化。该操作可以理解为 INSERT OVERWRITE 操作时通过 PARTITION 子句指定覆写特定分区的语法糖,它的实现原理与[指定重写特定分区](#overwrite-table-partition)相同。通过 `PARTITION(*)` 的语法,在覆写大量分区数据时我们可以免于手动填写全部分区名的繁琐。 + ### Keywords - INSERT OVERWRITE, OVERWRITE + INSERT OVERWRITE, OVERWRITE, AUTO DETECT diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index da477ffe850a11..2e8aed48b5ec57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3606,7 +3606,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request try { taskLock.lock(); // we dont lock the table. other thread in this txn will be controled by taskLock. - // in this txn if we have already replaced. dont do it again, but acquire the recorded new partition directly. + // if we have already replaced. dont do it again, but acquire the recorded new partition directly. // if not by this txn, just let it fail naturally is ok. List replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); // here if replacedPartIds still have null. this will throw exception. diff --git a/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out new file mode 100644 index 00000000000000..f5f6e4caa52d24 --- /dev/null +++ b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out @@ -0,0 +1,77 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 +2 +15 +100 +200 + +-- !sql -- +3 +15 +1234 + +-- !sql -- +1 +2 +15 +333 +444 +555 + +-- !sql -- +-100 +-100 +15 +333 +444 +555 + +-- !sql -- +-100 +-100 +15 +333 +444 +555 + +-- !sql -- +1234567 +Beijing +Shanghai +list +xxx + +-- !sql -- +1234567 +BEIJING +Shanghai +list +xxx + +-- !sql -- +7654321 +7654321 +7654321 +BEIJING +Shanghai +list +xxx + +-- !sql -- +7654321 +BEIJING +LIST +LIST +Shanghai +list +list +xxx + +-- !sql -- +7654321 +BEIJING +LIST +SHANGHAI +XXX + diff --git a/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy b/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy new file mode 100644 index 00000000000000..72001f35e3997f --- /dev/null +++ b/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iot_auto_detect") { + sql """set enable_nereids_planner = true""" + sql """set enable_fallback_to_original_planner = false""" + sql """set enable_nereids_dml = true""" + + sql " drop table if exists range1; " + sql """ + create table range1( + k0 int null + ) + partition by range (k0) + ( + PARTITION p10 values less than (10), + PARTITION p100 values less than (100), + PARTITION pMAX values less than (maxvalue) + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql " insert into range1 values (1), (2), (15), (100), (200); " + qt_sql " select * from range1 order by k0; " + sql " insert overwrite table range1 partition(*) values (3), (1234); " + qt_sql " select * from range1 order by k0; " + sql " insert overwrite table range1 partition(*) values (1), (2), (333), (444), (555); " + qt_sql " select * from range1 order by k0; " + sql " insert overwrite table range1 partition(*) values (-100), (-100), (333), (444), (555); " + qt_sql " select * from range1 order by k0; " + sql " insert into range1 values (-12345), (12345); " + sql " insert overwrite table range1 partition(*) values (-100), (-100), (333), (444), (555); " + qt_sql " select * from range1 order by k0; " + + sql " drop table if exists list1; " + sql """ + create table list1( + k0 varchar null + ) + partition by list (k0) + ( + PARTITION p1 values in (("Beijing"), ("BEIJING")), + PARTITION p2 values in (("Shanghai"), ("SHANGHAI")), + PARTITION p3 values in (("xxx"), ("XXX")), + PARTITION p4 values in (("list"), ("LIST")), + PARTITION p5 values in (("1234567"), ("7654321")) + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql """ insert into list1 values ("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("BEIJING"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("7654321"), ("7654321"), ("7654321"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("7654321"), ("list"), ("list"), ("LIST"), ("LIST"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("SHANGHAI"), ("XXX"), ("LIST"), ("7654321"); """ + qt_sql " select * from list1 order by k0; " +} From 3ba93a1d1a4837258a7b4959c5d0472a42f7a1a8 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Mon, 18 Mar 2024 13:00:02 +0800 Subject: [PATCH 5/6] test --- be/src/exec/tablet_info.h | 9 +++-- be/src/vec/sink/vrow_distribution.cpp | 6 +++- .../nereids/analyzer/UnboundTableSink.java | 7 ++-- .../analyzer/UnboundTableSinkCreator.java | 3 +- .../nereids/parser/LogicalPlanBuilder.java | 3 +- .../insert_overwrite_auto_detect.out | 6 ++++ .../insert_overwrite_auto_detect.groovy | 33 +++++++++++++++++++ 7 files changed, 55 insertions(+), 12 deletions(-) diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 887f6cf79554d9..9c3a1b6db44073 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -32,6 +32,7 @@ #include #include +#include "common/logging.h" #include "common/object_pool.h" #include "common/status.h" #include "runtime/descriptors.h" @@ -172,11 +173,9 @@ class VOlapTablePartitionParam { VOlapTablePartition*& partition) const { auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true}) : _partitions_map->upper_bound(std::tuple {block, row, true}); - // enable if need - // LOG(WARNING) << "find row " << row << " of\n" - // << block->dump_data() << "in:\n" - // << _partition_block.dump_data() - // << "result line row: " << std::get<1>(it->first); + VLOG_TRACE << "find row " << row << " of\n" + << block->dump_data() << "in:\n" + << _partition_block.dump_data() << "result line row: " << std::get<1>(it->first); // for list partition it might result in default partition if (_is_in_partition) { diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 6880af08dc61e2..d98131613a12f9 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -131,7 +131,11 @@ Status VRowDistribution::_replace_overwriting_partition() { // only request for partitions not recorded for replacement std::set id_deduper; - for (const auto& part : _partitions) { + for (const auto* part : _partitions) { + if (part == nullptr) [[unlikely]] { + return Status::EndOfFile( + "Cannot found origin partitions in auto detect overwriting, stop processing"); + } if (_new_partition_ids.contains(part->id)) { // this is a new partition. dont replace again. } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 7e0a6899c93638..8d8928fea6dd2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -81,14 +81,15 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List hints, - boolean isAutoDetectPartition, boolean isPartialUpdate, DMLCommandType dmlCommandType, + boolean temporaryPartition, List partitions, boolean isAutoDetectPartition, + boolean isPartialUpdate, DMLCommandType dmlCommandType, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); - this.temporaryPartition = false; - this.partitions = Lists.newArrayList(); + this.temporaryPartition = temporaryPartition; + this.partitions = Utils.copyRequiredList(partitions); this.autoDetectPartition = isAutoDetectPartition; this.isPartialUpdate = isPartialUpdate; this.dmlCommandType = dmlCommandType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 16855a25f1fb5a..58a172a98fa028 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.qe.ConnectContext; +import org.checkerframework.com.google.common.collect.Lists; import java.util.List; import java.util.Optional; @@ -81,7 +82,7 @@ public static LogicalSink createUnboundTableSink(List na String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); if (curCatalog instanceof InternalCatalog) { - return new UnboundTableSink<>(nameParts, colNames, hints, false, + return new UnboundTableSink<>(nameParts, colNames, hints, false, Lists.newArrayList(), true, isPartialUpdate, dmlCommandType, Optional.empty(), Optional.empty(), plan); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 18644d45de3f1c..35fb23ab51dd55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -531,7 +531,7 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { // partitionSpec may be NULL. means auto detect partition. only available when // IOT Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); - UnboundTableSink sink; + LogicalSink sink; if (partitionSpec.second == null) { // auto detect partition if (!isOverwrite) { throw new ParseException("Only support wildcard in overwrite partition", ctx); @@ -540,7 +540,6 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { tableName.build(), colNames, ImmutableList.of(), - true, ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), DMLCommandType.INSERT, plan); diff --git a/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out index f5f6e4caa52d24..3cde86880473dc 100644 --- a/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out +++ b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out @@ -75,3 +75,9 @@ LIST SHANGHAI XXX +-- !sql -- +2008-01-01 +2008-02-02 +2013-02-02 +2022-03-03 + diff --git a/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy b/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy index 72001f35e3997f..367aaa9d536ec5 100644 --- a/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy +++ b/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy @@ -72,4 +72,37 @@ suite("test_iot_auto_detect") { qt_sql " select * from list1 order by k0; " sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("SHANGHAI"), ("XXX"), ("LIST"), ("7654321"); """ qt_sql " select * from list1 order by k0; " + try { + sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("invalid"); """ + } catch (Exception e) { + log.info(e.getMessage()) + assertTrue(e.getMessage().contains('Insert has filtered data in strict mode') || + e.getMessage().contains('Cannot found origin partitions in auto detect overwriting')) + } + + sql " drop table if exists dt; " + sql """ + create table dt( + k0 date null + ) + partition by range (k0) + ( + PARTITION p10 values less than ("2010-01-01"), + PARTITION p100 values less than ("2020-01-01"), + PARTITION pMAX values less than ("2030-01-01") + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql """ insert into dt values ("2005-01-01"), ("2013-02-02"), ("2022-03-03"); """ + sql """ insert overwrite table dt partition(*) values ("2008-01-01"), ("2008-02-02"); """ + qt_sql " select * from dt order by k0; " + try { + sql """ insert overwrite table dt partition(*) values ("2023-02-02"), ("3000-12-12"); """ + } catch (Exception e) { + log.info(e.getMessage()) + assertTrue(e.getMessage().contains('Insert has filtered data in strict mode') || + e.getMessage().contains('Cannot found origin partitions in auto detect overwriting')) + } + } From 443c9f24ce4cedc8f8b12b79004f64939af74254 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Mon, 18 Mar 2024 20:08:21 +0800 Subject: [PATCH 6/6] fix --- .../apache/doris/analysis/AnalyzeTblStmt.java | 8 +++- .../analysis/InsertOverwriteTableStmt.java | 4 +- .../doris/analysis/NativeInsertStmt.java | 6 +-- .../apache/doris/analysis/PartitionNames.java | 24 +++++++----- .../org/apache/doris/catalog/OlapTable.java | 10 +++-- .../nereids/analyzer/UnboundTableSink.java | 5 +-- .../analyzer/UnboundTableSinkCreator.java | 35 +++++++++++++---- .../nereids/parser/LogicalPlanBuilder.java | 38 +++++++------------ .../org/apache/doris/qe/StmtExecutor.java | 10 ++--- .../doris/service/FrontendServiceImpl.java | 4 +- .../doris/statistics/AnalysisManager.java | 2 +- 11 files changed, 80 insertions(+), 66 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index 52967b01c83890..08efb31d6631fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -255,11 +255,15 @@ public Set getPartitionNames() { return partitions; } - public boolean isAllPartitions() { + /** + * @return for OLAP table, only in overwrite situation, overwrite auto detect partition + * for External table, all partitions. + */ + public boolean isStarPartition() { if (partitionNames == null) { return false; } - return partitionNames.isAutoReplace(); + return partitionNames.isStar(); } public long getPartitionCount() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java index e0c4619fcfc105..3b4e651cc02071 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java @@ -76,8 +76,8 @@ public List getPartitionNames() { /* * auto detect which partitions to replace. enable by partition(*) grammer */ - public boolean isAutoReplace() { - return target.getPartitionNames().isAutoReplace(); + public boolean isAutoDetectPartition() { + return target.getPartitionNames().isStar(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index af4e0b8c3c412a..1c4b13aa235c32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -163,7 +163,7 @@ public class NativeInsertStmt extends InsertStmt { boolean hasEmptyTargetColumns = false; private boolean allowAutoPartition = true; - private boolean autoReplacePartition = false; + private boolean withAutoDetectOverwrite = false; enum InsertType { NATIVE_INSERT("insert_"), @@ -318,8 +318,8 @@ public boolean isTransactionBegin() { return isTransactionBegin; } - public NativeInsertStmt withAutoReplaceEnabled() { - this.autoReplacePartition = true; + public NativeInsertStmt withAutoDetectOverwrite() { + this.withAutoDetectOverwrite = true; return this; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java index 0a2928df1a8265..f82f497176bec2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java @@ -48,7 +48,7 @@ public class PartitionNames implements ParseNode, Writable { // true if these partitions are temp partitions @SerializedName(value = "isTemp") private final boolean isTemp; - private final boolean autoReplace; + private final boolean isStar; private final long count; // Default partition count to collect statistic for external table. private static final long DEFAULT_PARTITION_COUNT = 100; @@ -56,28 +56,28 @@ public class PartitionNames implements ParseNode, Writable { public PartitionNames(boolean isTemp, List partitionNames) { this.partitionNames = partitionNames; this.isTemp = isTemp; - this.autoReplace = false; + this.isStar = false; this.count = 0; } public PartitionNames(PartitionNames other) { this.partitionNames = Lists.newArrayList(other.partitionNames); this.isTemp = other.isTemp; - this.autoReplace = other.autoReplace; + this.isStar = other.isStar; this.count = 0; } - public PartitionNames(boolean autoReplace) { + public PartitionNames(boolean isStar) { this.partitionNames = null; this.isTemp = false; - this.autoReplace = autoReplace; + this.isStar = isStar; this.count = 0; } public PartitionNames(long partitionCount) { this.partitionNames = null; this.isTemp = false; - this.autoReplace = false; + this.isStar = false; this.count = partitionCount; } @@ -89,8 +89,12 @@ public boolean isTemp() { return isTemp; } - public boolean isAutoReplace() { - return autoReplace; + /** + * @return for OLAP table, only in overwrite situation, overwrite auto detect partition + * for External table, all partitions. + */ + public boolean isStar() { + return isStar; } public long getCount() { @@ -99,10 +103,10 @@ public long getCount() { @Override public void analyze(Analyzer analyzer) throws AnalysisException { - if (autoReplace && count > 0) { + if (isStar && count > 0) { throw new AnalysisException("All partition and partition count couldn't be set at the same time."); } - if (autoReplace || count > 0) { + if (isStar || count > 0) { return; } if (partitionNames == null || partitionNames.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 51c8f7b3f1bf1e..b6239f486cdeec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1106,10 +1106,12 @@ public Set getPartitionNames() { return Sets.newHashSet(nameToPartition.keySet()); } - public List getPartitionNamesByIds(List partitionIds) { - return partitionIds.stream().map(id -> { - return idToPartition.get(id).getName(); - }).collect(Collectors.toList()); + public List uncheckedGetPartNamesById(List partitionIds) { + List names = new ArrayList(); + for (Long id : partitionIds) { + names.add(idToPartition.get(id).getName()); + } + return names; } public List getPartitionIds() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 8d8928fea6dd2c..8ecd417691de02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -33,11 +33,9 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; -import com.clearspring.analytics.util.Lists; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -52,7 +50,7 @@ public class UnboundTableSink extends UnboundLogicalSin private final List partitions; private final boolean isPartialUpdate; private final DMLCommandType dmlCommandType; - private boolean autoDetectPartition = false; + private final boolean autoDetectPartition; public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { @@ -73,6 +71,7 @@ public UnboundTableSink(List nameParts, List colNames, List createUnboundTableSink(List na } /** - * create unbound sink for DML plan with auto detect overwrite partition enable + * create unbound sink for DML plan with auto detect overwrite partition enable. */ - public static LogicalSink createUnboundTableSink(List nameParts, - List colNames, List hints, - boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) { + public static LogicalSink createUnboundTableSinkMaybeOverwrite(List nameParts, + List colNames, List hints, boolean temporaryPartition, List partitions, + boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, DMLCommandType dmlCommandType, + LogicalPlan plan) { + if (isAutoDetectPartition) { // partitions is null + if (!isOverwrite) { + throw new ParseException("ASTERISK is only supported in overwrite partition for OLAP table"); + } + temporaryPartition = false; + partitions = ImmutableList.of(); + } + String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); if (curCatalog instanceof InternalCatalog) { - return new UnboundTableSink<>(nameParts, colNames, hints, false, Lists.newArrayList(), true, + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, + isAutoDetectPartition, isPartialUpdate, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) { + return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } - throw new RuntimeException( - "Auto overwrite data to " + curCatalog.getClass().getSimpleName() + " is not supported."); + throw new AnalysisException( + "Auto overwrite data to " + curCatalog.getClass().getSimpleName() + " is not supported." + + (isAutoDetectPartition + ? " PARTITION(*) is only supported in overwrite partition for OLAP table" + : "")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 35fb23ab51dd55..2985664f8b818a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -528,32 +528,20 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { List colNames = ctx.cols == null ? ImmutableList.of() : visitIdentifierList(ctx.cols); // TODO visit partitionSpecCtx LogicalPlan plan = visitQuery(ctx.query()); - // partitionSpec may be NULL. means auto detect partition. only available when - // IOT + // partitionSpec may be NULL. means auto detect partition. only available when IOT Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); - LogicalSink sink; - if (partitionSpec.second == null) { // auto detect partition - if (!isOverwrite) { - throw new ParseException("Only support wildcard in overwrite partition", ctx); - } - sink = UnboundTableSinkCreator.createUnboundTableSink( - tableName.build(), - colNames, - ImmutableList.of(), - ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), - DMLCommandType.INSERT, - plan); - } else { // normal partition - sink = UnboundTableSinkCreator.createUnboundTableSink( - tableName.build(), - colNames, - ImmutableList.of(), - partitionSpec.first, - partitionSpec.second, - ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), - DMLCommandType.INSERT, - plan); - } + boolean isAutoDetect = partitionSpec.second == null; + LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite( + tableName.build(), + colNames, + ImmutableList.of(), // hints + partitionSpec.first, // isTemp + partitionSpec.second, // partition names + isAutoDetect, + isOverwrite, + ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), + DMLCommandType.INSERT, + plan); LogicalPlan command; if (isOverwrite) { command = new InsertOverwriteTableCommand(sink, labelName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 0b03a49a5cf939..820513132c5f3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2794,7 +2794,7 @@ private void handleIotStmt() { ConnectContext.get().setSkipAuth(true); try { InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt; - if (iotStmt.isAutoReplace()) { + if (iotStmt.isAutoDetectPartition()) { // insert overwrite table auto detect which partitions need to replace handleAutoOverwritePartition(iotStmt); } else if (iotStmt.getPartitionNames().size() == 0) { @@ -2947,16 +2947,14 @@ private void handleOverwritePartition(InsertOverwriteTableStmt iotStmt) { } /* - * we use a anti-AutoPartition-like function to find partitions to replace. + * TODO: support insert overwrite auto detect partition in legacy planner */ private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) { - // register query in replaceManager - - // + // TODO: TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl()); try { parsedStmt = new NativeInsertStmt(targetTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()), - iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true).withAutoReplaceEnabled(); + iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true).withAutoDetectOverwrite(); parsedStmt.setUserInfo(context.getCurrentUserIdentity()); execute(); } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 2e8aed48b5ec57..dc058156b603a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3610,7 +3610,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request // if not by this txn, just let it fail naturally is ok. List replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); // here if replacedPartIds still have null. this will throw exception. - allReqPartNames = olapTable.getPartitionNamesByIds(replacedPartIds); + allReqPartNames = olapTable.uncheckedGetPartNamesById(replacedPartIds); List pendingPartitionIds = IntStream.range(0, partitionIds.size()) .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced @@ -3619,7 +3619,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request // from here we ONLY deal the pending partitions. not include the dealed(by others). if (!pendingPartitionIds.isEmpty()) { // below two must have same order inner. - List pendingPartitionNames = olapTable.getPartitionNamesByIds(pendingPartitionIds); + List pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); List tempPartitionNames = InsertOverwriteUtil .generateTempPartitionNames(pendingPartitionNames); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index f90ce83a088185..258c33305afc4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -348,7 +348,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio Set partitionNames = stmt.getPartitionNames(); boolean partitionOnly = stmt.isPartitionOnly(); boolean isSamplingPartition = stmt.isSamplingPartition(); - boolean isAllPartition = stmt.isAllPartitions(); + boolean isAllPartition = stmt.isStarPartition(); long partitionCount = stmt.getPartitionCount(); int samplePercent = stmt.getSamplePercent(); int sampleRows = stmt.getSampleRows();