From eeb05d5e46bb10a9d5180386dc9b005b0b9ef167 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 25 Jun 2025 15:04:00 +0800 Subject: [PATCH 1/2] [opt](partial update) use a separate config to control the behavior of newly inserted rows in partial update (#41232) Currently, Doris use strict mode to decide if newly inserted rows should be appended or report an error in partial update, which is hard to use. This PR add a new session variable and load property `partial_update_new_key_behavior` to control the behavior of newly inserted rows in partial update. `partial_update_new_key_behavior` has ~three~ two options: - `APPEND`: append the newly inserted rows - ~`IGNORE`: delete the newly inserted rows silently(will not be taken into filtered rows)~ - `ERROR`: report error if meet newly inserted rows, and the error msg will contains one row's keys which is not in table. --- The reason for not supporting `IGNORE` mode: To support `IGNORE` mode, we need to add delete sign for newly inserted rows in partial update to delete them rather than use delete bitmap mark to delete them because compaction will not use delete bitmap when reading data. Also, we need to record the rows whose delete sign is added by us in this situation for resolving conflicts in publish phase to avoid wrongly delete the rows if there are another concurrent load insert some of these rows successfully. This increases code complexity and is error-prone. Doc: https://github.com/apache/doris-website/pull/2472 --- be/src/common/status.h | 1 + be/src/exec/tablet_info.cpp | 27 ++ be/src/exec/tablet_info.h | 5 + be/src/http/action/stream_load.cpp | 20 +- be/src/http/http_common.h | 1 + be/src/olap/delta_writer_v2.cpp | 1 + be/src/olap/partial_update_info.cpp | 53 ++-- be/src/olap/partial_update_info.h | 12 +- .../olap/rowset/segment_v2/segment_writer.cpp | 22 +- .../segment_v2/vertical_segment_writer.cpp | 29 +- be/src/olap/rowset_builder.cpp | 1 + .../org/apache/doris/analysis/LoadStmt.java | 19 ++ .../doris/cloud/load/CloudBrokerLoadJob.java | 3 +- .../cloud/load/CloudLoadLoadingTask.java | 7 +- .../doris/load/loadv2/BrokerLoadJob.java | 3 +- .../org/apache/doris/load/loadv2/LoadJob.java | 6 + .../doris/load/loadv2/LoadLoadingTask.java | 9 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 8 +- .../nereids/analyzer/UnboundTableSink.java | 33 ++- .../analyzer/UnboundTableSinkCreator.java | 11 +- .../translator/PhysicalPlanTranslator.java | 3 + .../nereids/parser/LogicalPlanBuilder.java | 1 + .../nereids/rules/analysis/BindSink.java | 3 + ...lOlapTableSinkToPhysicalOlapTableSink.java | 1 + .../plans/commands/DeleteFromCommand.java | 4 +- .../trees/plans/commands/LoadCommand.java | 4 +- .../trees/plans/commands/UpdateCommand.java | 4 +- .../insert/InsertOverwriteTableCommand.java | 4 + .../plans/logical/LogicalOlapTableSink.java | 32 ++- .../plans/physical/PhysicalOlapTableSink.java | 37 ++- .../apache/doris/planner/OlapTableSink.java | 95 ++++++- .../doris/planner/StreamLoadPlanner.java | 3 +- .../org/apache/doris/qe/SessionVariable.java | 36 +++ .../org/apache/doris/task/LoadTaskInfo.java | 5 + .../org/apache/doris/task/StreamLoadTask.java | 10 + .../rules/rewrite/EliminateSortTest.java | 5 +- gensrc/proto/descriptors.proto | 3 + gensrc/proto/olap_file.proto | 7 + gensrc/thrift/Descriptors.thrift | 8 + gensrc/thrift/FrontendService.thrift | 2 + .../import-way/error-data-handling.md.out | 5 - .../partial_update/row_policy1.csv | 4 + .../partial_update/row_policy2.csv | 4 + .../partial_update/row_policy3.csv | 4 + .../test_partial_update_new_key_policy.out | 153 ++++++++++ .../test_partial_update_new_row_policy.out | 78 +++++ .../test_partial_update_strict_mode.out | 16 +- .../import-way/error-data-handling.md.groovy | 59 ---- .../insert_into_table/partial_update.groovy | 5 +- ...l_update_insert_light_schema_change.groovy | 8 +- ...partial_update_insert_schema_change.groovy | 8 +- ...t_partial_update_native_insert_stmt.groovy | 6 +- .../test_partial_update_new_key_policy.groovy | 267 ++++++++++++++++++ .../test_partial_update_only_keys.groovy | 7 +- .../test_partial_update_schema_change.groovy | 12 +- ...tial_update_schema_change_row_store.groovy | 14 +- .../test_partial_update_strict_mode.groovy | 18 +- .../test_partial_update_upsert.groovy | 5 +- 58 files changed, 1002 insertions(+), 209 deletions(-) create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy diff --git a/be/src/common/status.h b/be/src/common/status.h index 26877e88dc79f9..68035e91e74835 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -293,6 +293,7 @@ namespace ErrorCode { E(KEY_NOT_FOUND, -7000, false); \ E(KEY_ALREADY_EXISTS, -7001, false); \ E(ENTRY_NOT_FOUND, -7002, false); \ + E(NEW_ROWS_IN_PARTIAL_UPDATE, -7003, false); \ E(INVALID_TABLET_STATE, -7211, false); \ E(ROWSETS_EXPIRED, -7311, false); \ E(CGROUP_ERROR, -7411, false); diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 87c0a824951446..97a08028bb3c83 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -130,6 +130,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { } _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id(); } + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) { + if (pschema.has_partial_update_new_key_policy()) { + _partial_update_new_row_policy = pschema.partial_update_new_key_policy(); + } + } _timestamp_ms = pschema.timestamp_ms(); if (pschema.has_nano_seconds()) { _nano_seconds = pschema.nano_seconds(); @@ -221,6 +226,27 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id; } + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) { + if (tschema.__isset.partial_update_new_key_policy) { + switch (tschema.partial_update_new_key_policy) { + case doris::TPartialUpdateNewRowPolicy::APPEND: { + _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::APPEND; + break; + } + case doris::TPartialUpdateNewRowPolicy::ERROR: { + _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::ERROR; + break; + } + default: { + return Status::InvalidArgument( + "Unknown partial_update_new_key_behavior: {}, should be one of " + "'APPEND' or 'ERROR'", + tschema.partial_update_new_key_policy); + } + } + } + } + for (const auto& tcolumn : tschema.partial_update_input_columns) { _partial_update_input_columns.insert(tcolumn); } @@ -305,6 +331,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_table_id(_table_id); pschema->set_version(_version); pschema->set_partial_update(_is_partial_update); + pschema->set_partial_update_new_key_policy(_partial_update_new_row_policy); pschema->set_is_strict_mode(_is_strict_mode); pschema->set_auto_increment_column(_auto_increment_column); pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index ff1c2e8e6b072e..bcfa4e7be1ade5 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -92,6 +92,9 @@ class OlapTableSchemaParam { std::set partial_update_input_columns() const { return _partial_update_input_columns; } + PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const { + return _partial_update_new_row_policy; + } std::string auto_increment_coulumn() const { return _auto_increment_column; } int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } @@ -113,6 +116,8 @@ class OlapTableSchemaParam { std::vector _indexes; mutable ObjectPool _obj_pool; bool _is_partial_update = false; + PartialUpdateNewRowPolicyPB _partial_update_new_row_policy { + PartialUpdateNewRowPolicyPB::APPEND}; std::set _partial_update_input_columns; bool _is_strict_mode = false; std::string _auto_increment_column; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index e3abd5a8a5d807..d82ac8eb2b8a52 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -639,6 +639,25 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_partial_update(false); } } + + if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) { + static const std::map policy_map { + {"APPEND", TPartialUpdateNewRowPolicy::APPEND}, + {"ERROR", TPartialUpdateNewRowPolicy::ERROR}}; + + auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY); + std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(), + [](unsigned char c) { return std::toupper(c); }); + auto it = policy_map.find(policy_name); + if (it == policy_map.end()) { + return Status::InvalidArgument( + "Invalid partial_update_new_key_behavior {}, must be one of {'APPEND', " + "'ERROR'}", + policy_name); + } + request.__set_partial_update_new_key_policy(it->second); + } + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); request.__set_memtable_on_sink_node(value); @@ -705,7 +724,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, } ctx->put_result.params.__set_content_length(content_length); } - VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params); // if we not use streaming, we must download total content before we begin // to process this load diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index b9b5f0d85ae302..fba5bba98d5ef9 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -59,6 +59,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines"; static const std::string HTTP_COMMENT = "comment"; static const std::string HTTP_ENABLE_PROFILE = "enable_profile"; static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns"; +static const std::string HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY = "partial_update_new_key_behavior"; static const std::string HTTP_SQL = "sql"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 657ec265e50908..390ec6f1360e3e 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -238,6 +238,7 @@ Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _partial_update_info = std::make_shared(); RETURN_IF_ERROR(_partial_update_info->init( _req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_new_key_policy(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index 94dd4673f59850..eb0938033fad79 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -34,11 +34,14 @@ namespace doris { Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, - bool partial_update, const std::set& partial_update_cols, + bool partial_update, + PartialUpdateNewRowPolicyPB policy, + const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, const std::string& auto_increment_column, int64_t cur_max_version) { is_partial_update = partial_update; + partial_update_new_key_policy = policy; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; @@ -86,6 +89,7 @@ Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSc void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { partial_update_info_pb->set_is_partial_update(is_partial_update); + partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy); partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase); for (const auto& col : partial_update_input_columns) { partial_update_info_pb->add_partial_update_input_columns(col); @@ -113,6 +117,9 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { is_partial_update = partial_update_info_pb->is_partial_update(); + if (partial_update_info_pb->has_partial_update_new_key_policy()) { + partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy(); + } max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase() ? partial_update_info_pb->max_version_in_flush_phase() : -1; @@ -152,23 +159,35 @@ std::string PartialUpdateInfo::summary() const { update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase); } -Status PartialUpdateInfo::handle_non_strict_mode_not_found_error( - const TabletSchema& tablet_schema) { - if (!can_insert_new_rows_in_partial_update) { - std::string error_column; - for (auto cid : missing_cids) { - const TabletColumn& col = tablet_schema.column(cid); - if (!col.has_default_value() && !col.is_nullable() && - !(tablet_schema.auto_increment_column() == col.name())) { - error_column = col.name(); - break; +Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema, + const std::function& line, + BitmapValue* skip_bitmap) { + switch (partial_update_new_key_policy) { + case doris::PartialUpdateNewRowPolicyPB::APPEND: { + if (_is_partial_update) { + if (!can_insert_new_rows_in_partial_update) { + std::string error_column; + for (auto cid : missing_cids) { + const TabletColumn& col = tablet_schema.column(cid); + if (!col.has_default_value() && !col.is_nullable() && + !(tablet_schema.auto_increment_column() == col.name())) { + error_column = col.name(); + break; + } + } + return Status::Error( + "the unmentioned column `{}` should have default value or be nullable " + "for newly inserted rows in non-strict mode partial update", + error_column); } - } - return Status::Error( - "the unmentioned column `{}` should have default value or be nullable " - "for " - "newly inserted rows in non-strict mode partial update", - error_column); + } + } break; + case doris::PartialUpdateNewRowPolicyPB::ERROR: { + return Status::Error( + "Can't append new rows in partial update when partial_update_new_key_behavior is " + "ERROR. Row with key=[{}] is not in table.", + line()); + } break; } return Status::OK(); } diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 05372154bd2d07..93915431606d39 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -38,13 +39,15 @@ struct RowsetId; struct PartialUpdateInfo { Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, - bool partial_update, const std::set& partial_update_cols, - bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, - const std::string& timezone, const std::string& auto_increment_column, + bool partial_update, PartialUpdateNewRowPolicyPB policy, + const std::set& partial_update_cols, bool is_strict_mode, + int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, + const std::string& auto_increment_column, int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); - Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema); + Status handle_new_key(const TabletSchema& tablet_schema, + const std::function& line); std::string summary() const; private: @@ -52,6 +55,7 @@ struct PartialUpdateInfo { public: bool is_partial_update {false}; + PartialUpdateNewRowPolicyPB partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND}; int64_t max_version_in_flush_phase {-1}; std::set partial_update_input_columns; std::vector missing_cids; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index be6c1dc45b8c81..9ee45594e4d3ca 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -497,16 +497,8 @@ Status SegmentWriter::probe_key_for_mow( specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is()) { - if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { - ++stats.num_rows_filtered; - // delete the invalid newly inserted row - _mow_context->delete_bitmap->add( - {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - segment_pos); - } else if (!have_delete_sign) { - RETURN_IF_ERROR( - _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( - *_tablet_schema)); + if (!have_delete_sign) { + RETURN_IF_ERROR(not_found_cb()); } ++stats.num_rows_new_added; has_default_or_nullable = true; @@ -678,8 +670,14 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* bool have_delete_sign = (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); - RETURN_IF_ERROR(probe_key_for_mow(key, segment_pos, have_input_seq_column, have_delete_sign, - read_plan, specified_rowsets, segment_caches, + auto not_found_cb = [&]() { + return _opts.rowset_ctx->partial_update_info->handle_new_key( + *_tablet_schema, [&]() -> std::string { + return block->dump_one_line(block_pos, _num_sort_key_columns); + }); + }; + RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column, + have_delete_sign, specified_rowsets, segment_caches, has_default_or_nullable, use_default_or_null_flag, stats)); } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 6339c7db4bfc63..1d2350c795c8fb 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -367,16 +367,8 @@ Status VerticalSegmentWriter::_probe_key_for_mow( specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is()) { - if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { - ++stats.num_rows_filtered; - // delete the invalid newly inserted row - _mow_context->delete_bitmap->add( - {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - segment_pos); - } else if (!have_delete_sign) { - RETURN_IF_ERROR( - _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( - *_tablet_schema)); + if (!have_delete_sign) { + RETURN_IF_ERROR(not_found_cb()); } ++stats.num_rows_new_added; has_default_or_nullable = true; @@ -539,10 +531,19 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da bool have_delete_sign = (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); - RETURN_IF_ERROR(_probe_key_for_mow(key, segment_pos, have_input_seq_column, - have_delete_sign, read_plan, specified_rowsets, - segment_caches, has_default_or_nullable, - use_default_or_null_flag, stats)); + auto not_found_cb = [&]() { + return _opts.rowset_ctx->partial_update_info->handle_new_key( + *_tablet_schema, [&]() -> std::string { + return data.block->dump_one_line(block_pos, _num_sort_key_columns); + }); + }; + auto update_read_plan = [&](const RowLocation& loc) { + read_plan.prepare_to_read(loc, segment_pos); + }; + RETURN_IF_ERROR(_probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column, + have_delete_sign, specified_rowsets, segment_caches, + has_default_or_nullable, use_default_or_null_flag, + update_read_plan, not_found_cb, stats)); } CHECK_EQ(use_default_or_null_flag.size(), data.num_rows); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index e9b518eaae02e3..aca22eb3f31bad 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -428,6 +428,7 @@ Status BaseRowsetBuilder::_build_current_tablet_schema( RETURN_IF_ERROR(_partial_update_info->init( tablet()->tablet_id(), _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_new_key_policy(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 6abe6fdc860425..0b0b0fbe8a2057 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -38,6 +38,8 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -142,6 +144,7 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser { public static final String KEY_SKIP_LINES = "skip_lines"; public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior"; public static final String KEY_COMMENT = "comment"; @@ -195,6 +198,12 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser { return Boolean.valueOf(s); } }) + .put(PARTIAL_UPDATE_NEW_KEY_POLICY, new Function() { + @Override + public @Nullable TPartialUpdateNewRowPolicy apply(@Nullable String s) { + return TPartialUpdateNewRowPolicy.valueOf(s.toUpperCase()); + } + }) .put(TIMEZONE, new Function() { @Override public @Nullable String apply(@Nullable String s) { @@ -388,6 +397,16 @@ public static void checkProperties(Map properties) throws DdlExc } } + // partial update new key policy + final String partialUpdateNewKeyPolicyProperty = properties.get(PARTIAL_UPDATE_NEW_KEY_POLICY); + if (partialUpdateNewKeyPolicyProperty != null) { + if (!partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("append") + && !partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("error")) { + throw new DdlException(PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of [append, error], but found " + + partialUpdateNewKeyPolicyProperty); + } + } + // time zone final String timezone = properties.get(TIMEZONE); if (timezone != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java index 920e03bdfa2472..2cbfc57c833f5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java @@ -151,7 +151,8 @@ protected LoadLoadingTask createTask(Database db, OlapTable table, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, int batchSize, String clusterId) { super(db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate, - txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, loadZeroTolerance, - jobProfile, singleTabletLoadPerSink, useNewLoadScanNode, priority, enableMemTableOnSinkNode, batchSize); + partialUpdateNewKeyPolicy, txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, + loadZeroTolerance, jobProfile, singleTabletLoadPerSink, useNewLoadScanNode, priority, + enableMemTableOnSinkNode, batchSize); this.cloudClusterId = clusterId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 73abdb9f8d15fe..c3a3ddae1acaa0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -224,7 +224,8 @@ protected LoadLoadingTask createTask(Database db, OlapTable table, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, @@ -102,6 +105,7 @@ public LoadLoadingTask(Database db, OlapTable table, this.execMemLimit = execMemLimit; this.strictMode = strictMode; this.isPartialUpdate = isPartialUpdate; + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.txnId = txnId; this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL); this.retryTime = 2; // 2 times is enough @@ -121,8 +125,9 @@ public void init(TUniqueId loadId, List> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException { this.loadId = loadId; planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, - strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, - this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode); + strictMode, isPartialUpdate, partialUpdateNewKeyPolicy, timezone, this.timeoutS, this.loadParallelism, + this.sendBatchParallelism, this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, + enableMemTableOnSinkNode); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index ef429a1d564208..7c2a0129552869 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -44,6 +44,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -68,7 +69,8 @@ public class LoadingTaskPlanner { private final List fileGroups; private final boolean strictMode; private final boolean isPartialUpdate; - private final long timeoutS; // timeout of load job, in second + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; + private final long timeoutS; // timeout of load job, in second private final int loadParallelism; private final int sendBatchParallelism; private final boolean useNewLoadScanNode; @@ -88,7 +90,8 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, - boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism, + boolean strictMode, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo, boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) { this.loadJobId = loadJobId; @@ -100,6 +103,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table this.strictMode = strictMode; this.isPartialUpdate = isPartialUpdate; this.analyzer.setTimezone(timezone); + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.timeoutS = timeoutS; this.loadParallelism = loadParallelism; this.sendBatchParallelism = sendBatchParallelism; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 8cf32648d55f05..f2b6821c8043cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -49,13 +50,14 @@ public class UnboundTableSink extends UnboundLogicalSin private final boolean temporaryPartition; private final List partitions; private boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; private final boolean autoDetectPartition; public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { - this(nameParts, colNames, hints, false, partitions, - false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); + this(nameParts, colNames, hints, false, partitions, false, + TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); } /** @@ -63,9 +65,9 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, Optional logicalProperties, - CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); @@ -73,6 +75,7 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, boolean isAutoDetectPartition, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, Optional logicalProperties, - CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); @@ -91,6 +94,7 @@ public UnboundTableSink(List nameParts, List colNames, List children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, Optional.empty(), children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, Optional.empty(), + children.get(0)); } @Override @@ -158,14 +167,16 @@ public int hashCode() { @Override public Plan withGroupExpression(Optional groupExpression) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, + Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, logicalProperties, children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties, + children.get(0)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 8ca58f977578a6..70061114caef08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.collect.ImmutableList; @@ -66,12 +67,13 @@ public static LogicalSink createUnboundTableSink(List na */ public static LogicalSink createUnboundTableSink(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, LogicalPlan plan) { String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); if (curCatalog instanceof InternalCatalog) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, - isPartialUpdate, dmlCommandType, Optional.empty(), + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof HMSExternalCatalog) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, @@ -91,7 +93,8 @@ public static LogicalSink createUnboundTableSink(List na */ public static LogicalSink createUnboundTableSinkMaybeOverwrite(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, DMLCommandType dmlCommandType, + boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, DMLCommandType dmlCommandType, LogicalPlan plan) { if (isAutoDetectPartition) { // partitions is null if (!isOverwrite) { @@ -106,7 +109,7 @@ public static LogicalSink createUnboundTableSinkMaybeOverwrite(L if (curCatalog instanceof InternalCatalog) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isAutoDetectPartition, - isPartialUpdate, dmlCommandType, Optional.empty(), + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 2fa8332631a771..4867324ab9074b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -452,6 +452,9 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink cte = Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index e975b4914cfe89..575fdb2473c85b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -77,6 +77,7 @@ import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -126,6 +127,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { Database database = pair.first; OlapTable table = pair.second; boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS; + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = sink.getPartialUpdateNewRowPolicy(); LogicalPlan child = ((LogicalPlan) sink.child()); boolean childHasSeqCol = child.getOutput().stream() @@ -148,6 +150,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()), isPartialUpdate, + partialUpdateNewKeyPolicy, sink.getDMLCommandType(), child); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java index d88ce20bb59a7a..7d9dac1057c94f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java @@ -42,6 +42,7 @@ public Rule build() { ctx.connectContext.getSessionVariable().isEnableMemtableOnSinkNode() ? false : ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(), sink.isPartialUpdate(), + sink.getPartialUpdateNewRowPolicy(), sink.getDmlCommandType(), Optional.empty(), sink.getLogicalProperties(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java index 1abf8998e25557..851c28a67cad93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java @@ -70,6 +70,7 @@ import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -437,7 +438,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer logicalQuery = handleCte(logicalQuery); // make UnboundTableSink return UnboundTableSinkCreator.createUnboundTableSink(nameParts, cols, ImmutableList.of(), - isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); + isTempPart, partitions, isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.DELETE, logicalQuery); } protected LogicalPlan handleCte(LogicalPlan logicalPlan) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 69cbf762c2afe6..0e8fb4d8f2e080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -64,6 +64,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.tablefunction.HdfsTableValuedFunction; import org.apache.doris.tablefunction.S3TableValuedFunction; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -243,7 +244,8 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite() && sinkCols.size() < olapTable.getColumns().size(); return UnboundTableSinkCreator.createUnboundTableSink(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), - false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan); + false, dataDesc.getPartitionNames(), isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.LOAD, tvfLogicalPlan); } private static void fillDeleteOnColumn(BulkLoadDataDesc dataDesc, OlapTable olapTable, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 51cfbf0b27271f..f79303acf227a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -192,7 +193,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer // make UnboundTableSink return UnboundTableSinkCreator.createUnboundTableSink(nameParts, isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), ImmutableList.of(), - false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery); + false, ImmutableList.of(), isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.UPDATE, logicalQuery); } private void checkAssignmentColumn(ConnectContext ctx, List columnNameParts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 72e5c4c14e3b0e..e8d6fcbb963d5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -57,6 +57,7 @@ import org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -315,6 +316,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis true, tempPartitionNames, sink.isPartialUpdate(), + sink.getPartialUpdateNewRowPolicy(), sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); // 1. when overwrite table, allow auto partition or not is controlled by session variable. @@ -330,6 +332,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis false, sink.getPartitions(), false, + TPartialUpdateNewRowPolicy.APPEND, sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); insertCtx = new HiveInsertCommandContext(); @@ -343,6 +346,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis false, sink.getPartitions(), false, + TPartialUpdateNewRowPolicy.APPEND, sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); insertCtx = new IcebergInsertCommandContext(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index 397c2927d8458d..11916290c4db85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -48,13 +49,15 @@ public class LogicalOlapTableSink extends LogicalTableS private final OlapTable targetTable; private final List partitionIds; private final boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, - List outputExprs, boolean isPartialUpdate, DMLCommandType dmlCommandType, - CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, dmlCommandType, - Optional.empty(), Optional.empty(), child); + List outputExprs, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, partialUpdateNewKeyPolicy, + dmlCommandType, Optional.empty(), Optional.empty(), child); } /** @@ -62,12 +65,14 @@ public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, DMLCommandType dmlCommandType, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.isPartialUpdate = isPartialUpdate; + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.dmlCommandType = dmlCommandType; this.partitionIds = Utils.copyRequiredList(partitionIds); } @@ -77,14 +82,14 @@ public Plan withChildAndUpdateOutput(Plan child) { .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()); return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, output, isPartialUpdate, - dmlCommandType, Optional.empty(), Optional.empty(), child); + partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), child); } @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); } public Database getDatabase() { @@ -103,13 +108,17 @@ public boolean isPartialUpdate() { return isPartialUpdate; } + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; + } + public DMLCommandType getDmlCommandType() { return dmlCommandType; } public LogicalOlapTableSink withOutputExprs(List outputExprs) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, Optional.empty(), Optional.empty(), child()); + partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), child()); } @Override @@ -125,6 +134,7 @@ public boolean equals(Object o) { } LogicalOlapTableSink that = (LogicalOlapTableSink) o; return isPartialUpdate == that.isPartialUpdate && dmlCommandType == that.dmlCommandType + && partialUpdateNewKeyPolicy == that.partialUpdateNewKeyPolicy && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) && Objects.equals(partitionIds, that.partitionIds); @@ -133,7 +143,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, - isPartialUpdate, dmlCommandType); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType); } @Override @@ -145,6 +155,7 @@ public String toString() { "cols", cols, "partitionIds", partitionIds, "isPartialUpdate", isPartialUpdate, + "partialUpdateNewKeyPolicy", partialUpdateNewKeyPolicy, "dmlCommandType", dmlCommandType ); } @@ -157,13 +168,14 @@ public R accept(PlanVisitor visitor, C context) { @Override public Plan withGroupExpression(Optional groupExpression) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, + Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, groupExpression, logicalProperties, children.get(0)); + partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 65dba188bf02cc..8f1746e79f2d33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -57,6 +58,7 @@ public class PhysicalOlapTableSink extends PhysicalTabl private final List partitionIds; private final boolean singleReplicaLoad; private final boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; /** @@ -64,10 +66,11 @@ public class PhysicalOlapTableSink extends PhysicalTabl */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + LogicalProperties logicalProperties, CHILD_TYPE child) { this(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); } @@ -76,9 +79,10 @@ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, LogicalProperties logicalProperties, - PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, + CHILD_TYPE child) { super(PlanType.PHYSICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child); this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink"); @@ -87,6 +91,7 @@ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, groupExpression, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -137,6 +146,7 @@ public boolean equals(Object o) { PhysicalOlapTableSink that = (PhysicalOlapTableSink) o; return singleReplicaLoad == that.singleReplicaLoad && isPartialUpdate == that.isPartialUpdate + && partialUpdateNewKeyPolicy == that.partialUpdateNewKeyPolicy && dmlCommandType == that.dmlCommandType && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) @@ -147,7 +157,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, - isPartialUpdate, dmlCommandType); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType); } @Override @@ -160,6 +170,7 @@ public String toString() { "partitionIds", partitionIds, "singleReplicaLoad", singleReplicaLoad, "isPartialUpdate", isPartialUpdate, + "partialUpdateNewKeyPolicy", partialUpdateNewKeyPolicy, "dmlCommandType", dmlCommandType ); } @@ -177,20 +188,22 @@ public List getExpressions() { @Override public Plan withGroupExpression(Optional groupExpression) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), child()); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, getLogicalProperties(), + child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, logicalProperties.get(), children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties.get(), + children.get(0)); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @@ -223,7 +236,7 @@ public PhysicalProperties getRequirePhysicalProperties() { @Override public PhysicalOlapTableSink resetLogicalProperties() { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, null, physicalProperties, statistics, child()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 3f5af10d4b17f4..a902f3482b71fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -73,6 +73,7 @@ import org.apache.doris.thrift.TOlapTableSchemaParam; import org.apache.doris.thrift.TOlapTableSink; import org.apache.doris.thrift.TPaloNodesInfo; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TUniqueId; @@ -108,6 +109,7 @@ public class OlapTableSink extends DataSink { // partial update input columns private boolean isPartialUpdate = false; private HashSet partialUpdateInputColumns; + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; // set after init called protected TDataSink tDataSink; @@ -190,6 +192,10 @@ public void setPartialUpdateInputColumns(boolean isPartialUpdate, HashSet pair : table.getIndexIdToMeta().entrySet()) { + MaterializedIndexMeta indexMeta = pair.getValue(); + List columns = Lists.newArrayList(); + List columnsDesc = Lists.newArrayList(); + List indexDesc = Lists.newArrayList(); + columns.addAll(indexMeta.getSchema().stream().map(Column::getNonShadowName).collect(Collectors.toList())); + for (Column column : indexMeta.getSchema()) { + TColumn tColumn = column.toThrift(); + // When schema change is doing, some modified column has prefix in name. Columns here + // is for the schema in rowset meta, which should be no column with shadow prefix. + // So we should remove the shadow prefix here. + if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { + tColumn.setColumnName(column.getNonShadowName()); + } + column.setIndexFlag(tColumn, table); + columnsDesc.add(tColumn); + } + List indexes = indexMeta.getIndexes(); + if (indexes.size() == 0 && pair.getKey() == table.getBaseIndexId()) { + // for compatible with old version befor 2.0-beta + // if indexMeta.getIndexes() is empty, use table.getIndexes() + indexes = table.getIndexes(); + } + for (Index index : indexes) { + TOlapTableIndex tIndex = index.toThrift(index.getColumnUniqueIds(table.getBaseSchema())); + indexDesc.add(tIndex); + } + TOlapTableIndexSchema indexSchema = new TOlapTableIndexSchema(pair.getKey(), columns, + indexMeta.getSchemaHash()); + Expr whereClause = indexMeta.getWhereClause(); + if (whereClause != null) { + Expr expr = syncMvWhereClauses.getOrDefault(pair.getKey(), null); + if (expr == null) { + throw new AnalysisException(String.format("%s is not analyzed", whereClause.toSqlWithoutTbl())); + } + indexSchema.setWhereClause(expr.treeToThrift()); + } + indexSchema.setColumnsDesc(columnsDesc); + indexSchema.setIndexesDesc(indexDesc); + schemaParam.addToIndexes(indexSchema); + } + setPartialUpdateInfoForParam(schemaParam, table, uniqueKeyUpdateMode); + schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); + return schemaParam; + } + + private void setPartialUpdateInfoForParam(TOlapTableSchemaParam schemaParam, OlapTable table, + TUniqueKeyUpdateMode uniqueKeyUpdateMode) throws AnalysisException { + // for backward compatibility + schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); + schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode); + if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) { +>>>>>>> 6f00a7cb19 ([opt](partial update) use a separate config to control the behavior of newly inserted rows in partial update (#41232)) if (table.getState() == OlapTable.OlapTableState.ROLLUP || table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) { throw new AnalysisException("Can't do partial update when table is doing schema change."); } +<<<<<<< HEAD +======= + schemaParam.setPartialUpdateNewKeyPolicy(partialUpdateNewKeyPolicy); + } + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && table.getSequenceMapCol() != null) { + Column seqMapCol = table.getFullSchema().stream() + .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol())) + .findFirst().get(); + schemaParam.setSequenceMapColUniqueId(seqMapCol.getUniqueId()); + } + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { +>>>>>>> 6f00a7cb19 ([opt](partial update) use a separate config to control the behavior of newly inserted rows in partial update (#41232)) for (String s : partialUpdateInputColumns) { schemaParam.addToPartialUpdateInputColumns(s); } @@ -351,8 +444,6 @@ public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a } } } - schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); - return schemaParam; } private List getDistColumns(DistributionInfo distInfo) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 5f6c448b930733..b4e15262c380fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -54,6 +54,7 @@ import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineInstanceParams; import org.apache.doris.thrift.TQueryGlobals; @@ -282,7 +283,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde // The load id will pass to csv reader to find the stream load context from new load stream manager fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(), fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(), - isPartialUpdate); + isPartialUpdate, partialUpdateNewRowPolicy); scanNode = fileScanNode; scanNode.init(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 65b74f2353188b..a7e535712b8a92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -38,6 +38,7 @@ import org.apache.doris.planner.GroupCommitBlockSink; import org.apache.doris.qe.VariableMgr.VarAttr; import org.apache.doris.thrift.TGroupCommitMode; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterType; @@ -531,6 +532,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update"; + public static final String PARTIAL_UPDATE_NEW_KEY_BEHAVIOR = "partial_update_new_key_behavior"; + public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold"; public static final String INVERTED_INDEX_MAX_EXPANSIONS = "inverted_index_max_expansions"; @@ -1972,6 +1975,12 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { @VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true) public boolean enableUniqueKeyPartialUpdate = false; + @VariableMgr.VarAttr(name = PARTIAL_UPDATE_NEW_KEY_BEHAVIOR, needForward = true, description = { + "用于设置部分列更新中对于新插入的行的行为", + "Used to set the behavior for newly inserted rows in partial update." + }, checker = "checkPartialUpdateNewKeyBehavior", options = {"APPEND", "ERROR"}) + public String partialUpdateNewKeyPolicy = "APPEND"; + @VariableMgr.VarAttr(name = TEST_QUERY_CACHE_HIT, description = { "用于测试查询缓存是否命中,如果未命中指定类型的缓存,则会报错", "Used to test whether the query cache is hit. " @@ -3999,6 +4008,10 @@ public void setEnableUniqueKeyPartialUpdate(boolean enableUniqueKeyPartialUpdate this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate; } + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return parsePartialUpdateNewKeyBehavior(partialUpdateNewKeyPolicy); + } + public int getLoadStreamPerNode() { return loadStreamPerNode; } @@ -4550,6 +4563,29 @@ public void checkGenerateStatsFactor(String generateStatsFactor) { } } + public TPartialUpdateNewRowPolicy parsePartialUpdateNewKeyBehavior(String behavior) { + if (behavior == null) { + return null; + } else if (behavior.equalsIgnoreCase("APPEND")) { + return TPartialUpdateNewRowPolicy.APPEND; + } else if (behavior.equalsIgnoreCase("ERROR")) { + return TPartialUpdateNewRowPolicy.ERROR; + } + return null; + } + + public void checkPartialUpdateNewKeyBehavior(String partialUpdateNewKeyBehavior) { + TPartialUpdateNewRowPolicy policy = parsePartialUpdateNewKeyBehavior(partialUpdateNewKeyBehavior); + if (policy == null) { + UnsupportedOperationException exception = + new UnsupportedOperationException(PARTIAL_UPDATE_NEW_KEY_BEHAVIOR + + " should be one of {'APPEND', 'ERROR'}, but found " + + partialUpdateNewKeyBehavior); + LOG.warn("Check " + PARTIAL_UPDATE_NEW_KEY_BEHAVIOR + " failed", exception); + throw exception; + } + } + public void setGenerateStatsFactor(int factor) { this.generateStatsFactor = factor; if (factor <= 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index d48d1702e1b2db..76e1738699b361 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -25,6 +25,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; @@ -110,6 +111,10 @@ default long getFileSize() { boolean isPartialUpdate(); + default TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return TPartialUpdateNewRowPolicy.APPEND; + } + default boolean getTrimDoubleQuotes() { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 94f3625fbc7c2b..7254fca6f6ed95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -33,6 +33,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; @@ -84,6 +85,7 @@ public class StreamLoadTask implements LoadTaskInfo { private List hiddenColumns; private boolean trimDoubleQuotes = false; private boolean isPartialUpdate = false; + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; private int skipLines = 0; private boolean enableProfile = false; @@ -301,6 +303,11 @@ public boolean isPartialUpdate() { return isPartialUpdate; } + @Override + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; + } + @Override public boolean isMemtableOnSinkNode() { return memtableOnSinkNode; @@ -454,6 +461,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws if (request.isSetPartialUpdate()) { isPartialUpdate = request.isPartialUpdate(); } + if (request.isSetPartialUpdateNewKeyPolicy()) { + partialUpdateNewKeyPolicy = request.getPartialUpdateNewKeyPolicy(); + } if (request.isSetMemtableOnSinkNode()) { this.memtableOnSinkNode = request.isMemtableOnSinkNode(); } else { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java index cef9bac4ed90a7..f3d718e9641424 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.util.MemoTestUtils; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.nereids.util.PlanConstructor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.utframe.TestWithFeService; import org.junit.jupiter.api.Test; @@ -99,7 +100,7 @@ void testEliminateSortUnderTableSink() { .limit(1, 1).build(); plan = new LogicalOlapTableSink<>(new Database(), scan.getTable(), scan.getTable().getBaseSchema(), new ArrayList<>(), plan.getOutput().stream().map(NamedExpression.class::cast).collect( - Collectors.toList()), false, DMLCommandType.NONE, plan); + Collectors.toList()), false, TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, plan); PlanChecker.from(MemoTestUtils.createConnectContext(), plan) .disableNereidsRules("PRUNE_EMPTY_PARTITION") .rewrite() @@ -113,7 +114,7 @@ void testEliminateSortUnderTableSink() { .build(); plan = new LogicalOlapTableSink<>(new Database(), scan.getTable(), scan.getTable().getBaseSchema(), new ArrayList<>(), plan.getOutput().stream().map(NamedExpression.class::cast).collect( - Collectors.toList()), false, DMLCommandType.NONE, plan); + Collectors.toList()), false, TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, plan); PlanChecker.from(MemoTestUtils.createConnectContext(), plan) .rewrite() .nonMatch(logicalSort()); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 99cd99410ed7de..b805dfa52640ed 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -74,5 +74,8 @@ message POlapTableSchemaParam { optional string timezone = 12; optional int32 auto_increment_column_unique_id = 13 [default = -1]; optional int32 nano_seconds = 14 [default = 0]; + optional UniqueKeyUpdateModePB unique_key_update_mode = 15 [default = UPSERT]; + optional int32 sequence_map_col_unique_id = 16 [default = -1]; + optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 17 [default = APPEND]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index d62c9df5073147..501bf3921eb944 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -628,6 +628,11 @@ message RowsetBinlogMetasPB { repeated RowsetBinlogMetaPB rowset_binlog_metas = 1; } +enum PartialUpdateNewRowPolicyPB { + APPEND = 0; + ERROR = 1; +} + message PartialUpdateInfoPB { optional bool is_partial_update = 1 [default = false]; repeated string partial_update_input_columns = 2; @@ -642,4 +647,6 @@ message PartialUpdateInfoPB { repeated string default_values = 11; optional int64 max_version_in_flush_phase = 12 [default = -1]; optional int32 nano_seconds = 13 [default = 0]; + optional UniqueKeyUpdateModePB partial_update_mode = 14 [default = UPSERT]; + optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 15 [default = APPEND]; } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 14bb6412a10747..3fcba032c2b8ea 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -161,6 +161,11 @@ enum TIndexType { NGRAM_BF = 3 } +enum TPartialUpdateNewRowPolicy { + APPEND = 0, + ERROR = 1 +} + // Mapping from names defined by Avro to the enum. // We permit gzip and bzip2 in addition. const map COMPRESSION_MAP = { @@ -259,6 +264,9 @@ struct TOlapTableSchemaParam { 11: optional string auto_increment_column 12: optional i32 auto_increment_column_unique_id = -1 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 + 14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode + 15: optional i32 sequence_map_col_unique_id = -1 + 16: optional TPartialUpdateNewRowPolicy partial_update_new_key_policy } struct TTabletLocation { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 19d91f0fded93c..0256755c1971ad 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -764,6 +764,8 @@ struct TStreamLoadPutRequest { 54: optional bool group_commit // deprecated 55: optional i32 stream_per_node; 56: optional string group_commit_mode + 57: optional Types.TUniqueKeyUpdateMode unique_key_update_mode + 58: optional Descriptors.TPartialUpdateNewRowPolicy partial_update_new_key_policy // For cloud 1000: optional string cloud_cluster diff --git a/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out b/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out index 4585bd5c5834dd..6ad32390d03d36 100644 --- a/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out +++ b/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out @@ -1,9 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !select -- -1 kevin 18 shenzhen 500 2023-07-03T12:00:01 -3 \N \N \N 23 2023-07-03T12:00:02 -18 \N \N \N 9999999 2023-07-03T12:00:03 - -- !select -- 3 Jack \N 90 4 Mike 3 \N diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv new file mode 100644 index 00000000000000..49517f38a3b070 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv @@ -0,0 +1,4 @@ +0,40 +1,40 +10,40 +11,40 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv new file mode 100644 index 00000000000000..3f614863e4f57c --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv @@ -0,0 +1,4 @@ +2,50 +3,50 +13,50 +14,50 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv new file mode 100644 index 00000000000000..7fa3c1ae4e2bcd --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv @@ -0,0 +1,4 @@ +4,60,60,60 +5,60,60,60 +21,60,60,60 +22,60,60,60 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out new file mode 100644 index 00000000000000..499f7f4d1ffcc9 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out @@ -0,0 +1,153 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 + +-- !insert_append -- +0 10 0 0 +1 1 1 1 +2 2 2 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert_error1 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert_error2 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert3 -- +0 10 0 0 +1 9 9 9 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +100 9 9 9 +200 9 9 9 + +-- !stream_load_append -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 +100 9 9 9 +200 9 9 9 + +-- !stream_load_error -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 +100 9 9 9 +200 9 9 9 + +-- !stream_load_ignore_property -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 60 60 60 +5 60 60 60 +10 \N \N 40 +11 \N \N 40 +21 60 60 60 +22 60 60 60 +100 9 9 9 +200 9 9 9 + +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + +-- !broker_load_append -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +10 \N \N 40 +11 \N \N 40 + +-- !broker_load_error -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +10 \N \N 40 +11 \N \N 40 + +-- !broker_load_ignore_property -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 60 60 60 +5 60 60 60 +10 \N \N 40 +11 \N \N 40 +21 60 60 60 +22 60 60 60 + +-- !sql -- +0 \N 30 456 +1 20 123 456 +2 \N 30 456 +3 3 3 3 +4 4 4 4 +10 \N 30 456 +11 20 123 456 +12 \N 30 456 + +-- !sql -- +0 \N 30 456 +0 0 0 0 +0 20 123 456 +1 1 1 1 +1 20 123 456 +2 \N 30 456 +2 2 2 2 +3 3 3 3 +4 4 4 4 +10 \N 30 456 +10 20 123 456 +11 20 123 456 +12 \N 30 456 + +-- !sql -- +0 20 0 0 +1 20 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +10 20 30 \N +11 20 \N \N +12 \N 30 \N + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out new file mode 100644 index 00000000000000..9c95e5ebde7c1e --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out @@ -0,0 +1,78 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 + +-- !append -- +0 10 0 0 +1 1 1 1 +2 2 2 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !ignore -- +0 10 0 0 +1 1 20 1 +2 2 2 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !error1 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !error2 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !append -- +0 10 0 40 +1 1 30 40 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 + +-- !ignore -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 + +-- !error1 -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N 60 +5 10 \N 60 +10 \N \N 40 +11 \N \N 40 + +-- !error2 -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N 60 +5 10 \N 60 +10 \N \N 70 +11 \N \N 40 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out index 93fa33d81f9e96..18fa75df60991e 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out @@ -4,12 +4,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -23,7 +27,9 @@ -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -38,12 +44,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -57,7 +67,9 @@ -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 diff --git a/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy b/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy index d8057f374afd01..ccc46f59af946f 100644 --- a/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy +++ b/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy @@ -19,65 +19,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods suite("test_error_data_handling", "p0") { def tableName = "test_error_data_handling" - // partial update strict mode - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE ${tableName} ( - id INT, - name VARCHAR(10), - age INT, - city VARCHAR(10), - balance DECIMAL(9, 0), - last_access_time DATETIME - ) - UNIQUE KEY(id) - DISTRIBUTED BY HASH(id) BUCKETS 1 - PROPERTIES("replication_num" = "1") - """ - - sql """ - INSERT INTO ${tableName} VALUES (1, 'kevin', 18, 'shenzhen', 400, '2023-07-01 12:00:00') - """ - - streamLoad { - table "${tableName}" - file "test_data_partial.csv" - set 'column_separator', ',' - set 'columns', 'id,balance,last_access_time' - set 'partial_columns', 'true' - set 'strict_mode', 'true' - time 10000 - check {result, exception, startTime, endTime -> - assertTrue(exception == null) - def json = parseJson(result) - assertEquals("Fail", json.Status) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) - assertEquals(3, json.NumberTotalRows) - assertEquals(0, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) - } - } - - streamLoad { - table "${tableName}" - file "test_data_partial.csv" - set 'column_separator', ',' - set 'columns', 'id,balance,last_access_time' - set 'partial_columns', 'true' - set 'strict_mode', 'false' - time 10000 - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - } - } - - qt_select "SELECT * FROM ${tableName} ORDER BY id" - sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ diff --git a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy index 6a54d6a5e71702..f2ff40649d24de 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy @@ -179,13 +179,12 @@ suite("nereids_partial_update_native_insert_stmt", "p0") { sql """insert into ${tableName5} values(1,"kevin",18,"shenzhen",400,"2023-07-01 12:00:00");""" qt_5 """select * from ${tableName5} order by id;""" - sql "set enable_insert_strict = true;" sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" sql "sync;" - // partial update using insert stmt in strict mode, the max_filter_ratio is always 0 test { sql """ insert into ${tableName5}(id,balance,last_access_time) values(1,500,"2023-07-03 12:00:01"),(3,23,"2023-07-03 12:00:02"),(18,9999999,"2023-07-03 12:00:03"); """ - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_5 """select * from ${tableName5} order by id;""" sql "set enable_unique_key_partial_update=false;" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy index d92dc41a68bc34..ac98141b121245 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy @@ -264,14 +264,16 @@ suite("test_partial_update_insert_light_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" - sql "set enable_unique_key_partial_update=false;" + sql """set partial_update_new_key_behavior="APPEND";""" sql "sync" qt_add_key_col_2 " select * from ${tableName} order by c0; " diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy index 9e0abc9704c482..852fad19737336 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy @@ -249,12 +249,16 @@ suite("test_partial_update_insert_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } + sql """set partial_update_new_key_behavior="APPEND";""" + sql "sync;" sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" sql "set enable_unique_key_partial_update=false;" sql "sync" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy index c3e25a3a690c44..87eb39a3b5b567 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy @@ -175,17 +175,17 @@ suite("test_partial_update_native_insert_stmt", "p0") { sql """insert into ${tableName5} values(1,"kevin",18,"shenzhen",400,"2023-07-01 12:00:00");""" qt_5 """select * from ${tableName5} order by id;""" - sql "set enable_insert_strict = true;" + sql """set partial_update_new_key_behavior="ERROR";""" sql "set enable_unique_key_partial_update=true;" sql "sync;" // partial update using insert stmt in strict mode, the max_filter_ratio is always 0 test { sql """ insert into ${tableName5}(id,balance,last_access_time) values(1,500,"2023-07-03 12:00:01"),(3,23,"2023-07-03 12:00:02"),(18,9999999,"2023-07-03 12:00:03"); """ - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_5 """select * from ${tableName5} order by id;""" sql "set enable_unique_key_partial_update=false;" - sql "set enable_insert_strict = false;" + sql """set partial_update_new_key_behavior="APPEND";""" sql "sync;" sql """ DROP TABLE IF EXISTS ${tableName5}; """ diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy new file mode 100644 index 00000000000000..c425650c2917dd --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy @@ -0,0 +1,267 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_partial_update_new_key_policy", "p0") { + + def tableName = "test_partial_update_new_key_policy" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int, + `c3` int) + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="3");""" + qt_sql """select * from ${tableName} order by k;""" + + def checkVariable = { expected -> + def res = sql_return_maparray """show variables where Variable_name="partial_update_new_key_behavior";"""; + logger.info("res: ${res}") + assertTrue(res[0].Value.equalsIgnoreCase(expected)); + } + + // 1. test insert stmt + // 1.1 + sql "set enable_unique_key_partial_update=true;" + sql "sync" + + sql """set partial_update_new_key_behavior="APPEND";""" + sql "sync;" + checkVariable("APPEND") + explain { + sql "insert into ${tableName}(k,c1) values(0,10),(3,10),(4,10),(5,10);" + contains "PARTIAL_UPDATE_NEW_KEY_BEHAVIOR: APPEND" + } + sql "insert into ${tableName}(k,c1) values(0,10),(3,10),(4,10),(5,10);" + qt_insert_append """select * from ${tableName} order by k;""" + + + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" + checkVariable("ERROR") + explain { + sql "insert into ${tableName}(k,c2) values(1,30),(2,30);" + contains "PARTIAL_UPDATE_NEW_KEY_BEHAVIOR: ERROR" + } + sql "insert into ${tableName}(k,c2) values(1,30),(2,30);" + qt_insert_error1 """select * from ${tableName} order by k;""" + test { + sql "insert into ${tableName}(k,c2) values(1,30),(10,999),(11,999);" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. Row with key=[10] is not in table." + } + qt_insert_error2 """select * from ${tableName} order by k;""" + + + sql """set partial_update_new_key_behavior=default;""" + sql "sync;" + checkVariable("APPEND") + test { + sql """set partial_update_new_key_behavior="invalid";""" + exception "partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}, but found invalid" + } + checkVariable("APPEND") + + // 1.2 partial_update_new_key_behavior will not take effect when enable_unique_key_partial_update is false + sql "set partial_update_new_key_behavior=ERROR;" + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + + sql "insert into ${tableName} values(1,9,9,9),(2,9,9,9),(100,9,9,9),(200,9,9,9);" + qt_insert3 """select * from ${tableName} order by k;""" + + + // 2. test stream load + // 2.1 APPEND + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c3' + set 'partial_columns', 'true' + set 'partial_update_new_key_behavior', 'append' + file 'row_policy1.csv' + time 10000 + } + qt_stream_load_append """select * from ${tableName} order by k;""" + // 2.2 ERROR + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c3' + set 'partial_columns', 'true' + set 'partial_update_new_key_behavior', 'error' + file 'row_policy2.csv' + time 10000 + check {result, exception, startTime, endTime -> + assertTrue(exception == null) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. Row with key=[13] is not in table.")) + } + } + qt_stream_load_error """select * from ${tableName} order by k;""" + // 2.3 partial_update_new_key_behavior will not take effect when enable_unique_key_partial_update is false + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c1,c2,c3' + set 'partial_columns', 'false' + set 'partial_update_new_key_behavior', 'error' + file 'row_policy3.csv' + time 10000 + } + qt_stream_load_ignore_property """select * from ${tableName} order by k;""" + + + // 3. test broker load + tableName = "test_partial_update_new_key_policy2" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int, + `c3` int) + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true"); """ + sql "insert into ${tableName} select number,number,number,number from numbers(\"number\"=\"6\");" + qt_sql """select * from ${tableName} order by k;""" + + // 3.1 APPEND + def label = "test_pu_new_key_policy1" + UUID.randomUUID().toString().replace("-", "_") + sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy1.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "APPEND" + ); + """ + waitForBrokerLoadDone(label) + def res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "FINISHED" + qt_broker_load_append """select * from ${tableName} order by k;""" + + // 3.2 ERROR + label = "test_pu_new_key_policy2" + UUID.randomUUID().toString().replace("-", "_") + sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy2.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "ERROR" + ); + """ + waitForBrokerLoadDone(label, 600) + res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "CANCELLED" && res[0].ErrorMsg.contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. Row with key=[13] is not in table.") + qt_broker_load_error """select * from ${tableName} order by k;""" + + // 3.3 partial_update_new_key_behavior will not take effect when enable_unique_key_partial_update is false + label = "test_pu_new_key_policy3" + UUID.randomUUID().toString().replace("-", "_") + sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy3.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c1,c2,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_update_new_key_behavior" = "ERROR" + ); + """ + waitForBrokerLoadDone(label) + res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "FINISHED" + qt_broker_load_ignore_property """select * from ${tableName} order by k;""" + + + // 4. test this config will not affect non MOW tables + tableName = "test_partial_update_new_key_policy3" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int default "123", + `c3` int default "456") + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by k;""" + + tableName = "test_partial_update_new_key_policy3" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int default "123", + `c3` int default "456") + DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES("replication_num" = "1"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by k,c1,c2,c3;""" + + tableName = "test_partial_update_new_key_policy4" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int max, + `c2` int min, + `c3` int sum) + AGGREGATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES("replication_num" = "1"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by k;""" +} diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy index 722f1fccb7643c..fac17ac542bc16 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy @@ -42,19 +42,18 @@ suite("test_partial_update_only_keys", "p0") { qt_sql """select * from ${tableName} order by k;""" // new rows will be appended sql "set enable_unique_key_partial_update=true;" - sql "set enable_insert_strict=false;" sql "sync" sql "insert into ${tableName}(k) values(0),(1),(4),(5),(6);" qt_sql """select * from ${tableName} order by k;""" - // fail if has new rows - sql "set enable_insert_strict=true;" + // fail if has new rows when partial_update_new_key_behavior=ERROR + sql """set partial_update_new_key_behavior="ERROR";""" sql "sync" sql "insert into ${tableName}(k) values(0),(1),(4),(5),(6);" qt_sql """select * from ${tableName} order by k;""" test { sql "insert into ${tableName}(k) values(0),(1),(10),(11);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_sql """select * from ${tableName} order by k;""" } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy index 864a97e13bfdc5..ddd2436f938234 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy @@ -421,7 +421,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -434,9 +434,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1002,7 +1000,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1015,9 +1013,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy index 2ce042f8b6391c..22d396124924e7 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -419,13 +419,13 @@ suite("test_partial_update_row_store_schema_change", "p0") { }); // test load data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR streamLoad { table "${tableName}" set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -438,9 +438,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1015,7 +1013,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1028,9 +1026,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy index c4d26baee8f481..dc591d7b02a6c4 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy @@ -18,6 +18,9 @@ suite("test_partial_update_strict_mode", "p0") { + // NOTE: after https://github.com/apache/doris/pull/41232, the handling of newly inserted rows in partial update + // is not controlled by strict mode + String db = context.config.getDbNameByFile(context.file) sql "select 1;" // to create database @@ -67,8 +70,8 @@ suite("test_partial_update_strict_mode", "p0") { def json = parseJson(result) assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(1, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -114,11 +117,10 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) + assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(0, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -211,7 +213,7 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) + assertEquals("Success", json.Status) } } @@ -253,7 +255,7 @@ suite("test_partial_update_strict_mode", "p0") { (1,600,"2023-08-03 12:00:01"), (2,500,"2023-07-03 12:00:01"), (4,23,"2023-07-03 12:00:02");""" - exception "Insert has filtered data in strict mode" + exception "[E-207]the unmentioned column `city` should have default value or be nullable for newly inserted rows in non-strict mode partial update" } sql "sync;" qt_sql """select * from ${tableName5} order by id;""" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy index ad2e4b2f4ff229..ba12be2b99ae2d 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy @@ -78,7 +78,7 @@ suite("test_partial_update_upsert", "p0") { `last_access_time` datetime NULL ) ENGINE = OLAP UNIQUE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) - BUCKETS AUTO PROPERTIES ( + BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "storage_format" = "V2", "enable_unique_key_merge_on_write" = "true", @@ -95,7 +95,7 @@ suite("test_partial_update_upsert", "p0") { set 'format', 'csv' set 'partial_columns', 'true' set 'columns', 'id,balance,last_access_time' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' file 'upsert.csv' time 10000 // limit inflight 10s @@ -104,6 +104,7 @@ suite("test_partial_update_upsert", "p0") { assertTrue(exception == null) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } sql "sync" From eb6e9daf06e5f68def026ac941f0fbe0a07d1080 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 25 Jun 2025 18:36:06 +0800 Subject: [PATCH 2/2] fix --- be/src/exec/tablet_info.cpp | 4 +- be/src/olap/partial_update_info.cpp | 10 +-- .../olap/rowset/segment_v2/segment_writer.cpp | 8 +- .../olap/rowset/segment_v2/segment_writer.h | 1 + .../segment_v2/vertical_segment_writer.cpp | 11 +-- .../segment_v2/vertical_segment_writer.h | 1 + .../org/apache/doris/analysis/LoadStmt.java | 1 - .../apache/doris/planner/OlapTableSink.java | 87 +------------------ .../doris/planner/StreamLoadPlanner.java | 6 +- gensrc/proto/descriptors.proto | 4 +- gensrc/proto/olap_file.proto | 2 +- gensrc/thrift/Descriptors.thrift | 2 +- gensrc/thrift/FrontendService.thrift | 2 +- 13 files changed, 28 insertions(+), 111 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 97a08028bb3c83..4e3c4991a0aacf 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -130,7 +130,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { } _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id(); } - if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) { + if (_is_partial_update) { if (pschema.has_partial_update_new_key_policy()) { _partial_update_new_row_policy = pschema.partial_update_new_key_policy(); } @@ -226,7 +226,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id; } - if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) { + if (_is_partial_update) { if (tschema.__isset.partial_update_new_key_policy) { switch (tschema.partial_update_new_key_policy) { case doris::TPartialUpdateNewRowPolicy::APPEND: { diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index eb0938033fad79..9a852a28f20ffb 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -34,8 +34,7 @@ namespace doris { Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, - bool partial_update, - PartialUpdateNewRowPolicyPB policy, + bool partial_update, PartialUpdateNewRowPolicyPB policy, const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, @@ -160,11 +159,10 @@ std::string PartialUpdateInfo::summary() const { } Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema, - const std::function& line, - BitmapValue* skip_bitmap) { + const std::function& line) { switch (partial_update_new_key_policy) { case doris::PartialUpdateNewRowPolicyPB::APPEND: { - if (_is_partial_update) { + if (is_partial_update) { if (!can_insert_new_rows_in_partial_update) { std::string error_column; for (auto cid : missing_cids) { @@ -180,7 +178,7 @@ Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema, "for newly inserted rows in non-strict mode partial update", error_column); } - } + } } break; case doris::PartialUpdateNewRowPolicyPB::ERROR: { return Status::Error( diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 9ee45594e4d3ca..97145449a6dc7e 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -489,7 +489,7 @@ Status SegmentWriter::probe_key_for_mow( PartialUpdateReadPlan& read_plan, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, - PartialUpdateStats& stats) { + const std::function& not_found_cb, PartialUpdateStats& stats) { RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; @@ -677,9 +677,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* }); }; RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column, - have_delete_sign, specified_rowsets, segment_caches, - has_default_or_nullable, use_default_or_null_flag, - stats)); + have_delete_sign, read_plan, specified_rowsets, + segment_caches, has_default_or_nullable, + use_default_or_null_flag, not_found_cb, stats)); } CHECK_EQ(use_default_or_null_flag.size(), num_rows); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 02ae962a16a942..d248d20a1c0980 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -101,6 +101,7 @@ class SegmentWriter { std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, + const std::function& not_found_cb, PartialUpdateStats& stats); Status partial_update_preconditions_check(size_t row_pos); Status append_block_with_partial_content(const vectorized::Block* block, size_t row_pos, diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 1d2350c795c8fb..22d6ee4f868927 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -359,7 +359,7 @@ Status VerticalSegmentWriter::_probe_key_for_mow( PartialUpdateReadPlan& read_plan, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, - PartialUpdateStats& stats) { + const std::function& not_found_cb, PartialUpdateStats& stats) { RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; @@ -537,13 +537,10 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da return data.block->dump_one_line(block_pos, _num_sort_key_columns); }); }; - auto update_read_plan = [&](const RowLocation& loc) { - read_plan.prepare_to_read(loc, segment_pos); - }; RETURN_IF_ERROR(_probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column, - have_delete_sign, specified_rowsets, segment_caches, - has_default_or_nullable, use_default_or_null_flag, - update_read_plan, not_found_cb, stats)); + have_delete_sign, read_plan, specified_rowsets, + segment_caches, has_default_or_nullable, + use_default_or_null_flag, not_found_cb, stats)); } CHECK_EQ(use_default_or_null_flag.size(), data.num_rows); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index a327873bbce356..00964823cb62e1 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -172,6 +172,7 @@ class VerticalSegmentWriter { std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, + const std::function& not_found_cb, PartialUpdateStats& stats); Status _partial_update_preconditions_check(size_t row_pos); Status _append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 0b0b0fbe8a2057..803d9f8a1ff41b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -38,7 +38,6 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Function; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index a902f3482b71fa..a79f68e1c3af29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -344,96 +344,13 @@ public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a indexSchema.setIndexesDesc(indexDesc); schemaParam.addToIndexes(indexSchema); } -<<<<<<< HEAD schemaParam.setIsPartialUpdate(isPartialUpdate); if (isPartialUpdate) { -======= - setPartialUpdateInfoForParam(schemaParam, table, uniqueKeyUpdateMode); - schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); - return schemaParam; - } - - private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) throws AnalysisException { - TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam(); - schemaParam.setDbId(dbId); - schemaParam.setTableId(table.getId()); - schemaParam.setVersion(table.getIndexMetaByIndexId(table.getBaseIndexId()).getSchemaVersion()); - schemaParam.setIsStrictMode(isStrictMode); - - schemaParam.tuple_desc = tupleDescriptor.toThrift(); - for (SlotDescriptor slotDesc : tupleDescriptor.getSlots()) { - schemaParam.addToSlotDescs(slotDesc.toThrift()); - } - - for (Map.Entry pair : table.getIndexIdToMeta().entrySet()) { - MaterializedIndexMeta indexMeta = pair.getValue(); - List columns = Lists.newArrayList(); - List columnsDesc = Lists.newArrayList(); - List indexDesc = Lists.newArrayList(); - columns.addAll(indexMeta.getSchema().stream().map(Column::getNonShadowName).collect(Collectors.toList())); - for (Column column : indexMeta.getSchema()) { - TColumn tColumn = column.toThrift(); - // When schema change is doing, some modified column has prefix in name. Columns here - // is for the schema in rowset meta, which should be no column with shadow prefix. - // So we should remove the shadow prefix here. - if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { - tColumn.setColumnName(column.getNonShadowName()); - } - column.setIndexFlag(tColumn, table); - columnsDesc.add(tColumn); - } - List indexes = indexMeta.getIndexes(); - if (indexes.size() == 0 && pair.getKey() == table.getBaseIndexId()) { - // for compatible with old version befor 2.0-beta - // if indexMeta.getIndexes() is empty, use table.getIndexes() - indexes = table.getIndexes(); - } - for (Index index : indexes) { - TOlapTableIndex tIndex = index.toThrift(index.getColumnUniqueIds(table.getBaseSchema())); - indexDesc.add(tIndex); - } - TOlapTableIndexSchema indexSchema = new TOlapTableIndexSchema(pair.getKey(), columns, - indexMeta.getSchemaHash()); - Expr whereClause = indexMeta.getWhereClause(); - if (whereClause != null) { - Expr expr = syncMvWhereClauses.getOrDefault(pair.getKey(), null); - if (expr == null) { - throw new AnalysisException(String.format("%s is not analyzed", whereClause.toSqlWithoutTbl())); - } - indexSchema.setWhereClause(expr.treeToThrift()); - } - indexSchema.setColumnsDesc(columnsDesc); - indexSchema.setIndexesDesc(indexDesc); - schemaParam.addToIndexes(indexSchema); - } - setPartialUpdateInfoForParam(schemaParam, table, uniqueKeyUpdateMode); - schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); - return schemaParam; - } - - private void setPartialUpdateInfoForParam(TOlapTableSchemaParam schemaParam, OlapTable table, - TUniqueKeyUpdateMode uniqueKeyUpdateMode) throws AnalysisException { - // for backward compatibility - schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); - schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode); - if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) { ->>>>>>> 6f00a7cb19 ([opt](partial update) use a separate config to control the behavior of newly inserted rows in partial update (#41232)) + schemaParam.setPartialUpdateNewKeyPolicy(partialUpdateNewKeyPolicy); if (table.getState() == OlapTable.OlapTableState.ROLLUP || table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) { throw new AnalysisException("Can't do partial update when table is doing schema change."); } -<<<<<<< HEAD -======= - schemaParam.setPartialUpdateNewKeyPolicy(partialUpdateNewKeyPolicy); - } - if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && table.getSequenceMapCol() != null) { - Column seqMapCol = table.getFullSchema().stream() - .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol())) - .findFirst().get(); - schemaParam.setSequenceMapColUniqueId(seqMapCol.getUniqueId()); - } - if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { ->>>>>>> 6f00a7cb19 ([opt](partial update) use a separate config to control the behavior of newly inserted rows in partial update (#41232)) for (String s : partialUpdateInputColumns) { schemaParam.addToPartialUpdateInputColumns(s); } @@ -444,6 +361,8 @@ private void setPartialUpdateInfoForParam(TOlapTableSchemaParam schemaParam, Ola } } } + schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); + return schemaParam; } private List getDistColumns(DistributionInfo distInfo) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index b4e15262c380fe..874cd4e5d38646 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -54,7 +54,6 @@ import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineInstanceParams; import org.apache.doris.thrift.TQueryGlobals; @@ -283,7 +282,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde // The load id will pass to csv reader to find the stream load context from new load stream manager fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(), fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(), - isPartialUpdate, partialUpdateNewRowPolicy); + isPartialUpdate); scanNode = fileScanNode; scanNode.init(analyzer); @@ -310,6 +309,9 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), txnTimeout); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); + if (isPartialUpdate) { + olapTableSink.setPartialUpdateNewRowPolicy(taskInfo.getPartialUpdateNewRowPolicy()); + } olapTableSink.complete(analyzer); // for stream load, we only need one fragment, ScanNode -> DataSink. diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index b805dfa52640ed..d29600ebf2b687 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -74,8 +74,8 @@ message POlapTableSchemaParam { optional string timezone = 12; optional int32 auto_increment_column_unique_id = 13 [default = -1]; optional int32 nano_seconds = 14 [default = 0]; - optional UniqueKeyUpdateModePB unique_key_update_mode = 15 [default = UPSERT]; - optional int32 sequence_map_col_unique_id = 16 [default = -1]; + reserved 15; // optional UniqueKeyUpdateModePB unique_key_update_mode = 15 [default = UPSERT]; + reserved 16; // optional int32 sequence_map_col_unique_id = 16 [default = -1]; optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 17 [default = APPEND]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 501bf3921eb944..7d1742918b6fde 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -647,6 +647,6 @@ message PartialUpdateInfoPB { repeated string default_values = 11; optional int64 max_version_in_flush_phase = 12 [default = -1]; optional int32 nano_seconds = 13 [default = 0]; - optional UniqueKeyUpdateModePB partial_update_mode = 14 [default = UPSERT]; + reserved 14; // optional UniqueKeyUpdateModePB partial_update_mode = 14 [default = UPSERT]; optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 15 [default = APPEND]; } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 3fcba032c2b8ea..3256de4756fd8c 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -264,7 +264,7 @@ struct TOlapTableSchemaParam { 11: optional string auto_increment_column 12: optional i32 auto_increment_column_unique_id = -1 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 - 14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode + // 14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode 15: optional i32 sequence_map_col_unique_id = -1 16: optional TPartialUpdateNewRowPolicy partial_update_new_key_policy } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 0256755c1971ad..72a1041a1bff47 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -764,7 +764,7 @@ struct TStreamLoadPutRequest { 54: optional bool group_commit // deprecated 55: optional i32 stream_per_node; 56: optional string group_commit_mode - 57: optional Types.TUniqueKeyUpdateMode unique_key_update_mode + // 57: optional Types.TUniqueKeyUpdateMode unique_key_update_mode 58: optional Descriptors.TPartialUpdateNewRowPolicy partial_update_new_key_policy // For cloud