diff --git a/be/src/common/status.h b/be/src/common/status.h index 9c56c0b024a5fa..c8c2f3064e917a 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -294,6 +294,7 @@ namespace ErrorCode { E(KEY_NOT_FOUND, -7000, false); \ E(KEY_ALREADY_EXISTS, -7001, false); \ E(ENTRY_NOT_FOUND, -7002, false); \ + E(NEW_ROWS_IN_PARTIAL_UPDATE, -7003, false); \ E(INVALID_TABLET_STATE, -7211, false); \ E(ROWSETS_EXPIRED, -7311, false); \ E(CGROUP_ERROR, -7411, false); \ diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 38fa9242f8bbc7..89b0143d1ab6d9 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -145,6 +145,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { } _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id(); } + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) { + if (pschema.has_partial_update_new_key_policy()) { + _partial_update_new_row_policy = pschema.partial_update_new_key_policy(); + } + } _timestamp_ms = pschema.timestamp_ms(); if (pschema.has_nano_seconds()) { _nano_seconds = pschema.nano_seconds(); @@ -272,6 +277,27 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id; } + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) { + if (tschema.__isset.partial_update_new_key_policy) { + switch (tschema.partial_update_new_key_policy) { + case doris::TPartialUpdateNewRowPolicy::APPEND: { + _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::APPEND; + break; + } + case doris::TPartialUpdateNewRowPolicy::ERROR: { + _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::ERROR; + break; + } + default: { + return Status::InvalidArgument( + "Unknown partial_update_new_key_behavior: {}, should be one of " + "'APPEND' or 'ERROR'", + tschema.partial_update_new_key_policy); + } + } + } + } + for (const auto& tcolumn : 
tschema.partial_update_input_columns) { _partial_update_input_columns.insert(tcolumn); } @@ -360,6 +386,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { // for backward compatibility pschema->set_partial_update(true); } + pschema->set_partial_update_new_key_policy(_partial_update_new_row_policy); pschema->set_is_strict_mode(_is_strict_mode); pschema->set_auto_increment_column(_auto_increment_column); pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 5c702ac0e2fcdb..be7a4af6657af6 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -104,6 +104,9 @@ class OlapTableSchemaParam { std::set partial_update_input_columns() const { return _partial_update_input_columns; } + PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const { + return _partial_update_new_row_policy; + } std::string auto_increment_coulumn() const { return _auto_increment_column; } int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } @@ -128,6 +131,8 @@ class OlapTableSchemaParam { std::vector _indexes; mutable ObjectPool _obj_pool; UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT}; + PartialUpdateNewRowPolicyPB _partial_update_new_row_policy { + PartialUpdateNewRowPolicyPB::APPEND}; std::set _partial_update_input_columns; bool _is_strict_mode = false; std::string _auto_increment_column; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index b5b7b2d3ae6702..72ade89da9a055 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -688,6 +688,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, unique_key_update_mode_str); } } + if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() && 
!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) { // only consider `partial_columns` parameter when `unique_key_update_mode` is not set @@ -698,6 +699,24 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, } } + if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) { + static const std::map policy_map { + {"APPEND", TPartialUpdateNewRowPolicy::APPEND}, + {"ERROR", TPartialUpdateNewRowPolicy::ERROR}}; + + auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY); + std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(), + [](unsigned char c) { return std::toupper(c); }); + auto it = policy_map.find(policy_name); + if (it == policy_map.end()) { + return Status::InvalidArgument( + "Invalid partial_update_new_key_behavior {}, must be one of {{'APPEND', " + "'ERROR'}}", /* {{ }} = literal braces in fmt */ + policy_name); + } + request.__set_partial_update_new_key_policy(it->second); + } + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); request.__set_memtable_on_sink_node(value); diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index ce42aa1094568a..4c856ba4478775 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -60,6 +60,7 @@ static const std::string HTTP_COMMENT = "comment"; static const std::string HTTP_ENABLE_PROFILE = "enable_profile"; static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns"; static const std::string HTTP_UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode"; +static const std::string HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY = "partial_update_new_key_behavior"; static const std::string HTTP_SQL = "sql"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index c51db3777feb34..c6a3ff0d697025 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ 
b/be/src/olap/delta_writer_v2.cpp @@ -239,6 +239,7 @@ Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, RETURN_IF_ERROR(_partial_update_info->init( _req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->unique_key_update_mode(), + table_schema_param->partial_update_new_key_policy(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index a031efdd83ba61..093b70e4ebf25e 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -35,12 +35,14 @@ namespace doris { Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, UniqueKeyUpdateModePB unique_key_update_mode, + PartialUpdateNewRowPolicyPB policy, const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, const std::string& auto_increment_column, int32_t sequence_map_col_uid, int64_t cur_max_version) { partial_update_mode = unique_key_update_mode; + partial_update_new_key_policy = policy; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; sequence_map_col_unqiue_id = sequence_map_col_uid; @@ -97,6 +99,7 @@ Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSc void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { partial_update_info_pb->set_partial_update_mode(partial_update_mode); + partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy); partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase); for (const auto& col : partial_update_input_columns) { partial_update_info_pb->add_partial_update_input_columns(col); @@ -133,6 +136,9 @@ void 
PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { } else { partial_update_mode = partial_update_info_pb->partial_update_mode(); } + if (partial_update_info_pb->has_partial_update_new_key_policy()) { + partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy(); + } max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase() ? partial_update_info_pb->max_version_in_flush_phase() : -1; @@ -186,56 +192,55 @@ std::string PartialUpdateInfo::summary() const { max_version_in_flush_phase); } -Status PartialUpdateInfo::handle_not_found_error_for_fixed_partial_update( - const TabletSchema& tablet_schema) const { - if (!can_insert_new_rows_in_partial_update) { - std::string error_column; - for (auto cid : missing_cids) { - const TabletColumn& col = tablet_schema.column(cid); - if (!col.has_default_value() && !col.is_nullable() && - !(tablet_schema.auto_increment_column() == col.name())) { - error_column = col.name(); - break; +Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema, + const std::function& line, + BitmapValue* skip_bitmap) { + switch (partial_update_new_key_policy) { + case doris::PartialUpdateNewRowPolicyPB::APPEND: { + if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { + if (!can_insert_new_rows_in_partial_update) { + std::string error_column; + for (auto cid : missing_cids) { + const TabletColumn& col = tablet_schema.column(cid); + if (!col.has_default_value() && !col.is_nullable() && + !(tablet_schema.auto_increment_column() == col.name())) { + error_column = col.name(); + break; + } + } + return Status::Error( + "the unmentioned column `{}` should have default value or be nullable " + "for newly inserted rows in non-strict mode partial update", + error_column); + } + } else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) { + DCHECK(skip_bitmap != nullptr); + bool can_insert_new_row {true}; + std::string 
error_column; + for (auto cid : missing_cids) { + const TabletColumn& col = tablet_schema.column(cid); + if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() && + !col.is_nullable() && !col.is_auto_increment()) { + error_column = col.name(); + can_insert_new_row = false; /* auto-increment columns are exempt */ + break; + } + } + if (!can_insert_new_row) { + return Status::Error( + "the unmentioned column `{}` should have default value or be " + "nullable for newly inserted rows in non-strict mode flexible partial " + "update", + error_column); } } - return Status::Error( - "the unmentioned column `{}` should have default value or be nullable " - "for newly inserted rows in non-strict mode partial update", - error_column); - } - return Status::OK(); -} - -Status PartialUpdateInfo::handle_not_found_error_for_flexible_partial_update( - const TabletSchema& tablet_schema, BitmapValue* skip_bitmap) const { - DCHECK(skip_bitmap != nullptr); - bool can_insert_new_rows_in_partial_update = true; - std::string error_column; - for (auto cid : missing_cids) { - const TabletColumn& col = tablet_schema.column(cid); - if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() && - !col.is_nullable() && col.is_auto_increment()) { - error_column = col.name(); - can_insert_new_rows_in_partial_update = false; - break; - } - } - if (!can_insert_new_rows_in_partial_update) { - return Status::Error( - "the unmentioned column `{}` should have default value or be " - "nullable for newly inserted rows in non-strict mode flexible partial update", - error_column); - } - return Status::OK(); -} - -Status PartialUpdateInfo::handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema, - BitmapValue* skip_bitmap) const { - if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { - RETURN_IF_ERROR(handle_not_found_error_for_fixed_partial_update(tablet_schema)); - } else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) { - RETURN_IF_ERROR( - 
handle_not_found_error_for_flexible_partial_update(tablet_schema, skip_bitmap)); + } break; + case doris::PartialUpdateNewRowPolicyPB::ERROR: { + return Status::Error( + "Can't append new rows in partial update when partial_update_new_key_behavior is " + "ERROR. Row with key=[{}] is not in table.", + line()); + } break; } return Status::OK(); } diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index f50937d014c57f..806db586f5137e 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -43,19 +44,16 @@ class BitmapValue; struct PartialUpdateInfo { Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, - UniqueKeyUpdateModePB unique_key_update_mode, + UniqueKeyUpdateModePB unique_key_update_mode, PartialUpdateNewRowPolicyPB policy, const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1, int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); - Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema, - BitmapValue* skip_bitmap = nullptr) const; - - Status handle_not_found_error_for_fixed_partial_update(const TabletSchema& tablet_schema) const; - Status handle_not_found_error_for_flexible_partial_update(const TabletSchema& tablet_schema, - BitmapValue* skip_bitmap) const; + Status handle_new_key(const TabletSchema& tablet_schema, + const std::function& line, + BitmapValue* skip_bitmap = nullptr); std::string summary() const; std::string partial_update_mode_str() const { @@ -84,6 +82,7 @@ struct PartialUpdateInfo { public: UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; + PartialUpdateNewRowPolicyPB 
partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND}; int64_t max_version_in_flush_phase {-1}; std::set partial_update_input_columns; std::vector missing_cids; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index f6e6513493c780..3f3ceed7c3540c 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -499,13 +499,7 @@ Status SegmentWriter::probe_key_for_mow( specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is()) { - if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { - ++stats.num_rows_filtered; - // delete the invalid newly inserted row - _mow_context->delete_bitmap->add( - {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - segment_pos); - } else if (!have_delete_sign) { + if (!have_delete_sign) { RETURN_IF_ERROR(not_found_cb()); } ++stats.num_rows_new_added; @@ -678,8 +672,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); auto not_found_cb = [&]() { - return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( - *_tablet_schema); + return _opts.rowset_ctx->partial_update_info->handle_new_key( + *_tablet_schema, [&]() -> std::string { + return block->dump_one_line(block_pos, _num_sort_key_columns); + }); }; auto update_read_plan = [&](const RowLocation& loc) { read_plan.prepare_to_read(loc, segment_pos); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index c35c5dac492223..2c7eb42b2087f4 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -375,13 +375,7 @@ Status VerticalSegmentWriter::_probe_key_for_mow( specified_rowsets, &loc, 
_mow_context->max_version, segment_caches, &rowset); if (st.is()) { - if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { - ++stats.num_rows_filtered; - // delete the invalid newly inserted row - _mow_context->delete_bitmap->add( - {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - segment_pos); - } else if (!have_delete_sign) { + if (!have_delete_sign) { RETURN_IF_ERROR(not_found_cb()); } ++stats.num_rows_new_added; @@ -557,8 +551,10 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); auto not_found_cb = [&]() { - return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( - *_tablet_schema); + return _opts.rowset_ctx->partial_update_info->handle_new_key( + *_tablet_schema, [&]() -> std::string { + return data.block->dump_one_line(block_pos, _num_sort_key_columns); + }); }; auto update_read_plan = [&](const RowLocation& loc) { read_plan.prepare_to_read(loc, segment_pos); @@ -879,8 +875,12 @@ Status VerticalSegmentWriter::_generate_flexible_read_plan( delete_sign_column_data[block_pos] != 0); auto not_found_cb = [&]() { - return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( - *_tablet_schema, &skip_bitmap); + return _opts.rowset_ctx->partial_update_info->handle_new_key( + *_tablet_schema, + [&]() -> std::string { + return data.block->dump_one_line(block_pos, _num_sort_key_columns); + }, + &skip_bitmap); }; auto update_read_plan = [&](const RowLocation& loc) { read_plan.prepare_to_read(loc, segment_pos, skip_bitmap); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 1f63893730d99c..07f77570ae72a8 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -438,6 +438,7 @@ Status BaseRowsetBuilder::_build_current_tablet_schema( RETURN_IF_ERROR(_partial_update_info->init( tablet()->tablet_id(), 
_req.txn_id, *_tablet_schema, table_schema_param->unique_key_update_mode(), + table_schema_param->partial_update_new_key_policy(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 7a554bac0d9a96..9d62c3c284e03d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -36,6 +36,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -135,6 +136,7 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser { public static final String KEY_SKIP_LINES = "skip_lines"; public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior"; public static final String KEY_COMMENT = "comment"; @@ -188,6 +190,12 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser { return Boolean.valueOf(s); } }) + .put(PARTIAL_UPDATE_NEW_KEY_POLICY, new Function() { + @Override + public @Nullable TPartialUpdateNewRowPolicy apply(@Nullable String s) { + return TPartialUpdateNewRowPolicy.valueOf(s.toUpperCase()); + } + }) .put(TIMEZONE, new Function() { @Override public @Nullable String apply(@Nullable String s) { @@ -381,6 +389,16 @@ public static void checkProperties(Map properties) throws DdlExc } } + // partial update new key policy + final String partialUpdateNewKeyPolicyProperty = 
properties.get(PARTIAL_UPDATE_NEW_KEY_POLICY); + if (partialUpdateNewKeyPolicyProperty != null) { + if (!partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("append") + && !partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("error")) { + throw new DdlException(PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of [append, error], but found " + + partialUpdateNewKeyPolicyProperty); + } + } + // time zone final String timezone = properties.get(TIMEZONE); if (timezone != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java index 4708b2bd936645..0c71a2cfe0e129 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java @@ -157,7 +157,8 @@ protected LoadLoadingTask createTask(Database db, OlapTable table, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, Priority priority, boolean enableMemTableOnSinkNode, int batchSize, String clusterId) { super(userinfo, db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate, - txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, loadZeroTolerance, - jobProfile, singleTabletLoadPerSink, priority, enableMemTableOnSinkNode, batchSize); + partialUpdateNewKeyPolicy, txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, + loadZeroTolerance, jobProfile, singleTabletLoadPerSink, priority, enableMemTableOnSinkNode, batchSize); this.cloudClusterId = clusterId; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 117b32886cbe32..2fca473556522f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -260,7 +260,8 @@ protected LoadLoadingTask createTask(Database db, OlapTable table, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, @@ -107,6 +110,7 @@ public LoadLoadingTask(UserIdentity userInfo, Database db, OlapTable table, this.execMemLimit = execMemLimit; this.strictMode = strictMode; this.isPartialUpdate = isPartialUpdate; + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.txnId = txnId; this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL); this.retryTime = 2; // 2 times is enough @@ -129,8 +133,8 @@ public void init(TUniqueId loadId, List> fileStatusList, brokerFileGroups.add(fileGroup.toNereidsBrokerFileGroup()); } planner = new NereidsLoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, - brokerFileGroups, strictMode, isPartialUpdate, timezone, timeoutS, loadParallelism, - sendBatchParallelism, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode); + brokerFileGroups, strictMode, isPartialUpdate, partialUpdateNewKeyPolicy, timezone, timeoutS, + loadParallelism, sendBatchParallelism, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 8cf32648d55f05..f2b6821c8043cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -49,13 +50,14 @@ public class UnboundTableSink extends UnboundLogicalSin private final boolean temporaryPartition; private final List partitions; private boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; private final boolean autoDetectPartition; public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { - this(nameParts, colNames, hints, false, partitions, - false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); + this(nameParts, colNames, hints, false, partitions, false, + TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); } /** @@ -63,9 +65,9 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, Optional logicalProperties, - CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, 
colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); @@ -73,6 +75,7 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, boolean isAutoDetectPartition, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, Optional logicalProperties, - CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); @@ -91,6 +94,7 @@ public UnboundTableSink(List nameParts, List colNames, List children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, Optional.empty(), children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, Optional.empty(), + children.get(0)); } @Override @@ -158,14 +167,16 @@ public int hashCode() { @Override public Plan withGroupExpression(Optional groupExpression) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, + Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, 
partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, logicalProperties, children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties, + children.get(0)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 2a118b4c4bcab5..a5dfa917fe390f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.collect.ImmutableList; @@ -67,12 +68,13 @@ public static LogicalSink createUnboundTableSink(List na */ public static LogicalSink createUnboundTableSink(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, LogicalPlan plan) { String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); if (curCatalog instanceof InternalCatalog) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, - isPartialUpdate, dmlCommandType, Optional.empty(), + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof HMSExternalCatalog) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, @@ -92,7 
+94,8 @@ public static LogicalSink createUnboundTableSink(List na */ public static LogicalSink createUnboundTableSinkMaybeOverwrite(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, DMLCommandType dmlCommandType, + boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, DMLCommandType dmlCommandType, LogicalPlan plan) { if (isAutoDetectPartition) { // partitions is null if (!isOverwrite) { @@ -107,7 +110,7 @@ public static LogicalSink createUnboundTableSinkMaybeOverwrite(L if (curCatalog instanceof InternalCatalog) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isAutoDetectPartition, - isPartialUpdate, dmlCommandType, Optional.empty(), + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 8881d477dd9f5d..534609484e8590 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -501,6 +501,9 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink partialUpdateInputColumns; private Map exprMap; @@ -245,7 +247,7 @@ private String getHeaderType(String formatType) { * NereidsLoadPlanTranslator */ public NereidsLoadPlanInfoCollector(OlapTable destTable, NereidsLoadTaskInfo taskInfo, TUniqueId loadId, long dbId, - TUniqueKeyUpdateMode uniquekeyUpdateMode, + 
TUniqueKeyUpdateMode uniquekeyUpdateMode, TPartialUpdateNewRowPolicy uniquekeyUpdateNewRowPolicy, HashSet partialUpdateInputColumns, Map exprMap) { loadPlanInfo = new LoadPlanInfo(); @@ -254,6 +256,7 @@ public NereidsLoadPlanInfoCollector(OlapTable destTable, NereidsLoadTaskInfo tas this.loadId = loadId; this.dbId = dbId; this.uniquekeyUpdateMode = uniquekeyUpdateMode; + this.uniquekeyUpdateNewRowPolicy = uniquekeyUpdateNewRowPolicy; this.partialUpdateInputColumns = partialUpdateInputColumns; this.exprMap = exprMap; } @@ -322,7 +325,7 @@ public Void visitLogicalOlapTableSink(LogicalOlapTableSink logic loadPlanInfo.olapTableSink.init(loadId, taskInfo.getTxnId(), dbId, timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), timeout, uniquekeyUpdateMode, - partialUpdateInputColumns); + uniquekeyUpdateNewRowPolicy, partialUpdateInputColumns); } catch (UserException e) { throw new AnalysisException(e.getMessage(), e.getCause()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java index b5228649a631c5..127c4e65806b3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java @@ -24,6 +24,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; @@ -121,6 +122,10 @@ default boolean isFlexiblePartialUpdate() { return false; } + default TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return TPartialUpdateNewRowPolicy.APPEND; + } + default boolean getTrimDoubleQuotes() { return false; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java index 516914964a3a3d..a35ca5e1f17230 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java @@ -58,6 +58,7 @@ import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -108,7 +109,8 @@ public Void visitLogicalProject(LogicalProject logicalProject, L * create a load plan tree for stream load, routine load and broker load */ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, PartitionNames partitionNames, - NereidsParamCreateContext context, boolean isPartialUpdate) throws UserException { + NereidsParamCreateContext context, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy) throws UserException { // context.scanSlots represent columns read from external file // use LogicalOneRowRelation to hold this info for later use LogicalPlan currentRootPlan = new LogicalOneRowRelation(StatementScopeIdGenerator.newRelationId(), @@ -172,7 +174,7 @@ public static LogicalPlan createLoadPlan(NereidsFileGroupInfo fileGroupInfo, Par ImmutableList.of(), partitionNames != null && partitionNames.isTemp(), partitionNames != null ? 
partitionNames.getPartitionNames() : ImmutableList.of(), isPartialUpdate, - DMLCommandType.LOAD, currentRootPlan); + partialUpdateNewKeyPolicy, DMLCommandType.LOAD, currentRootPlan); CascadesContext cascadesContext = CascadesContext.initContext(new StatementContext(), currentRootPlan, PhysicalProperties.ANY); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java index 2dab09c108ce13..39b3c81d548a03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java @@ -37,6 +37,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUniqueKeyUpdateMode; @@ -65,6 +66,7 @@ public class NereidsLoadingTaskPlanner { private final List fileGroups; private final boolean strictMode; private final boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final String timezone; private final long timeoutS; // timeout of load job, in second private final int loadParallelism; @@ -85,7 +87,8 @@ public class NereidsLoadingTaskPlanner { */ public NereidsLoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, - boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism, + boolean strictMode, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, UserIdentity userInfo, boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) { this.loadJobId = loadJobId; @@ 
-96,6 +99,7 @@ public NereidsLoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTabl this.fileGroups = brokerFileGroups; this.strictMode = strictMode; this.isPartialUpdate = isPartialUpdate; + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.timezone = timezone; this.timeoutS = timeoutS; this.loadParallelism = loadParallelism; @@ -147,7 +151,7 @@ public void plan(TUniqueId loadId, List> fileStatusesLis NereidsParamCreateContext context = loadScanProvider.createLoadContext(); PartitionNames partitionNames = getPartitionNames(); LogicalPlan streamLoadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, partitionNames, context, - isPartialUpdate); + isPartialUpdate, partialUpdateNewKeyPolicy); long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeoutS() : timeoutS; if (txnTimeout > Integer.MAX_VALUE) { txnTimeout = Integer.MAX_VALUE; @@ -157,7 +161,7 @@ public void plan(TUniqueId loadId, List> fileStatusesLis strictMode, enableMemtableOnSinkNode, partitionNames); NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(table, nereidsBrokerLoadTask, loadId, dbId, isPartialUpdate ? 
TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT, - partialUpdateInputColumns, context.exprMap); + partialUpdateNewKeyPolicy, partialUpdateInputColumns, context.exprMap); NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan); descTable = loadPlanInfo.getDescriptorTable(); FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java index 0f775e65d1066b..2a017dbf075238 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java @@ -41,6 +41,7 @@ import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineInstanceParams; import org.apache.doris.thrift.TQueryGlobals; @@ -237,10 +238,13 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde NereidsLoadScanProvider loadScanProvider = new NereidsLoadScanProvider(fileGroupInfo, partialUpdateInputColumns); NereidsParamCreateContext context = loadScanProvider.createLoadContext(); + TPartialUpdateNewRowPolicy partialUpdateNewRowPolicy = taskInfo.getPartialUpdateNewRowPolicy(); LogicalPlan streamLoadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, dataDescription.getPartitionNames(), - context, uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); + context, uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS, + partialUpdateNewRowPolicy); NereidsLoadPlanInfoCollector planInfoCollector = new 
NereidsLoadPlanInfoCollector(destTable, taskInfo, loadId, - db.getId(), uniquekeyUpdateMode, partialUpdateInputColumns, context.exprMap); + db.getId(), uniquekeyUpdateMode, partialUpdateNewRowPolicy, partialUpdateInputColumns, + context.exprMap); NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan); FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple()); fileScanNode.finalizeForNereids(loadId, Lists.newArrayList(fileGroupInfo), Lists.newArrayList(context), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java index a66b895c14c937..9e1ed849b5d328 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java @@ -31,6 +31,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUniqueKeyUpdateMode; @@ -80,6 +81,7 @@ public class NereidsStreamLoadTask implements NereidsLoadTaskInfo { private List hiddenColumns; private boolean trimDoubleQuotes = false; private TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; private int skipLines = 0; private boolean enableProfile = false; @@ -310,6 +312,11 @@ public boolean isFlexiblePartialUpdate() { return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS; } + @Override + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; + } + 
@Override public boolean isMemtableOnSinkNode() { return memtableOnSinkNode; @@ -482,6 +489,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; } } + if (request.isSetPartialUpdateNewKeyPolicy()) { + partialUpdateNewKeyPolicy = request.getPartialUpdateNewKeyPolicy(); + } if (request.isSetMemtableOnSinkNode()) { this.memtableOnSinkNode = request.isMemtableOnSinkNode(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 89b6099752b0f9..24cdfd5a73566c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -1241,6 +1241,7 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { isAutoDetect, isOverwrite, ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), + ConnectContext.get().getSessionVariable().getPartialUpdateNewRowPolicy(), ctx.tableId == null ? 
DMLCommandType.INSERT : DMLCommandType.GROUP_COMMIT, plan); Optional cte = Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index 7617977f00bfed..7d62cd0f0f9a04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -85,6 +85,7 @@ import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -138,6 +139,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { Database database = pair.first; OlapTable table = pair.second; boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS; + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = sink.getPartialUpdateNewRowPolicy(); LogicalPlan child = ((LogicalPlan) sink.child()); boolean childHasSeqCol = child.getOutput().stream() @@ -161,6 +163,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()), isPartialUpdate, + partialUpdateNewKeyPolicy, sink.getDMLCommandType(), child); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java index 710d73c9c63e0b..f745852b0ed01d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java @@ -42,6 +42,7 @@ public Rule build() { ctx.connectContext.getSessionVariable().isEnableMemtableOnSinkNode() ? false : ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(), sink.isPartialUpdate(), + sink.getPartialUpdateNewRowPolicy(), sink.getDmlCommandType(), sink.getPartitionExprList(), sink.getSyncMvWhereClauses(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java index 146e03189a3771..34f122f0d6e82e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java @@ -81,6 +81,7 @@ import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -506,7 +507,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer logicalQuery = handleCte(logicalQuery); // make UnboundTableSink return UnboundTableSinkCreator.createUnboundTableSink(nameParts, cols, ImmutableList.of(), - isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); + isTempPart, partitions, isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.DELETE, logicalQuery); } protected LogicalPlan handleCte(LogicalPlan logicalPlan) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 78fe220fa23556..2a194feb95ccf1 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -63,6 +63,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.tablefunction.HdfsTableValuedFunction; import org.apache.doris.tablefunction.S3TableValuedFunction; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -242,7 +243,8 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite() && sinkCols.size() < olapTable.getColumns().size(); return UnboundTableSinkCreator.createUnboundTableSink(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), - false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan); + false, dataDesc.getPartitionNames(), isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.LOAD, tvfLogicalPlan); } private static void fillDeleteOnColumn(BulkLoadDataDesc dataDesc, OlapTable olapTable, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index ada4ca9035c47a..435dfd7eb589c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -193,7 +194,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, 
LogicalPlan logicalQuer // make UnboundTableSink return UnboundTableSinkCreator.createUnboundTableSink(nameParts, isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), ImmutableList.of(), - false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery); + false, ImmutableList.of(), isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.UPDATE, logicalQuery); } private void checkAssignmentColumn(ConnectContext ctx, List columnNameParts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 84a1f4329841cf..7e7d8ec834ab8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -59,6 +59,7 @@ import org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -322,6 +323,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis true, tempPartitionNames, sink.isPartialUpdate(), + sink.getPartialUpdateNewRowPolicy(), sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); // 1. when overwrite table, allow auto partition or not is controlled by session variable. 
@@ -337,6 +339,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis false, sink.getPartitions(), false, + TPartialUpdateNewRowPolicy.APPEND, sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); insertCtx = new HiveInsertCommandContext(); @@ -350,6 +353,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis false, sink.getPartitions(), false, + TPartialUpdateNewRowPolicy.APPEND, sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); insertCtx = new IcebergInsertCommandContext(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index 5022df67eb0a87..5677d5ae0af8db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -53,16 +54,18 @@ public class LogicalOlapTableSink extends LogicalTableS private final OlapTable targetTable; private final List partitionIds; private final boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; private final List partitionExprList; private final Map syncMvWhereClauses; private final List targetTableSlots; public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, - List outputExprs, boolean isPartialUpdate, DMLCommandType dmlCommandType, - CHILD_TYPE child) { - this(database, 
targetTable, cols, partitionIds, outputExprs, isPartialUpdate, dmlCommandType, - Optional.empty(), Optional.empty(), child); + List outputExprs, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, partialUpdateNewKeyPolicy, + dmlCommandType, Optional.empty(), Optional.empty(), child); } /** @@ -70,23 +73,25 @@ public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, DMLCommandType dmlCommandType, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, + this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, new ArrayList<>(), new HashMap<>(), new ArrayList<>(), groupExpression, logicalProperties, child); } private LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean isPartialUpdate, - DMLCommandType dmlCommandType, List partitionExprList, - Map syncMvWhereClauses, List targetTableSlots, - Optional groupExpression, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, DMLCommandType dmlCommandType, + List partitionExprList, Map syncMvWhereClauses, + List targetTableSlots, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.isPartialUpdate = isPartialUpdate; + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; 
this.dmlCommandType = dmlCommandType; this.partitionIds = Utils.copyRequiredList(partitionIds); this.partitionExprList = partitionExprList; @@ -102,7 +107,7 @@ public Plan withChildAndUpdateOutput(Plan child, List partitionExprL List output = child.getOutput().stream().map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()); return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, output, - isPartialUpdate, dmlCommandType, partitionExprList, syncMvWhereClauses, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, Optional.empty(), Optional.empty(), child); } @@ -110,7 +115,7 @@ public Plan withChildAndUpdateOutput(Plan child, List partitionExprL public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - isPartialUpdate, dmlCommandType, partitionExprList, syncMvWhereClauses, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, Optional.empty(), Optional.empty(), children.get(0)); } @@ -130,6 +135,10 @@ public boolean isPartialUpdate() { return isPartialUpdate; } + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; + } + public DMLCommandType getDmlCommandType() { return dmlCommandType; } @@ -148,7 +157,7 @@ public List getTargetTableSlots() { public LogicalOlapTableSink withOutputExprs(List outputExprs) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - isPartialUpdate, dmlCommandType, partitionExprList, syncMvWhereClauses, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, Optional.empty(), Optional.empty(), child()); } @@ -165,6 +174,7 @@ public boolean equals(Object 
o) { } LogicalOlapTableSink that = (LogicalOlapTableSink) o; return isPartialUpdate == that.isPartialUpdate && dmlCommandType == that.dmlCommandType + && partialUpdateNewKeyPolicy == that.partialUpdateNewKeyPolicy && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) && Objects.equals(partitionIds, that.partitionIds); @@ -173,7 +183,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, - isPartialUpdate, dmlCommandType); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType); } @Override @@ -185,6 +195,7 @@ public String toString() { "cols", cols, "partitionIds", partitionIds, "isPartialUpdate", isPartialUpdate, + "partialUpdateNewKeyPolicy", partialUpdateNewKeyPolicy, "dmlCommandType", dmlCommandType ); } @@ -197,7 +208,7 @@ public R accept(PlanVisitor visitor, C context) { @Override public Plan withGroupExpression(Optional groupExpression) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - isPartialUpdate, dmlCommandType, partitionExprList, syncMvWhereClauses, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, Optional.of(getLogicalProperties()), child()); } @@ -205,14 +216,14 @@ public Plan withGroupExpression(Optional groupExpression) { public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - isPartialUpdate, dmlCommandType, partitionExprList, syncMvWhereClauses, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, logicalProperties, children.get(0)); } public Plan withPartitionExprAndMvWhereClause(List partitionExprList, Map 
syncMvWhereClauses) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - isPartialUpdate, dmlCommandType, partitionExprList, syncMvWhereClauses, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, Optional.empty(), Optional.empty(), child()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index e89e3385dbff10..9ddd5ed547cb30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -59,6 +60,7 @@ public class PhysicalOlapTableSink extends PhysicalTabl private final List partitionIds; private final boolean singleReplicaLoad; private final boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; private final List partitionExprList; private final Map syncMvWhereClauses; @@ -69,12 +71,12 @@ public class PhysicalOlapTableSink extends PhysicalTabl */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - List partitionExprList, Map syncMvWhereClauses, - List targetTableSlots, Optional groupExpression, - LogicalProperties logicalProperties, CHILD_TYPE child) { + boolean isPartialUpdate, 
TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, List partitionExprList, + Map syncMvWhereClauses, List targetTableSlots, + Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { this(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, partitionExprList, syncMvWhereClauses, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); } @@ -84,9 +86,10 @@ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - List partitionExprList, Map syncMvWhereClauses, - List targetTableSlots, Optional groupExpression, + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, List partitionExprList, + Map syncMvWhereClauses, List targetTableSlots, + Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { super(PlanType.PHYSICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, @@ -97,6 +100,7 @@ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List getTargetTableSlots() { public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, partitionExprList, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -164,6 +172,7 
@@ public boolean equals(Object o) { PhysicalOlapTableSink that = (PhysicalOlapTableSink) o; return singleReplicaLoad == that.singleReplicaLoad && isPartialUpdate == that.isPartialUpdate + && partialUpdateNewKeyPolicy == that.partialUpdateNewKeyPolicy && dmlCommandType == that.dmlCommandType && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) @@ -174,7 +183,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, - isPartialUpdate, dmlCommandType); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType); } @Override @@ -187,6 +196,7 @@ public String toString() { "partitionIds", partitionIds, "singleReplicaLoad", singleReplicaLoad, "isPartialUpdate", isPartialUpdate, + "partialUpdateNewKeyPolicy", partialUpdateNewKeyPolicy, "dmlCommandType", dmlCommandType ); } @@ -204,7 +214,7 @@ public List getExpressions() { @Override public Plan withGroupExpression(Optional groupExpression) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, partitionExprList, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, getLogicalProperties(), child()); } @@ -213,7 +223,7 @@ syncMvWhereClauses, targetTableSlots, groupExpression, getLogicalProperties(), public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, partitionExprList, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, logicalProperties.get(), children.get(0)); } @@ -222,7 +232,7 @@ public 
Plan withGroupExprLogicalPropChildren(Optional groupExpr public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, partitionExprList, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @@ -257,7 +267,7 @@ public PhysicalProperties getRequirePhysicalProperties() { @Override public PhysicalOlapTableSink resetLogicalProperties() { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, partitionExprList, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, partitionExprList, syncMvWhereClauses, targetTableSlots, groupExpression, null, physicalProperties, statistics, child()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index d124dfc4d17fa5..36c65ae121e597 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -74,6 +74,7 @@ import org.apache.doris.thrift.TOlapTableSchemaParam; import org.apache.doris.thrift.TOlapTableSink; import org.apache.doris.thrift.TPaloNodesInfo; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TUniqueId; @@ -110,6 +111,7 @@ public class OlapTableSink extends DataSink { // partial update input columns private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; private HashSet 
partialUpdateInputColumns; + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; // set after init called protected TDataSink tDataSink; @@ -246,8 +248,12 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS, int sendBatchParallelism, boolean loadToSingleTablet, boolean isStrictMode, long txnExpirationS, TUniqueKeyUpdateMode uniquekeyUpdateMode, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, HashSet partialUpdateInputColumns) throws UserException { setPartialUpdateInfo(uniquekeyUpdateMode, partialUpdateInputColumns); + if (uniquekeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) { + setPartialUpdateNewRowPolicy(partialUpdateNewKeyPolicy); + } init(loadId, txnId, dbId, loadChannelTimeoutS, sendBatchParallelism, loadToSingleTablet, isStrictMode, txnExpirationS); for (Long partitionId : partitionIds) { @@ -306,6 +312,10 @@ public void setPartialUpdateInfo(TUniqueKeyUpdateMode uniqueKeyUpdateMode, HashS } } + public void setPartialUpdateNewRowPolicy(TPartialUpdateNewRowPolicy policy) { + this.partialUpdateNewKeyPolicy = policy; + } + public void updateLoadId(TUniqueId newLoadId) { tDataSink.getOlapTableSink().setLoadId(newLoadId); } @@ -364,11 +374,13 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { boolean isPartialUpdate = uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT; strBuilder.append(prefix + " IS_PARTIAL_UPDATE: " + isPartialUpdate); if (isPartialUpdate) { + strBuilder.append("\n"); if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: UPDATE_FIXED_COLUMNS"); } else { strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: UPDATE_FLEXIBLE_COLUMNS"); } + strBuilder.append("\n" + prefix + " PARTIAL_UPDATE_NEW_KEY_BEHAVIOR: " + partialUpdateNewKeyPolicy); } return strBuilder.toString(); } @@ -443,33 +455,7 @@ 
public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a indexSchema.setIndexesDesc(indexDesc); schemaParam.addToIndexes(indexSchema); } - // for backward compatibility - schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); - schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode); - if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) { - if (table.getState() == OlapTable.OlapTableState.ROLLUP - || table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) { - throw new AnalysisException("Can't do partial update when table is doing schema change."); - } - - } - if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && table.getSequenceMapCol() != null) { - Column seqMapCol = table.getFullSchema().stream() - .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol())) - .findFirst().get(); - schemaParam.setSequenceMapColUniqueId(seqMapCol.getUniqueId()); - } - if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { - for (String s : partialUpdateInputColumns) { - schemaParam.addToPartialUpdateInputColumns(s); - } - for (Column col : table.getFullSchema()) { - if (col.isAutoInc()) { - schemaParam.setAutoIncrementColumn(col.getName()); - schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId()); - } - } - } + setPartialUpdateInfoForParam(schemaParam, table, uniqueKeyUpdateMode); schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); return schemaParam; } @@ -527,6 +513,13 @@ private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) throws An indexSchema.setIndexesDesc(indexDesc); schemaParam.addToIndexes(indexSchema); } + setPartialUpdateInfoForParam(schemaParam, table, uniqueKeyUpdateMode); + schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); + return schemaParam; + } + + private void setPartialUpdateInfoForParam(TOlapTableSchemaParam schemaParam, OlapTable 
table, + TUniqueKeyUpdateMode uniqueKeyUpdateMode) throws AnalysisException { // for backward compatibility schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode); @@ -535,6 +528,7 @@ private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) throws An || table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) { throw new AnalysisException("Can't do partial update when table is doing schema change."); } + schemaParam.setPartialUpdateNewKeyPolicy(partialUpdateNewKeyPolicy); } if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && table.getSequenceMapCol() != null) { Column seqMapCol = table.getFullSchema().stream() @@ -553,8 +547,6 @@ private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) throws An } } } - schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat()); - return schemaParam; } private List getDistColumns(DistributionInfo distInfo) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 9f8e9bd69172f3..aab8479f472398 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -41,6 +41,7 @@ import org.apache.doris.planner.GroupCommitBlockSink; import org.apache.doris.qe.VariableMgr.VarAttr; import org.apache.doris.thrift.TGroupCommitMode; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterType; @@ -538,6 +539,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update"; + public static final String 
PARTIAL_UPDATE_NEW_KEY_BEHAVIOR = "partial_update_new_key_behavior"; + public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold"; public static final String INVERTED_INDEX_MAX_EXPANSIONS = "inverted_index_max_expansions"; @@ -2073,6 +2076,12 @@ public boolean isEnableHboNonStrictMatchingMode() { @VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true) public boolean enableUniqueKeyPartialUpdate = false; + @VariableMgr.VarAttr(name = PARTIAL_UPDATE_NEW_KEY_BEHAVIOR, needForward = true, description = { + "用于设置部分列更新中对于新插入的行的行为", + "Used to set the behavior for newly inserted rows in partial update." + }, checker = "checkPartialUpdateNewKeyBehavior", options = {"APPEND", "ERROR"}) + public String partialUpdateNewKeyPolicy = "APPEND"; + @VariableMgr.VarAttr(name = TEST_QUERY_CACHE_HIT, description = { "用于测试查询缓存是否命中,如果未命中指定类型的缓存,则会报错", "Used to test whether the query cache is hit. " @@ -4056,6 +4065,10 @@ public void setEnableUniqueKeyPartialUpdate(boolean enableUniqueKeyPartialUpdate this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate; } + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return parsePartialUpdateNewKeyBehavior(partialUpdateNewKeyPolicy); + } + public int getLoadStreamPerNode() { return loadStreamPerNode; } @@ -4592,6 +4605,29 @@ public void checkGenerateStatsFactor(String generateStatsFactor) { } } + public TPartialUpdateNewRowPolicy parsePartialUpdateNewKeyBehavior(String behavior) { + if (behavior == null) { + return null; + } else if (behavior.equalsIgnoreCase("APPEND")) { + return TPartialUpdateNewRowPolicy.APPEND; + } else if (behavior.equalsIgnoreCase("ERROR")) { + return TPartialUpdateNewRowPolicy.ERROR; + } + return null; + } + + public void checkPartialUpdateNewKeyBehavior(String partialUpdateNewKeyBehavior) { + TPartialUpdateNewRowPolicy policy = parsePartialUpdateNewKeyBehavior(partialUpdateNewKeyBehavior); + if (policy == null) { 
+ UnsupportedOperationException exception = + new UnsupportedOperationException(PARTIAL_UPDATE_NEW_KEY_BEHAVIOR + + " should be one of {'APPEND', 'ERROR'}, but found " + + partialUpdateNewKeyBehavior); + LOG.warn("Check " + PARTIAL_UPDATE_NEW_KEY_BEHAVIOR + " failed", exception); + throw exception; + } + } + public void setGenerateStatsFactor(int factor) { this.generateStatsFactor = factor; if (factor <= 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index cf48aad9772f67..502dd8672eaa84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -25,6 +25,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; @@ -119,6 +120,10 @@ default boolean isFlexiblePartialUpdate() { return false; } + default TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return TPartialUpdateNewRowPolicy.APPEND; + } + default boolean getTrimDoubleQuotes() { return false; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java index cef9bac4ed90a7..f3d718e9641424 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.util.MemoTestUtils; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.nereids.util.PlanConstructor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import 
org.apache.doris.utframe.TestWithFeService; import org.junit.jupiter.api.Test; @@ -99,7 +100,7 @@ void testEliminateSortUnderTableSink() { .limit(1, 1).build(); plan = new LogicalOlapTableSink<>(new Database(), scan.getTable(), scan.getTable().getBaseSchema(), new ArrayList<>(), plan.getOutput().stream().map(NamedExpression.class::cast).collect( - Collectors.toList()), false, DMLCommandType.NONE, plan); + Collectors.toList()), false, TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, plan); PlanChecker.from(MemoTestUtils.createConnectContext(), plan) .disableNereidsRules("PRUNE_EMPTY_PARTITION") .rewrite() @@ -113,7 +114,7 @@ void testEliminateSortUnderTableSink() { .build(); plan = new LogicalOlapTableSink<>(new Database(), scan.getTable(), scan.getTable().getBaseSchema(), new ArrayList<>(), plan.getOutput().stream().map(NamedExpression.class::cast).collect( - Collectors.toList()), false, DMLCommandType.NONE, plan); + Collectors.toList()), false, TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, plan); PlanChecker.from(MemoTestUtils.createConnectContext(), plan) .rewrite() .nonMatch(logicalSort()); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 918b97fc539c72..31008c9072545b 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -76,5 +76,6 @@ message POlapTableSchemaParam { optional int32 nano_seconds = 14 [default = 0]; optional UniqueKeyUpdateModePB unique_key_update_mode = 15 [default = UPSERT]; optional int32 sequence_map_col_unique_id = 16 [default = -1]; + optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 17 [default = APPEND]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index f47ce7a8a801bb..35f5625b0bec09 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -640,6 +640,11 @@ message RowsetBinlogMetasPB { repeated RowsetBinlogMetaPB rowset_binlog_metas = 1; } +enum PartialUpdateNewRowPolicyPB { + APPEND = 
0; + ERROR = 1; +} + message PartialUpdateInfoPB { optional bool is_partial_update = 1 [default = false]; // deprecated repeated string partial_update_input_columns = 2; @@ -655,4 +660,5 @@ message PartialUpdateInfoPB { optional int64 max_version_in_flush_phase = 12 [default = -1]; optional int32 nano_seconds = 13 [default = 0]; optional UniqueKeyUpdateModePB partial_update_mode = 14 [default = UPSERT]; + optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 15 [default = APPEND]; } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 1e6b178ff962e8..4c85da61006215 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -162,6 +162,11 @@ enum TIndexType { NGRAM_BF = 3 } +enum TPartialUpdateNewRowPolicy { + APPEND = 0, + ERROR = 1 +} + // Mapping from names defined by Avro to the enum. // We permit gzip and bzip2 in addition. const map COMPRESSION_MAP = { @@ -262,6 +267,7 @@ struct TOlapTableSchemaParam { 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode 15: optional i32 sequence_map_col_unique_id = -1 + 16: optional TPartialUpdateNewRowPolicy partial_update_new_key_policy } struct TTabletLocation { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 20a6131eb3187b..373b0b80cda007 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -554,6 +554,7 @@ struct TStreamLoadPutRequest { 55: optional i32 stream_per_node; 56: optional string group_commit_mode 57: optional Types.TUniqueKeyUpdateMode unique_key_update_mode + 58: optional Descriptors.TPartialUpdateNewRowPolicy partial_update_new_key_policy // For cloud 1000: optional string cloud_cluster diff --git a/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out 
b/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out index 4585bd5c5834dd..6ad32390d03d36 100644 --- a/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out +++ b/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out @@ -1,9 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !select -- -1 kevin 18 shenzhen 500 2023-07-03T12:00:01 -3 \N \N \N 23 2023-07-03T12:00:02 -18 \N \N \N 9999999 2023-07-03T12:00:03 - -- !select -- 3 Jack \N 90 4 Mike 3 \N diff --git a/regression-test/data/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.out b/regression-test/data/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.out index 93fa33d81f9e96..18fa75df60991e 100644 --- a/regression-test/data/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.out +++ b/regression-test/data/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.out @@ -4,12 +4,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -23,7 +27,9 @@ -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -38,12 +44,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 
kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -57,7 +67,9 @@ -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 diff --git a/regression-test/data/unique_with_mow_p0/flexible/test_f_new_key_policy.out b/regression-test/data/unique_with_mow_p0/flexible/test_f_new_key_policy.out new file mode 100644 index 00000000000000..5d15ce88ec5091 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/flexible/test_f_new_key_policy.out @@ -0,0 +1,37 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 + +-- !stream_load_append -- +0 0 0 0 +1 999 1 1 +2 2 888 2 +3 3 3 3 +4 4 4 4 +10 888 \N 7777 +12 \N 888 7777 + +-- !stream_load_error -- +0 0 0 0 +1 999 1 1 +2 2 888 2 +3 3 3 3 +4 4 4 4 +10 888 \N 7777 +12 \N 888 7777 + +-- !stream_ignore_property -- +0 0 0 0 +1 999 1 1 +2 2 888 2 +3 999 \N \N +4 \N 888 \N +10 888 \N 7777 +12 \N 888 7777 +20 888 \N 7777 +22 \N 888 7777 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv new file mode 100644 index 00000000000000..49517f38a3b070 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv @@ -0,0 +1,4 @@ +0,40 +1,40 +10,40 +11,40 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv new file mode 100644 index 00000000000000..3f614863e4f57c --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv @@ -0,0 +1,4 @@ +2,50 +3,50 
+13,50 +14,50 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv new file mode 100644 index 00000000000000..7fa3c1ae4e2bcd --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv @@ -0,0 +1,4 @@ +4,60,60,60 +5,60,60,60 +21,60,60,60 +22,60,60,60 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out new file mode 100644 index 00000000000000..499f7f4d1ffcc9 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out @@ -0,0 +1,153 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 + +-- !insert_append -- +0 10 0 0 +1 1 1 1 +2 2 2 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert_error1 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert_error2 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert3 -- +0 10 0 0 +1 9 9 9 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +100 9 9 9 +200 9 9 9 + +-- !stream_load_append -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 +100 9 9 9 +200 9 9 9 + +-- !stream_load_error -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 +100 9 9 9 +200 9 9 9 + +-- !stream_load_ignore_property -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 60 60 60 +5 60 60 60 +10 \N \N 40 +11 \N \N 40 +21 60 60 60 +22 60 60 60 +100 9 9 9 +200 9 9 9 + +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + +-- !broker_load_append -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +10 \N \N 40 +11 \N \N 40 + +-- 
!broker_load_error -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +10 \N \N 40 +11 \N \N 40 + +-- !broker_load_ignore_property -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 60 60 60 +5 60 60 60 +10 \N \N 40 +11 \N \N 40 +21 60 60 60 +22 60 60 60 + +-- !sql -- +0 \N 30 456 +1 20 123 456 +2 \N 30 456 +3 3 3 3 +4 4 4 4 +10 \N 30 456 +11 20 123 456 +12 \N 30 456 + +-- !sql -- +0 \N 30 456 +0 0 0 0 +0 20 123 456 +1 1 1 1 +1 20 123 456 +2 \N 30 456 +2 2 2 2 +3 3 3 3 +4 4 4 4 +10 \N 30 456 +10 20 123 456 +11 20 123 456 +12 \N 30 456 + +-- !sql -- +0 20 0 0 +1 20 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +10 20 30 \N +11 20 \N \N +12 \N 30 \N + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out new file mode 100644 index 00000000000000..9c95e5ebde7c1e --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out @@ -0,0 +1,78 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 + +-- !append -- +0 10 0 0 +1 1 1 1 +2 2 2 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !ignore -- +0 10 0 0 +1 1 20 1 +2 2 2 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !error1 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !error2 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !append -- +0 10 0 40 +1 1 30 40 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 + +-- !ignore -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 + +-- !error1 -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N 60 +5 10 \N 60 +10 \N \N 40 +11 \N \N 40 + +-- !error2 -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N 60 +5 10 \N 60 +10 \N \N 70 +11 \N \N 40 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out index 93fa33d81f9e96..18fa75df60991e 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out @@ -4,12 +4,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -23,7 +27,9 @@ -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 
@@ -38,12 +44,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -57,7 +67,9 @@ -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 diff --git a/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy b/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy index d8057f374afd01..ccc46f59af946f 100644 --- a/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy +++ b/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy @@ -19,65 +19,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods suite("test_error_data_handling", "p0") { def tableName = "test_error_data_handling" - // partial update strict mode - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE ${tableName} ( - id INT, - name VARCHAR(10), - age INT, - city VARCHAR(10), - balance DECIMAL(9, 0), - last_access_time DATETIME - ) - UNIQUE KEY(id) - DISTRIBUTED BY HASH(id) BUCKETS 1 - PROPERTIES("replication_num" = "1") - """ - - sql """ - INSERT INTO ${tableName} VALUES (1, 'kevin', 18, 'shenzhen', 400, '2023-07-01 12:00:00') - """ - - streamLoad { - table "${tableName}" - file "test_data_partial.csv" - set 'column_separator', ',' - set 'columns', 'id,balance,last_access_time' - set 'partial_columns', 'true' - set 'strict_mode', 'true' - time 10000 - check {result, exception, startTime, endTime -> - 
assertTrue(exception == null) - def json = parseJson(result) - assertEquals("Fail", json.Status) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) - assertEquals(3, json.NumberTotalRows) - assertEquals(0, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) - } - } - - streamLoad { - table "${tableName}" - file "test_data_partial.csv" - set 'column_separator', ',' - set 'columns', 'id,balance,last_access_time' - set 'partial_columns', 'true' - set 'strict_mode', 'false' - time 10000 - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - } - } - - qt_select "SELECT * FROM ${tableName} ORDER BY id" - sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ diff --git a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy index 6a54d6a5e71702..f2ff40649d24de 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy @@ -179,13 +179,12 @@ suite("nereids_partial_update_native_insert_stmt", "p0") { sql """insert into ${tableName5} values(1,"kevin",18,"shenzhen",400,"2023-07-01 12:00:00");""" qt_5 """select * from ${tableName5} order by id;""" - sql "set enable_insert_strict = true;" sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" sql "sync;" - // partial update using insert stmt in strict mode, the max_filter_ratio is always 0 test { sql """ insert into ${tableName5}(id,balance,last_access_time) values(1,500,"2023-07-03 12:00:01"),(3,23,"2023-07-03 12:00:02"),(18,9999999,"2023-07-03 12:00:03"); """ - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new 
rows in partial update when partial_update_new_key_behavior is ERROR" } qt_5 """select * from ${tableName5} order by id;""" sql "set enable_unique_key_partial_update=false;" diff --git a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_light_schema_change.groovy b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_light_schema_change.groovy index 135c18f4fc7141..3860729a41c647 100644 --- a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_light_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_light_schema_change.groovy @@ -275,15 +275,17 @@ suite("test_partial_update_insert_light_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it don't have any value columns + // it inserts a new row when partial_update_new_key_behavior=ERROR sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" + test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - // exception "INTERNAL_ERROR" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" - sql "set enable_unique_key_partial_update=false;" + sql """set partial_update_new_key_behavior="APPEND";""" sql "sync" qt_add_key_col_2 " select * from ${tableName} order by c0; " diff --git a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_schema_change.groovy b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_schema_change.groovy index 62140ac58bf1cf..dba26c47c09c94 100644 --- a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_schema_change.groovy +++ 
b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_insert_schema_change.groovy @@ -265,13 +265,16 @@ suite("test_partial_update_insert_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it don't have any value columns + // it inserts a new row when partial_update_new_key_behavior=ERROR sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - // exception "INTERNAL_ERROR" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } + sql """set partial_update_new_key_behavior="APPEND";""" + sql "sync;" sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" sql "set enable_unique_key_partial_update=false;" sql "sync" diff --git a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change.groovy index d2e27699524635..26f66e9f88c9bb 100644 --- a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change.groovy @@ -435,7 +435,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -448,9 +448,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, 
json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1032,7 +1030,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1045,9 +1043,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change_row_store.groovy index cffb682488a2d3..3e1cc86e0f6b21 100644 --- a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change_row_store.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -433,13 +433,13 @@ suite("test_partial_update_row_store_schema_change", "p0") { }); // test load data with all key column, should fail because - // it don't have any value columns + // it inserts a new row when partial_update_new_key_behavior=ERROR streamLoad { table "${tableName}" set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -452,9 +452,7 @@ 
suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1045,7 +1043,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1058,9 +1056,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.groovy b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.groovy index 3cf0ef814e9998..cf86a0a36fd1e5 100644 --- a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_strict_mode.groovy @@ -21,6 +21,9 @@ suite("test_partial_update_strict_mode", "p0") { return } + // NOTE: after https://github.com/apache/doris/pull/41232, the handling of newly inserted rows in partial update + // is not controlled by strict mode + String db = context.config.getDbNameByFile(context.file) sql 
"select 1;" // to create database @@ -70,8 +73,8 @@ suite("test_partial_update_strict_mode", "p0") { def json = parseJson(result) assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(1, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -117,11 +120,10 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) + assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(0, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -214,7 +216,7 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) + assertEquals("Success", json.Status) } } @@ -256,7 +258,7 @@ suite("test_partial_update_strict_mode", "p0") { (1,600,"2023-08-03 12:00:01"), (2,500,"2023-07-03 12:00:01"), (4,23,"2023-07-03 12:00:02");""" - exception "Insert has filtered data in strict mode" + exception "[E-207]the unmentioned column `city` should have default value or be nullable for newly inserted rows in non-strict mode partial update" } sql "sync;" qt_sql """select * from ${tableName5} order by id;""" diff --git a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_upsert.groovy b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_upsert.groovy index b6b72287c3f8ab..747f09bf2a6161 100644 --- a/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_upsert.groovy +++ 
b/regression-test/suites/unique_with_mow_c_p0/partial_update/test_partial_update_upsert.groovy @@ -42,7 +42,7 @@ suite("test_partial_update_upsert", "p0") { `last_access_time` datetime NULL ) ENGINE = OLAP UNIQUE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) - BUCKETS AUTO PROPERTIES ( + BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "storage_format" = "V2", "enable_unique_key_merge_on_write" = "true", @@ -98,7 +98,7 @@ suite("test_partial_update_upsert", "p0") { set 'format', 'csv' set 'partial_columns', 'true' set 'columns', 'id,balance,last_access_time' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' file 'upsert.csv' time 10000 // limit inflight 10s @@ -107,6 +107,7 @@ suite("test_partial_update_upsert", "p0") { assertTrue(exception == null) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } sql "sync" diff --git a/regression-test/suites/unique_with_mow_p0/flexible/test_f_new_key_policy.groovy b/regression-test/suites/unique_with_mow_p0/flexible/test_f_new_key_policy.groovy new file mode 100644 index 00000000000000..cd8be4811f19e5 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/flexible/test_f_new_key_policy.groovy @@ -0,0 +1,95 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_f_new_key_policy", "p0") { + + def tableName = "test_f_new_key_policy" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int, + `c3` int) + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true", + "enable_unique_key_skip_bitmap_column" = "true"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + qt_sql """select * from ${tableName} order by k;""" + + def checkVariable = { expected -> + def res = sql_return_maparray """show variables where Variable_name="partial_update_new_key_behavior";"""; + logger.info("res: ${res}") + assertTrue(res[0].VARIABLE_VALUE.equalsIgnoreCase(expected)); + } + + // 2. 
test stream load + // 2.1 + String load1 = """{"k":1,"c1":999} + {"k":2,"c2":888} + {"k":10,"c1":888,"c3":7777} + {"k":12,"c2":888,"c3":7777}""" + streamLoad { + table "${tableName}" + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'unique_key_update_mode', 'UPDATE_FLEXIBLE_COLUMNS' + set 'partial_update_new_key_behavior', 'append' + inputStream new ByteArrayInputStream(load1.getBytes()) + time 10000 + } + qt_stream_load_append """select * from ${tableName} order by k;""" + + String load2 = """{"k":3,"c1":999} + {"k":4,"c2":888} + {"k":20,"c1":888,"c3":7777} + {"k":22,"c2":888,"c3":7777}""" + streamLoad { + table "${tableName}" + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'unique_key_update_mode', 'UPDATE_FLEXIBLE_COLUMNS' + set 'partial_update_new_key_behavior', 'error' + inputStream new ByteArrayInputStream(load2.getBytes()) + time 10000 + check {result, exception, startTime, endTime -> + assertTrue(exception == null) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. 
Row with key=[20] is not in table.")) + } + } + qt_stream_load_error """select * from ${tableName} order by k;""" + + + String load3 = """{"k":3,"c1":999} + {"k":4,"c2":888} + {"k":20,"c1":888,"c3":7777} + {"k":22,"c2":888,"c3":7777}""" + // 2.2 partial_update_new_key_behavior will not take effect when it's not a partial update + streamLoad { + table "${tableName}" + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'partial_update_new_key_behavior', 'error' + inputStream new ByteArrayInputStream(load3.getBytes()) + time 10000 + } + qt_stream_ignore_property """select * from ${tableName} order by k;""" +} diff --git a/regression-test/suites/unique_with_mow_p0/flexible/test_flexible_partial_update_filter_ratio.groovy b/regression-test/suites/unique_with_mow_p0/flexible/test_flexible_partial_update_filter_ratio.groovy index d262d145b5a8aa..8f85f3cdcb3211 100644 --- a/regression-test/suites/unique_with_mow_p0/flexible/test_flexible_partial_update_filter_ratio.groovy +++ b/regression-test/suites/unique_with_mow_p0/flexible/test_flexible_partial_update_filter_ratio.groovy @@ -89,6 +89,7 @@ suite('test_flexible_partial_update_filter_ratio') { set 'format', 'json' set 'read_json_by_line', 'true' set 'strict_mode', 'true' + // strict mode no longer affects the handling of newly inserted rows set 'unique_key_update_mode', 'UPDATE_FLEXIBLE_COLUMNS' file "key_missing.json" time 20000 @@ -100,8 +101,7 @@ suite('test_flexible_partial_update_filter_ratio') { assertEquals("fail", json.Status.toLowerCase()) assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) assertEquals(5, json.NumberTotalRows) - // newly inserted rows will be counted into filtered rows - assertEquals(4, json.NumberFilteredRows) + assertEquals(3, json.NumberFilteredRows) assertEquals(0, json.NumberLoadedRows) } } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy index d92dc41a68bc34..ac98141b121245 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy @@ -264,14 +264,16 @@ suite("test_partial_update_insert_light_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" - sql "set enable_unique_key_partial_update=false;" + sql """set partial_update_new_key_behavior="APPEND";""" sql "sync" qt_add_key_col_2 " select * from ${tableName} order by c0; " diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy index 9e0abc9704c482..852fad19737336 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy @@ -249,12 +249,16 @@ suite("test_partial_update_insert_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR sql 
"set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } + sql """set partial_update_new_key_behavior="APPEND";""" + sql "sync;" sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" sql "set enable_unique_key_partial_update=false;" sql "sync" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy index c3e25a3a690c44..87eb39a3b5b567 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy @@ -175,17 +175,17 @@ suite("test_partial_update_native_insert_stmt", "p0") { sql """insert into ${tableName5} values(1,"kevin",18,"shenzhen",400,"2023-07-01 12:00:00");""" qt_5 """select * from ${tableName5} order by id;""" - sql "set enable_insert_strict = true;" + sql """set partial_update_new_key_behavior="ERROR";""" sql "set enable_unique_key_partial_update=true;" sql "sync;" // partial update using insert stmt in strict mode, the max_filter_ratio is always 0 test { sql """ insert into ${tableName5}(id,balance,last_access_time) values(1,500,"2023-07-03 12:00:01"),(3,23,"2023-07-03 12:00:02"),(18,9999999,"2023-07-03 12:00:03"); """ - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_5 """select * from ${tableName5} order by id;""" sql "set enable_unique_key_partial_update=false;" - sql "set enable_insert_strict = false;" + sql """set 
partial_update_new_key_behavior="APPEND";""" sql "sync;" sql """ DROP TABLE IF EXISTS ${tableName5}; """ diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy new file mode 100644 index 00000000000000..c425650c2917dd --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy @@ -0,0 +1,267 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_partial_update_new_key_policy", "p0") { + + def tableName = "test_partial_update_new_key_policy" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int, + `c3` int) + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="3");""" + qt_sql """select * from ${tableName} order by k;""" + + def checkVariable = { expected -> + def res = sql_return_maparray """show variables where Variable_name="partial_update_new_key_behavior";"""; + logger.info("res: ${res}") + assertTrue(res[0].Value.equalsIgnoreCase(expected)); + } + + // 1. test insert stmt + // 1.1 + sql "set enable_unique_key_partial_update=true;" + sql "sync" + + sql """set partial_update_new_key_behavior="APPEND";""" + sql "sync;" + checkVariable("APPEND") + explain { + sql "insert into ${tableName}(k,c1) values(0,10),(3,10),(4,10),(5,10);" + contains "PARTIAL_UPDATE_NEW_KEY_BEHAVIOR: APPEND" + } + sql "insert into ${tableName}(k,c1) values(0,10),(3,10),(4,10),(5,10);" + qt_insert_append """select * from ${tableName} order by k;""" + + + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" + checkVariable("ERROR") + explain { + sql "insert into ${tableName}(k,c2) values(1,30),(2,30);" + contains "PARTIAL_UPDATE_NEW_KEY_BEHAVIOR: ERROR" + } + sql "insert into ${tableName}(k,c2) values(1,30),(2,30);" + qt_insert_error1 """select * from ${tableName} order by k;""" + test { + sql "insert into ${tableName}(k,c2) values(1,30),(10,999),(11,999);" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. Row with key=[10] is not in table." 
+ } + qt_insert_error2 """select * from ${tableName} order by k;""" + + + sql """set partial_update_new_key_behavior=default;""" + sql "sync;" + checkVariable("APPEND") + test { + sql """set partial_update_new_key_behavior="invalid";""" + exception "partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}, but found invalid" + } + checkVariable("APPEND") + + // 1.2 partial_update_new_key_behavior will not take effect when enable_unique_key_partial_update is false + sql "set partial_update_new_key_behavior=ERROR;" + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + + sql "insert into ${tableName} values(1,9,9,9),(2,9,9,9),(100,9,9,9),(200,9,9,9);" + qt_insert3 """select * from ${tableName} order by k;""" + + + // 2. test stream load + // 2.1 APPEND + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c3' + set 'partial_columns', 'true' + set 'partial_update_new_key_behavior', 'append' + file 'row_policy1.csv' + time 10000 + } + qt_stream_load_append """select * from ${tableName} order by k;""" + // 2.2 ERROR + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c3' + set 'partial_columns', 'true' + set 'partial_update_new_key_behavior', 'error' + file 'row_policy2.csv' + time 10000 + check {result, exception, startTime, endTime -> + assertTrue(exception == null) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. 
Row with key=[13] is not in table.")) + } + } + qt_stream_load_error """select * from ${tableName} order by k;""" + // 2.3 partial_update_new_key_behavior will not take effect when partial_columns is false (not a partial update) + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c1,c2,c3' + set 'partial_columns', 'false' + set 'partial_update_new_key_behavior', 'error' + file 'row_policy3.csv' + time 10000 + } + qt_stream_load_ignore_property """select * from ${tableName} order by k;""" + + + // 3. test broker load + tableName = "test_partial_update_new_key_policy2" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int, + `c3` int) + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true"); """ + sql "insert into ${tableName} select number,number,number,number from numbers(\"number\"=\"6\");" + qt_sql """select * from ${tableName} order by k;""" + + // 3.1 APPEND + def label = "test_pu_new_key_policy1" + UUID.randomUUID().toString().replace("-", "_") + sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy1.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "APPEND" + ); + """ + waitForBrokerLoadDone(label) + def res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "FINISHED" + qt_broker_load_append """select * from ${tableName} order by k;""" + + // 3.2 ERROR + label = "test_pu_new_key_policy2" + UUID.randomUUID().toString().replace("-", "_") + 
sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy2.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "ERROR" + ); + """ + waitForBrokerLoadDone(label, 600) + res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "CANCELLED" && res[0].ErrorMsg.contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. Row with key=[13] is not in table.") + qt_broker_load_error """select * from ${tableName} order by k;""" + + // 3.3 partial_update_new_key_behavior will not take effect when partial_columns is not set (not a partial update) + label = "test_pu_new_key_policy3" + UUID.randomUUID().toString().replace("-", "_") + sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy3.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c1,c2,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_update_new_key_behavior" = "ERROR" + ); + """ + waitForBrokerLoadDone(label) + res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "FINISHED" + qt_broker_load_ignore_property """select * from ${tableName} order by k;""" + + + // 4. 
test this config will not affect non MOW tables + tableName = "test_partial_update_new_key_policy3" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int default "123", + `c3` int default "456") + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by k;""" + + tableName = "test_partial_update_new_key_policy3" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int default "123", + `c3` int default "456") + DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES("replication_num" = "1"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by k,c1,c2,c3;""" + + tableName = "test_partial_update_new_key_policy4" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int max, + `c2` int min, + `c3` int sum) + AGGREGATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES("replication_num" = "1"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by 
k;""" +} diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy index 722f1fccb7643c..fac17ac542bc16 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy @@ -42,19 +42,18 @@ suite("test_partial_update_only_keys", "p0") { qt_sql """select * from ${tableName} order by k;""" // new rows will be appended sql "set enable_unique_key_partial_update=true;" - sql "set enable_insert_strict=false;" sql "sync" sql "insert into ${tableName}(k) values(0),(1),(4),(5),(6);" qt_sql """select * from ${tableName} order by k;""" - // fail if has new rows - sql "set enable_insert_strict=true;" + // fail if has new rows when partial_update_new_key_behavior=ERROR + sql """set partial_update_new_key_behavior="ERROR";""" sql "sync" sql "insert into ${tableName}(k) values(0),(1),(4),(5),(6);" qt_sql """select * from ${tableName} order by k;""" test { sql "insert into ${tableName}(k) values(0),(1),(10),(11);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_sql """select * from ${tableName} order by k;""" } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy index 864a97e13bfdc5..ddd2436f938234 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy @@ -421,7 +421,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 
'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -434,9 +434,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1002,7 +1000,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1015,9 +1013,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy index 2ce042f8b6391c..22d396124924e7 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -419,13 +419,13 @@ suite("test_partial_update_row_store_schema_change", "p0") { }); // 
test load data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR streamLoad { table "${tableName}" set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -438,9 +438,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1015,7 +1013,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1028,9 +1026,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy index c4d26baee8f481..dc591d7b02a6c4 100644 --- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy @@ -18,6 +18,9 @@ suite("test_partial_update_strict_mode", "p0") { + // NOTE: after https://github.com/apache/doris/pull/41232, the handling of newly inserted rows in partial update + // is not controlled by strict mode + String db = context.config.getDbNameByFile(context.file) sql "select 1;" // to create database @@ -67,8 +70,8 @@ suite("test_partial_update_strict_mode", "p0") { def json = parseJson(result) assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(1, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -114,11 +117,10 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) + assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(0, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -211,7 +213,7 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) + assertEquals("Success", json.Status) } } @@ -253,7 +255,7 @@ suite("test_partial_update_strict_mode", "p0") { (1,600,"2023-08-03 12:00:01"), (2,500,"2023-07-03 12:00:01"), (4,23,"2023-07-03 12:00:02");""" - exception "Insert has filtered data in strict mode" + exception "[E-207]the unmentioned column `city` should have default value or be nullable for 
newly inserted rows in non-strict mode partial update" } sql "sync;" qt_sql """select * from ${tableName5} order by id;""" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy index ad2e4b2f4ff229..ba12be2b99ae2d 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy @@ -78,7 +78,7 @@ suite("test_partial_update_upsert", "p0") { `last_access_time` datetime NULL ) ENGINE = OLAP UNIQUE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) - BUCKETS AUTO PROPERTIES ( + BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "storage_format" = "V2", "enable_unique_key_merge_on_write" = "true", @@ -95,7 +95,7 @@ suite("test_partial_update_upsert", "p0") { set 'format', 'csv' set 'partial_columns', 'true' set 'columns', 'id,balance,last_access_time' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' file 'upsert.csv' time 10000 // limit inflight 10s @@ -104,6 +104,7 @@ suite("test_partial_update_upsert", "p0") { assertTrue(exception == null) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } sql "sync"