diff --git a/be/src/common/status.h b/be/src/common/status.h index 26877e88dc79f9..68035e91e74835 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -293,6 +293,7 @@ namespace ErrorCode { E(KEY_NOT_FOUND, -7000, false); \ E(KEY_ALREADY_EXISTS, -7001, false); \ E(ENTRY_NOT_FOUND, -7002, false); \ + E(NEW_ROWS_IN_PARTIAL_UPDATE, -7003, false); \ E(INVALID_TABLET_STATE, -7211, false); \ E(ROWSETS_EXPIRED, -7311, false); \ E(CGROUP_ERROR, -7411, false); diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 87c0a824951446..4e3c4991a0aacf 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -130,6 +130,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { } _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id(); } + if (_is_partial_update) { + if (pschema.has_partial_update_new_key_policy()) { + _partial_update_new_row_policy = pschema.partial_update_new_key_policy(); + } + } _timestamp_ms = pschema.timestamp_ms(); if (pschema.has_nano_seconds()) { _nano_seconds = pschema.nano_seconds(); @@ -221,6 +226,27 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id; } + if (_is_partial_update) { + if (tschema.__isset.partial_update_new_key_policy) { + switch (tschema.partial_update_new_key_policy) { + case doris::TPartialUpdateNewRowPolicy::APPEND: { + _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::APPEND; + break; + } + case doris::TPartialUpdateNewRowPolicy::ERROR: { + _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::ERROR; + break; + } + default: { + return Status::InvalidArgument( + "Unknown partial_update_new_key_behavior: {}, should be one of " + "'APPEND' or 'ERROR'", + tschema.partial_update_new_key_policy); + } + } + } + } + for (const auto& tcolumn : tschema.partial_update_input_columns) { 
_partial_update_input_columns.insert(tcolumn); } @@ -305,6 +331,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_table_id(_table_id); pschema->set_version(_version); pschema->set_partial_update(_is_partial_update); + pschema->set_partial_update_new_key_policy(_partial_update_new_row_policy); pschema->set_is_strict_mode(_is_strict_mode); pschema->set_auto_increment_column(_auto_increment_column); pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index ff1c2e8e6b072e..bcfa4e7be1ade5 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -92,6 +92,9 @@ class OlapTableSchemaParam { std::set partial_update_input_columns() const { return _partial_update_input_columns; } + PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const { + return _partial_update_new_row_policy; + } std::string auto_increment_coulumn() const { return _auto_increment_column; } int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } @@ -113,6 +116,8 @@ class OlapTableSchemaParam { std::vector _indexes; mutable ObjectPool _obj_pool; bool _is_partial_update = false; + PartialUpdateNewRowPolicyPB _partial_update_new_row_policy { + PartialUpdateNewRowPolicyPB::APPEND}; std::set _partial_update_input_columns; bool _is_strict_mode = false; std::string _auto_increment_column; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index e3abd5a8a5d807..d82ac8eb2b8a52 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -639,6 +639,25 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_partial_update(false); } } + + if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) { + static const std::map policy_map { + 
{"APPEND", TPartialUpdateNewRowPolicy::APPEND}, + {"ERROR", TPartialUpdateNewRowPolicy::ERROR}}; + + auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY); + std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(), + [](unsigned char c) { return std::toupper(c); }); + auto it = policy_map.find(policy_name); + if (it == policy_map.end()) { + return Status::InvalidArgument( + "Invalid partial_update_new_key_behavior {}, must be one of {'APPEND', " + "'ERROR'}", + policy_name); + } + request.__set_partial_update_new_key_policy(it->second); + } + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); request.__set_memtable_on_sink_node(value); @@ -705,7 +724,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, } ctx->put_result.params.__set_content_length(content_length); } - VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params); // if we not use streaming, we must download total content before we begin // to process this load diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index b9b5f0d85ae302..fba5bba98d5ef9 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -59,6 +59,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines"; static const std::string HTTP_COMMENT = "comment"; static const std::string HTTP_ENABLE_PROFILE = "enable_profile"; static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns"; +static const std::string HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY = "partial_update_new_key_behavior"; static const std::string HTTP_SQL = "sql"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 657ec265e50908..390ec6f1360e3e 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ 
-238,6 +238,7 @@ Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _partial_update_info = std::make_shared(); RETURN_IF_ERROR(_partial_update_info->init( _req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_new_key_policy(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index 94dd4673f59850..9a852a28f20ffb 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -34,11 +34,13 @@ namespace doris { Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, - bool partial_update, const std::set& partial_update_cols, + bool partial_update, PartialUpdateNewRowPolicyPB policy, + const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, const std::string& auto_increment_column, int64_t cur_max_version) { is_partial_update = partial_update; + partial_update_new_key_policy = policy; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; @@ -86,6 +88,7 @@ Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSc void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { partial_update_info_pb->set_is_partial_update(is_partial_update); + partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy); partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase); for (const auto& col : partial_update_input_columns) { partial_update_info_pb->add_partial_update_input_columns(col); @@ -113,6 +116,9 @@ void 
PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { is_partial_update = partial_update_info_pb->is_partial_update(); + if (partial_update_info_pb->has_partial_update_new_key_policy()) { + partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy(); + } max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase() ? partial_update_info_pb->max_version_in_flush_phase() : -1; @@ -152,23 +158,34 @@ std::string PartialUpdateInfo::summary() const { update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase); } -Status PartialUpdateInfo::handle_non_strict_mode_not_found_error( - const TabletSchema& tablet_schema) { - if (!can_insert_new_rows_in_partial_update) { - std::string error_column; - for (auto cid : missing_cids) { - const TabletColumn& col = tablet_schema.column(cid); - if (!col.has_default_value() && !col.is_nullable() && - !(tablet_schema.auto_increment_column() == col.name())) { - error_column = col.name(); - break; +Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema, + const std::function& line) { + switch (partial_update_new_key_policy) { + case doris::PartialUpdateNewRowPolicyPB::APPEND: { + if (is_partial_update) { + if (!can_insert_new_rows_in_partial_update) { + std::string error_column; + for (auto cid : missing_cids) { + const TabletColumn& col = tablet_schema.column(cid); + if (!col.has_default_value() && !col.is_nullable() && + !(tablet_schema.auto_increment_column() == col.name())) { + error_column = col.name(); + break; + } + } + return Status::Error( + "the unmentioned column `{}` should have default value or be nullable " + "for newly inserted rows in non-strict mode partial update", + error_column); } } - return Status::Error( - "the unmentioned column `{}` should have default value or be nullable " - "for " - "newly inserted rows in non-strict 
mode partial update", - error_column); + } break; + case doris::PartialUpdateNewRowPolicyPB::ERROR: { + return Status::Error( + "Can't append new rows in partial update when partial_update_new_key_behavior is " + "ERROR. Row with key=[{}] is not in table.", + line()); + } break; } return Status::OK(); } diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 05372154bd2d07..93915431606d39 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -38,13 +39,15 @@ struct RowsetId; struct PartialUpdateInfo { Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, - bool partial_update, const std::set& partial_update_cols, - bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, - const std::string& timezone, const std::string& auto_increment_column, + bool partial_update, PartialUpdateNewRowPolicyPB policy, + const std::set& partial_update_cols, bool is_strict_mode, + int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, + const std::string& auto_increment_column, int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); - Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema); + Status handle_new_key(const TabletSchema& tablet_schema, + const std::function& line); std::string summary() const; private: @@ -52,6 +55,7 @@ struct PartialUpdateInfo { public: bool is_partial_update {false}; + PartialUpdateNewRowPolicyPB partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND}; int64_t max_version_in_flush_phase {-1}; std::set partial_update_input_columns; std::vector missing_cids; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index be6c1dc45b8c81..97145449a6dc7e 100644 --- 
a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -489,7 +489,7 @@ Status SegmentWriter::probe_key_for_mow( PartialUpdateReadPlan& read_plan, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, - PartialUpdateStats& stats) { + const std::function& not_found_cb, PartialUpdateStats& stats) { RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; @@ -497,16 +497,8 @@ Status SegmentWriter::probe_key_for_mow( specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is()) { - if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { - ++stats.num_rows_filtered; - // delete the invalid newly inserted row - _mow_context->delete_bitmap->add( - {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - segment_pos); - } else if (!have_delete_sign) { - RETURN_IF_ERROR( - _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( - *_tablet_schema)); + if (!have_delete_sign) { + RETURN_IF_ERROR(not_found_cb()); } ++stats.num_rows_new_added; has_default_or_nullable = true; @@ -678,10 +670,16 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* bool have_delete_sign = (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); - RETURN_IF_ERROR(probe_key_for_mow(key, segment_pos, have_input_seq_column, have_delete_sign, - read_plan, specified_rowsets, segment_caches, - has_default_or_nullable, use_default_or_null_flag, - stats)); + auto not_found_cb = [&]() { + return _opts.rowset_ctx->partial_update_info->handle_new_key( + *_tablet_schema, [&]() -> std::string { + return block->dump_one_line(block_pos, _num_sort_key_columns); + }); + }; + RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column, + have_delete_sign, read_plan, 
specified_rowsets, + segment_caches, has_default_or_nullable, + use_default_or_null_flag, not_found_cb, stats)); } CHECK_EQ(use_default_or_null_flag.size(), num_rows); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 02ae962a16a942..d248d20a1c0980 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -101,6 +101,7 @@ class SegmentWriter { std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, + const std::function& not_found_cb, PartialUpdateStats& stats); Status partial_update_preconditions_check(size_t row_pos); Status append_block_with_partial_content(const vectorized::Block* block, size_t row_pos, diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 6339c7db4bfc63..22d6ee4f868927 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -359,7 +359,7 @@ Status VerticalSegmentWriter::_probe_key_for_mow( PartialUpdateReadPlan& read_plan, const std::vector& specified_rowsets, std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, - PartialUpdateStats& stats) { + const std::function& not_found_cb, PartialUpdateStats& stats) { RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; @@ -367,16 +367,8 @@ Status VerticalSegmentWriter::_probe_key_for_mow( specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is()) { - if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { - ++stats.num_rows_filtered; - // delete the invalid newly inserted row - _mow_context->delete_bitmap->add( - {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - segment_pos); - } else if (!have_delete_sign) { - 
RETURN_IF_ERROR( - _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( - *_tablet_schema)); + if (!have_delete_sign) { + RETURN_IF_ERROR(not_found_cb()); } ++stats.num_rows_new_added; has_default_or_nullable = true; @@ -539,10 +531,16 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da bool have_delete_sign = (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0); - RETURN_IF_ERROR(_probe_key_for_mow(key, segment_pos, have_input_seq_column, + auto not_found_cb = [&]() { + return _opts.rowset_ctx->partial_update_info->handle_new_key( + *_tablet_schema, [&]() -> std::string { + return data.block->dump_one_line(block_pos, _num_sort_key_columns); + }); + }; + RETURN_IF_ERROR(_probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column, have_delete_sign, read_plan, specified_rowsets, segment_caches, has_default_or_nullable, - use_default_or_null_flag, stats)); + use_default_or_null_flag, not_found_cb, stats)); } CHECK_EQ(use_default_or_null_flag.size(), data.num_rows); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index a327873bbce356..00964823cb62e1 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -172,6 +172,7 @@ class VerticalSegmentWriter { std::vector>& segment_caches, bool& has_default_or_nullable, std::vector& use_default_or_null_flag, + const std::function& not_found_cb, PartialUpdateStats& stats); Status _partial_update_preconditions_check(size_t row_pos); Status _append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index e9b518eaae02e3..aca22eb3f31bad 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -428,6 +428,7 @@ Status 
BaseRowsetBuilder::_build_current_tablet_schema( RETURN_IF_ERROR(_partial_update_info->init( tablet()->tablet_id(), _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_new_key_policy(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 6abe6fdc860425..803d9f8a1ff41b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -38,6 +38,7 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -142,6 +143,7 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser { public static final String KEY_SKIP_LINES = "skip_lines"; public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior"; public static final String KEY_COMMENT = "comment"; @@ -195,6 +197,12 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser { return Boolean.valueOf(s); } }) + .put(PARTIAL_UPDATE_NEW_KEY_POLICY, new Function() { + @Override + public @Nullable TPartialUpdateNewRowPolicy apply(@Nullable String s) { + return TPartialUpdateNewRowPolicy.valueOf(s.toUpperCase()); + } + }) .put(TIMEZONE, new Function() { @Override public @Nullable String apply(@Nullable String s) { @@ -388,6 +396,16 @@ public static void checkProperties(Map 
properties) throws DdlExc } } + // partial update new key policy + final String partialUpdateNewKeyPolicyProperty = properties.get(PARTIAL_UPDATE_NEW_KEY_POLICY); + if (partialUpdateNewKeyPolicyProperty != null) { + if (!partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("append") + && !partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("error")) { + throw new DdlException(PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of [append, error], but found " + + partialUpdateNewKeyPolicyProperty); + } + } + // time zone final String timezone = properties.get(TIMEZONE); if (timezone != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java index 920e03bdfa2472..2cbfc57c833f5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java @@ -151,7 +151,8 @@ protected LoadLoadingTask createTask(Database db, OlapTable table, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, int batchSize, String clusterId) { super(db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate, - txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, loadZeroTolerance, - jobProfile, singleTabletLoadPerSink, useNewLoadScanNode, priority, enableMemTableOnSinkNode, batchSize); + partialUpdateNewKeyPolicy, txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, + loadZeroTolerance, jobProfile, singleTabletLoadPerSink, 
useNewLoadScanNode, priority, + enableMemTableOnSinkNode, batchSize); this.cloudClusterId = clusterId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 73abdb9f8d15fe..c3a3ddae1acaa0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -224,7 +224,8 @@ protected LoadLoadingTask createTask(Database db, OlapTable table, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, @@ -102,6 +105,7 @@ public LoadLoadingTask(Database db, OlapTable table, this.execMemLimit = execMemLimit; this.strictMode = strictMode; this.isPartialUpdate = isPartialUpdate; + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.txnId = txnId; this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL); this.retryTime = 2; // 2 times is enough @@ -121,8 +125,9 @@ public void init(TUniqueId loadId, List> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException { this.loadId = loadId; planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, - strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, - this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode); + strictMode, isPartialUpdate, partialUpdateNewKeyPolicy, timezone, this.timeoutS, this.loadParallelism, + this.sendBatchParallelism, this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, + enableMemTableOnSinkNode); planner.plan(loadId, fileStatusList, 
fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index ef429a1d564208..7c2a0129552869 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -44,6 +44,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -68,7 +69,8 @@ public class LoadingTaskPlanner { private final List fileGroups; private final boolean strictMode; private final boolean isPartialUpdate; - private final long timeoutS; // timeout of load job, in second + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; + private final long timeoutS; // timeout of load job, in second private final int loadParallelism; private final int sendBatchParallelism; private final boolean useNewLoadScanNode; @@ -88,7 +90,8 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, - boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism, + boolean strictMode, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo, boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) { this.loadJobId = loadJobId; @@ -100,6 +103,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table this.strictMode = strictMode; this.isPartialUpdate = isPartialUpdate; this.analyzer.setTimezone(timezone); + 
this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.timeoutS = timeoutS; this.loadParallelism = loadParallelism; this.sendBatchParallelism = sendBatchParallelism; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 8cf32648d55f05..f2b6821c8043cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -49,13 +50,14 @@ public class UnboundTableSink extends UnboundLogicalSin private final boolean temporaryPartition; private final List partitions; private boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; private final boolean autoDetectPartition; public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { - this(nameParts, colNames, hints, false, partitions, - false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); + this(nameParts, colNames, hints, false, partitions, false, + TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); } /** @@ -63,9 +65,9 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, Optional logicalProperties, - CHILD_TYPE child) { + boolean isPartialUpdate, 
TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); @@ -73,6 +75,7 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, boolean isAutoDetectPartition, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, Optional logicalProperties, - CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); @@ -91,6 +94,7 @@ public UnboundTableSink(List nameParts, List colNames, List children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, Optional.empty(), children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, Optional.empty(), + children.get(0)); } @Override @@ -158,14 +167,16 @@ public int hashCode() { @Override public Plan withGroupExpression(Optional groupExpression) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + isPartialUpdate, partialUpdateNewKeyPolicy, 
dmlCommandType, groupExpression, + Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, - isPartialUpdate, dmlCommandType, groupExpression, logicalProperties, children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties, + children.get(0)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 8ca58f977578a6..70061114caef08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.collect.ImmutableList; @@ -66,12 +67,13 @@ public static LogicalSink createUnboundTableSink(List na */ public static LogicalSink createUnboundTableSink(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, LogicalPlan plan) { String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); if (curCatalog instanceof InternalCatalog) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, - 
isPartialUpdate, dmlCommandType, Optional.empty(), + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof HMSExternalCatalog) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, @@ -91,7 +93,8 @@ public static LogicalSink createUnboundTableSink(List na */ public static LogicalSink createUnboundTableSinkMaybeOverwrite(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, DMLCommandType dmlCommandType, + boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, DMLCommandType dmlCommandType, LogicalPlan plan) { if (isAutoDetectPartition) { // partitions is null if (!isOverwrite) { @@ -106,7 +109,7 @@ public static LogicalSink createUnboundTableSinkMaybeOverwrite(L if (curCatalog instanceof InternalCatalog) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isAutoDetectPartition, - isPartialUpdate, dmlCommandType, Optional.empty(), + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 2fa8332631a771..4867324ab9074b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -452,6 +452,9 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink cte = Optional.empty(); 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index e975b4914cfe89..575fdb2473c85b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -77,6 +77,7 @@ import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -126,6 +127,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { Database database = pair.first; OlapTable table = pair.second; boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS; + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = sink.getPartialUpdateNewRowPolicy(); LogicalPlan child = ((LogicalPlan) sink.child()); boolean childHasSeqCol = child.getOutput().stream() @@ -148,6 +150,7 @@ private Plan bindOlapTableSink(MatchingContext> ctx) { .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()), isPartialUpdate, + partialUpdateNewKeyPolicy, sink.getDMLCommandType(), child); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java index d88ce20bb59a7a..7d9dac1057c94f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java @@ -42,6 +42,7 @@ public Rule build() { 
ctx.connectContext.getSessionVariable().isEnableMemtableOnSinkNode() ? false : ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(), sink.isPartialUpdate(), + sink.getPartialUpdateNewRowPolicy(), sink.getDmlCommandType(), Optional.empty(), sink.getLogicalProperties(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java index 1abf8998e25557..851c28a67cad93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java @@ -70,6 +70,7 @@ import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -437,7 +438,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer logicalQuery = handleCte(logicalQuery); // make UnboundTableSink return UnboundTableSinkCreator.createUnboundTableSink(nameParts, cols, ImmutableList.of(), - isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); + isTempPart, partitions, isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.DELETE, logicalQuery); } protected LogicalPlan handleCte(LogicalPlan logicalPlan) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 69cbf762c2afe6..0e8fb4d8f2e080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -64,6 +64,7 @@ import 
org.apache.doris.qe.StmtExecutor; import org.apache.doris.tablefunction.HdfsTableValuedFunction; import org.apache.doris.tablefunction.S3TableValuedFunction; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -243,7 +244,8 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite() && sinkCols.size() < olapTable.getColumns().size(); return UnboundTableSinkCreator.createUnboundTableSink(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), - false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan); + false, dataDesc.getPartitionNames(), isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.LOAD, tvfLogicalPlan); } private static void fillDeleteOnColumn(BulkLoadDataDesc dataDesc, OlapTable olapTable, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 51cfbf0b27271f..f79303acf227a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -192,7 +193,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer // make UnboundTableSink return UnboundTableSinkCreator.createUnboundTableSink(nameParts, isPartialUpdate ? 
partialUpdateColNames : ImmutableList.of(), ImmutableList.of(), - false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery); + false, ImmutableList.of(), isPartialUpdate, TPartialUpdateNewRowPolicy.APPEND, + DMLCommandType.UPDATE, logicalQuery); } private void checkAssignmentColumn(ConnectContext ctx, List columnNameParts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 72e5c4c14e3b0e..e8d6fcbb963d5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -57,6 +57,7 @@ import org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -315,6 +316,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis true, tempPartitionNames, sink.isPartialUpdate(), + sink.getPartialUpdateNewRowPolicy(), sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); // 1. when overwrite table, allow auto partition or not is controlled by session variable. 
@@ -330,6 +332,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis false, sink.getPartitions(), false, + TPartialUpdateNewRowPolicy.APPEND, sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); insertCtx = new HiveInsertCommandContext(); @@ -343,6 +346,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis false, sink.getPartitions(), false, + TPartialUpdateNewRowPolicy.APPEND, sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); insertCtx = new IcebergInsertCommandContext(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index 397c2927d8458d..11916290c4db85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -48,13 +49,15 @@ public class LogicalOlapTableSink extends LogicalTableS private final OlapTable targetTable; private final List partitionIds; private final boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, - List outputExprs, boolean isPartialUpdate, DMLCommandType dmlCommandType, - CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, dmlCommandType, - Optional.empty(), Optional.empty(), 
child); + List outputExprs, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, partialUpdateNewKeyPolicy, + dmlCommandType, Optional.empty(), Optional.empty(), child); } /** @@ -62,12 +65,14 @@ public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean isPartialUpdate, + TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, DMLCommandType dmlCommandType, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.isPartialUpdate = isPartialUpdate; + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.dmlCommandType = dmlCommandType; this.partitionIds = Utils.copyRequiredList(partitionIds); } @@ -77,14 +82,14 @@ public Plan withChildAndUpdateOutput(Plan child) { .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()); return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, output, isPartialUpdate, - dmlCommandType, Optional.empty(), Optional.empty(), child); + partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), child); } @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); } 
public Database getDatabase() { @@ -103,13 +108,17 @@ public boolean isPartialUpdate() { return isPartialUpdate; } + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; + } + public DMLCommandType getDmlCommandType() { return dmlCommandType; } public LogicalOlapTableSink withOutputExprs(List outputExprs) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, Optional.empty(), Optional.empty(), child()); + partialUpdateNewKeyPolicy, dmlCommandType, Optional.empty(), Optional.empty(), child()); } @Override @@ -125,6 +134,7 @@ public boolean equals(Object o) { } LogicalOlapTableSink that = (LogicalOlapTableSink) o; return isPartialUpdate == that.isPartialUpdate && dmlCommandType == that.dmlCommandType + && partialUpdateNewKeyPolicy == that.partialUpdateNewKeyPolicy && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) && Objects.equals(partitionIds, that.partitionIds); @@ -133,7 +143,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, - isPartialUpdate, dmlCommandType); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType); } @Override @@ -145,6 +155,7 @@ public String toString() { "cols", cols, "partitionIds", partitionIds, "isPartialUpdate", isPartialUpdate, + "partialUpdateNewKeyPolicy", partialUpdateNewKeyPolicy, "dmlCommandType", dmlCommandType ); } @@ -157,13 +168,14 @@ public R accept(PlanVisitor visitor, C context) { @Override public Plan withGroupExpression(Optional groupExpression) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, + 
Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - dmlCommandType, groupExpression, logicalProperties, children.get(0)); + partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 65dba188bf02cc..8f1746e79f2d33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -57,6 +58,7 @@ public class PhysicalOlapTableSink extends PhysicalTabl private final List partitionIds; private final boolean singleReplicaLoad; private final boolean isPartialUpdate; + private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy; private final DMLCommandType dmlCommandType; /** @@ -64,10 +66,11 @@ public class PhysicalOlapTableSink extends PhysicalTabl */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy 
partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + LogicalProperties logicalProperties, CHILD_TYPE child) { this(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); } @@ -76,9 +79,10 @@ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, DMLCommandType dmlCommandType, - Optional groupExpression, LogicalProperties logicalProperties, - PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + DMLCommandType dmlCommandType, Optional groupExpression, + LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, + CHILD_TYPE child) { super(PlanType.PHYSICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child); this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink"); @@ -87,6 +91,7 @@ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, dmlCommandType, groupExpression, + singleReplicaLoad, isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -137,6 +146,7 @@ public boolean equals(Object o) { PhysicalOlapTableSink that = (PhysicalOlapTableSink) o; return singleReplicaLoad == that.singleReplicaLoad && 
isPartialUpdate == that.isPartialUpdate + && partialUpdateNewKeyPolicy == that.partialUpdateNewKeyPolicy && dmlCommandType == that.dmlCommandType && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) @@ -147,7 +157,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, - isPartialUpdate, dmlCommandType); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType); } @Override @@ -160,6 +170,7 @@ public String toString() { "partitionIds", partitionIds, "singleReplicaLoad", singleReplicaLoad, "isPartialUpdate", isPartialUpdate, + "partialUpdateNewKeyPolicy", partialUpdateNewKeyPolicy, "dmlCommandType", dmlCommandType ); } @@ -177,20 +188,22 @@ public List getExpressions() { @Override public Plan withGroupExpression(Optional groupExpression) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), child()); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, getLogicalProperties(), + child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, logicalProperties.get(), children.get(0)); + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, logicalProperties.get(), + children.get(0)); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), + isPartialUpdate, 
partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @@ -223,7 +236,7 @@ public PhysicalProperties getRequirePhysicalProperties() { @Override public PhysicalOlapTableSink resetLogicalProperties() { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, dmlCommandType, groupExpression, + isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, groupExpression, null, physicalProperties, statistics, child()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 3f5af10d4b17f4..a79f68e1c3af29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -73,6 +73,7 @@ import org.apache.doris.thrift.TOlapTableSchemaParam; import org.apache.doris.thrift.TOlapTableSink; import org.apache.doris.thrift.TPaloNodesInfo; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TUniqueId; @@ -108,6 +109,7 @@ public class OlapTableSink extends DataSink { // partial update input columns private boolean isPartialUpdate = false; private HashSet partialUpdateInputColumns; + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; // set after init called protected TDataSink tDataSink; @@ -190,6 +192,10 @@ public void setPartialUpdateInputColumns(boolean isPartialUpdate, HashSet DataSink. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 65b74f2353188b..a7e535712b8a92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -38,6 +38,7 @@ import org.apache.doris.planner.GroupCommitBlockSink; import org.apache.doris.qe.VariableMgr.VarAttr; import org.apache.doris.thrift.TGroupCommitMode; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterType; @@ -531,6 +532,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update"; + public static final String PARTIAL_UPDATE_NEW_KEY_BEHAVIOR = "partial_update_new_key_behavior"; + public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold"; public static final String INVERTED_INDEX_MAX_EXPANSIONS = "inverted_index_max_expansions"; @@ -1972,6 +1975,12 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { @VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true) public boolean enableUniqueKeyPartialUpdate = false; + @VariableMgr.VarAttr(name = PARTIAL_UPDATE_NEW_KEY_BEHAVIOR, needForward = true, description = { + "用于设置部分列更新中对于新插入的行的行为", + "Used to set the behavior for newly inserted rows in partial update." + }, checker = "checkPartialUpdateNewKeyBehavior", options = {"APPEND", "ERROR"}) + public String partialUpdateNewKeyPolicy = "APPEND"; + @VariableMgr.VarAttr(name = TEST_QUERY_CACHE_HIT, description = { "用于测试查询缓存是否命中,如果未命中指定类型的缓存,则会报错", "Used to test whether the query cache is hit. 
" @@ -3999,6 +4008,10 @@ public void setEnableUniqueKeyPartialUpdate(boolean enableUniqueKeyPartialUpdate this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate; } + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return parsePartialUpdateNewKeyBehavior(partialUpdateNewKeyPolicy); + } + public int getLoadStreamPerNode() { return loadStreamPerNode; } @@ -4550,6 +4563,29 @@ public void checkGenerateStatsFactor(String generateStatsFactor) { } } + public TPartialUpdateNewRowPolicy parsePartialUpdateNewKeyBehavior(String behavior) { + if (behavior == null) { + return null; + } else if (behavior.equalsIgnoreCase("APPEND")) { + return TPartialUpdateNewRowPolicy.APPEND; + } else if (behavior.equalsIgnoreCase("ERROR")) { + return TPartialUpdateNewRowPolicy.ERROR; + } + return null; + } + + public void checkPartialUpdateNewKeyBehavior(String partialUpdateNewKeyBehavior) { + TPartialUpdateNewRowPolicy policy = parsePartialUpdateNewKeyBehavior(partialUpdateNewKeyBehavior); + if (policy == null) { + UnsupportedOperationException exception = + new UnsupportedOperationException(PARTIAL_UPDATE_NEW_KEY_BEHAVIOR + + " should be one of {'APPEND', 'ERROR'}, but found " + + partialUpdateNewKeyBehavior); + LOG.warn("Check " + PARTIAL_UPDATE_NEW_KEY_BEHAVIOR + " failed", exception); + throw exception; + } + } + public void setGenerateStatsFactor(int factor) { this.generateStatsFactor = factor; if (factor <= 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index d48d1702e1b2db..76e1738699b361 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -25,6 +25,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import 
com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; @@ -110,6 +111,10 @@ default long getFileSize() { boolean isPartialUpdate(); + default TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return TPartialUpdateNewRowPolicy.APPEND; + } + default boolean getTrimDoubleQuotes() { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 94f3625fbc7c2b..7254fca6f6ed95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -33,6 +33,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; @@ -84,6 +85,7 @@ public class StreamLoadTask implements LoadTaskInfo { private List hiddenColumns; private boolean trimDoubleQuotes = false; private boolean isPartialUpdate = false; + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; private int skipLines = 0; private boolean enableProfile = false; @@ -301,6 +303,11 @@ public boolean isPartialUpdate() { return isPartialUpdate; } + @Override + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; + } + @Override public boolean isMemtableOnSinkNode() { return memtableOnSinkNode; @@ -454,6 +461,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws if (request.isSetPartialUpdate()) { isPartialUpdate = request.isPartialUpdate(); } + if (request.isSetPartialUpdateNewKeyPolicy()) { + partialUpdateNewKeyPolicy = request.getPartialUpdateNewKeyPolicy(); + } if (request.isSetMemtableOnSinkNode()) { this.memtableOnSinkNode = 
request.isMemtableOnSinkNode(); } else { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java index cef9bac4ed90a7..f3d718e9641424 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/EliminateSortTest.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.util.MemoTestUtils; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.nereids.util.PlanConstructor; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.utframe.TestWithFeService; import org.junit.jupiter.api.Test; @@ -99,7 +100,7 @@ void testEliminateSortUnderTableSink() { .limit(1, 1).build(); plan = new LogicalOlapTableSink<>(new Database(), scan.getTable(), scan.getTable().getBaseSchema(), new ArrayList<>(), plan.getOutput().stream().map(NamedExpression.class::cast).collect( - Collectors.toList()), false, DMLCommandType.NONE, plan); + Collectors.toList()), false, TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, plan); PlanChecker.from(MemoTestUtils.createConnectContext(), plan) .disableNereidsRules("PRUNE_EMPTY_PARTITION") .rewrite() @@ -113,7 +114,7 @@ void testEliminateSortUnderTableSink() { .build(); plan = new LogicalOlapTableSink<>(new Database(), scan.getTable(), scan.getTable().getBaseSchema(), new ArrayList<>(), plan.getOutput().stream().map(NamedExpression.class::cast).collect( - Collectors.toList()), false, DMLCommandType.NONE, plan); + Collectors.toList()), false, TPartialUpdateNewRowPolicy.APPEND, DMLCommandType.NONE, plan); PlanChecker.from(MemoTestUtils.createConnectContext(), plan) .rewrite() .nonMatch(logicalSort()); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 99cd99410ed7de..d29600ebf2b687 100644 --- a/gensrc/proto/descriptors.proto +++ 
b/gensrc/proto/descriptors.proto @@ -74,5 +74,8 @@ message POlapTableSchemaParam { optional string timezone = 12; optional int32 auto_increment_column_unique_id = 13 [default = -1]; optional int32 nano_seconds = 14 [default = 0]; + reserved 15; // optional UniqueKeyUpdateModePB unique_key_update_mode = 15 [default = UPSERT]; + reserved 16; // optional int32 sequence_map_col_unique_id = 16 [default = -1]; + optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 17 [default = APPEND]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index d62c9df5073147..7d1742918b6fde 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -628,6 +628,11 @@ message RowsetBinlogMetasPB { repeated RowsetBinlogMetaPB rowset_binlog_metas = 1; } +enum PartialUpdateNewRowPolicyPB { + APPEND = 0; + ERROR = 1; +} + message PartialUpdateInfoPB { optional bool is_partial_update = 1 [default = false]; repeated string partial_update_input_columns = 2; @@ -642,4 +647,6 @@ message PartialUpdateInfoPB { repeated string default_values = 11; optional int64 max_version_in_flush_phase = 12 [default = -1]; optional int32 nano_seconds = 13 [default = 0]; + reserved 14; // optional UniqueKeyUpdateModePB partial_update_mode = 14 [default = UPSERT]; + optional PartialUpdateNewRowPolicyPB partial_update_new_key_policy = 15 [default = APPEND]; } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 14bb6412a10747..3256de4756fd8c 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -161,6 +161,11 @@ enum TIndexType { NGRAM_BF = 3 } +enum TPartialUpdateNewRowPolicy { + APPEND = 0, + ERROR = 1 +} + // Mapping from names defined by Avro to the enum. // We permit gzip and bzip2 in addition. 
const map COMPRESSION_MAP = { @@ -259,6 +264,9 @@ struct TOlapTableSchemaParam { 11: optional string auto_increment_column 12: optional i32 auto_increment_column_unique_id = -1 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 + // 14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode + 15: optional i32 sequence_map_col_unique_id = -1 + 16: optional TPartialUpdateNewRowPolicy partial_update_new_key_policy } struct TTabletLocation { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 19d91f0fded93c..72a1041a1bff47 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -764,6 +764,8 @@ struct TStreamLoadPutRequest { 54: optional bool group_commit // deprecated 55: optional i32 stream_per_node; 56: optional string group_commit_mode + // 57: optional Types.TUniqueKeyUpdateMode unique_key_update_mode + 58: optional Descriptors.TPartialUpdateNewRowPolicy partial_update_new_key_policy // For cloud 1000: optional string cloud_cluster diff --git a/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out b/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out index 4585bd5c5834dd..6ad32390d03d36 100644 --- a/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out +++ b/regression-test/data/doc/data-operate/import/import-way/error-data-handling.md.out @@ -1,9 +1,4 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this --- !select -- -1 kevin 18 shenzhen 500 2023-07-03T12:00:01 -3 \N \N \N 23 2023-07-03T12:00:02 -18 \N \N \N 9999999 2023-07-03T12:00:03 - -- !select -- 3 Jack \N 90 4 Mike 3 \N diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv new file mode 100644 index 00000000000000..49517f38a3b070 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy1.csv @@ -0,0 +1,4 @@ +0,40 +1,40 +10,40 +11,40 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv new file mode 100644 index 00000000000000..3f614863e4f57c --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy2.csv @@ -0,0 +1,4 @@ +2,50 +3,50 +13,50 +14,50 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv b/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv new file mode 100644 index 00000000000000..7fa3c1ae4e2bcd --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/row_policy3.csv @@ -0,0 +1,4 @@ +4,60,60,60 +5,60,60,60 +21,60,60,60 +22,60,60,60 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out new file mode 100644 index 00000000000000..499f7f4d1ffcc9 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.out @@ -0,0 +1,153 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 + +-- !insert_append -- +0 10 0 0 +1 1 1 1 +2 2 2 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert_error1 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert_error2 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !insert3 -- +0 10 0 0 +1 9 9 9 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +100 9 9 9 +200 9 9 9 + +-- !stream_load_append -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 +100 9 9 9 +200 9 9 9 + +-- !stream_load_error -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 +100 9 9 9 +200 9 9 9 + +-- !stream_load_ignore_property -- +0 10 0 40 +1 9 9 40 +2 9 9 9 +3 10 \N \N +4 60 60 60 +5 60 60 60 +10 \N \N 40 +11 \N \N 40 +21 60 60 60 +22 60 60 60 +100 9 9 9 +200 9 9 9 + +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 + +-- !broker_load_append -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +10 \N \N 40 +11 \N \N 40 + +-- !broker_load_error -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +10 \N \N 40 +11 \N \N 40 + +-- !broker_load_ignore_property -- +0 0 0 40 +1 1 1 40 +2 2 2 2 +3 3 3 3 +4 60 60 60 +5 60 60 60 +10 \N \N 40 +11 \N \N 40 +21 60 60 60 +22 60 60 60 + +-- !sql -- +0 \N 30 456 +1 20 123 456 +2 \N 30 456 +3 3 3 3 +4 4 4 4 +10 \N 30 456 +11 20 123 456 +12 \N 30 456 + +-- !sql -- +0 \N 30 456 +0 0 0 0 +0 20 123 456 +1 1 1 1 +1 20 123 456 +2 \N 30 456 +2 2 2 2 +3 3 3 3 +4 4 4 4 +10 \N 30 456 +10 20 123 456 +11 20 123 456 +12 \N 30 456 + +-- !sql -- +0 20 0 0 +1 20 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +10 20 30 \N +11 20 \N \N +12 \N 30 \N + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out new file mode 100644 index 
00000000000000..9c95e5ebde7c1e --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_new_row_policy.out @@ -0,0 +1,78 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 +1 1 1 1 +2 2 2 2 + +-- !append -- +0 10 0 0 +1 1 1 1 +2 2 2 2 +3 10 \N \N +4 10 \N \N +5 10 \N \N + +-- !ignore -- +0 10 0 0 +1 1 20 1 +2 2 2 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !error1 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !error2 -- +0 10 0 0 +1 1 30 1 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N + +-- !append -- +0 10 0 40 +1 1 30 40 +2 2 30 2 +3 10 80 \N +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 + +-- !ignore -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N \N +5 10 \N \N +10 \N \N 40 +11 \N \N 40 + +-- !error1 -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N 60 +5 10 \N 60 +10 \N \N 40 +11 \N \N 40 + +-- !error2 -- +0 10 0 40 +1 1 30 40 +2 2 30 50 +3 10 80 50 +4 10 \N 60 +5 10 \N 60 +10 \N \N 70 +11 \N \N 40 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out index 93fa33d81f9e96..18fa75df60991e 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out @@ -4,12 +4,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -23,7 +27,9 @@ -- !sql -- 1 kevin 18 
shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -38,12 +44,16 @@ -- !sql -- 1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 -- !sql -- -1 kevin 18 shenzhen 400 2023-07-01T12:00 +1 kevin 18 shenzhen 500 2023-07-03T12:00:01 +3 \N 20 beijing 23 2023-07-03T12:00:02 +18 \N 20 beijing 9999999 2023-07-03T12:00:03 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 @@ -57,7 +67,9 @@ -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 +2 \N 20 beijing 500 2023-07-03T12:00:01 3 steve 23 beijing 500 2023-07-03T12:00:02 +4 \N 20 beijing 23 2023-07-03T12:00:02 -- !sql -- 1 kevin 18 shenzhen 400 2023-07-01T12:00 diff --git a/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy b/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy index d8057f374afd01..ccc46f59af946f 100644 --- a/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy +++ b/regression-test/suites/doc/data-operate/import/import-way/error-data-handling.md.groovy @@ -19,65 +19,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods suite("test_error_data_handling", "p0") { def tableName = "test_error_data_handling" - // partial update strict mode - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE ${tableName} ( - id INT, - name VARCHAR(10), - age INT, - city VARCHAR(10), - balance DECIMAL(9, 0), - last_access_time DATETIME - ) - UNIQUE KEY(id) - DISTRIBUTED BY HASH(id) BUCKETS 1 - PROPERTIES("replication_num" = "1") - """ - - sql """ - INSERT INTO ${tableName} VALUES (1, 'kevin', 18, 'shenzhen', 400, '2023-07-01 12:00:00') - """ - - streamLoad { - table "${tableName}" - file "test_data_partial.csv" - set 
'column_separator', ',' - set 'columns', 'id,balance,last_access_time' - set 'partial_columns', 'true' - set 'strict_mode', 'true' - time 10000 - check {result, exception, startTime, endTime -> - assertTrue(exception == null) - def json = parseJson(result) - assertEquals("Fail", json.Status) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) - assertEquals(3, json.NumberTotalRows) - assertEquals(0, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) - } - } - - streamLoad { - table "${tableName}" - file "test_data_partial.csv" - set 'column_separator', ',' - set 'columns', 'id,balance,last_access_time' - set 'partial_columns', 'true' - set 'strict_mode', 'false' - time 10000 - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - } - } - - qt_select "SELECT * FROM ${tableName} ORDER BY id" - sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ diff --git a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy index 6a54d6a5e71702..f2ff40649d24de 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy @@ -179,13 +179,12 @@ suite("nereids_partial_update_native_insert_stmt", "p0") { sql """insert into ${tableName5} values(1,"kevin",18,"shenzhen",400,"2023-07-01 12:00:00");""" qt_5 """select * from ${tableName5} order by id;""" - sql "set enable_insert_strict = true;" sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" sql "sync;" - // partial update using insert stmt in strict mode, the max_filter_ratio is always 0 test { sql """ insert into ${tableName5}(id,balance,last_access_time) 
values(1,500,"2023-07-03 12:00:01"),(3,23,"2023-07-03 12:00:02"),(18,9999999,"2023-07-03 12:00:03"); """ - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_5 """select * from ${tableName5} order by id;""" sql "set enable_unique_key_partial_update=false;" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy index d92dc41a68bc34..ac98141b121245 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_light_schema_change.groovy @@ -264,14 +264,16 @@ suite("test_partial_update_insert_light_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" - sql "set enable_unique_key_partial_update=false;" + sql """set partial_update_new_key_behavior="APPEND";""" sql "sync" qt_add_key_col_2 " select * from ${tableName} order by c0; " diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy index 9e0abc9704c482..852fad19737336 
100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_schema_change.groovy @@ -249,12 +249,16 @@ suite("test_partial_update_insert_schema_change", "p0") { sql "sync" // test insert data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR sql "set enable_unique_key_partial_update=true;" + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" test { sql "insert into ${tableName}(c0,c1) values(1, 1);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } + sql """set partial_update_new_key_behavior="APPEND";""" + sql "sync;" sql "insert into ${tableName}(c0,c1,c2) values(1,0,10);" sql "set enable_unique_key_partial_update=false;" sql "sync" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy index c3e25a3a690c44..87eb39a3b5b567 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_native_insert_stmt.groovy @@ -175,17 +175,17 @@ suite("test_partial_update_native_insert_stmt", "p0") { sql """insert into ${tableName5} values(1,"kevin",18,"shenzhen",400,"2023-07-01 12:00:00");""" qt_5 """select * from ${tableName5} order by id;""" - sql "set enable_insert_strict = true;" + sql """set partial_update_new_key_behavior="ERROR";""" sql "set enable_unique_key_partial_update=true;" sql "sync;" // partial update using insert stmt in strict mode, the max_filter_ratio is always 0 test { sql """ insert 
into ${tableName5}(id,balance,last_access_time) values(1,500,"2023-07-03 12:00:01"),(3,23,"2023-07-03 12:00:02"),(18,9999999,"2023-07-03 12:00:03"); """ - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_5 """select * from ${tableName5} order by id;""" sql "set enable_unique_key_partial_update=false;" - sql "set enable_insert_strict = false;" + sql """set partial_update_new_key_behavior="APPEND";""" sql "sync;" sql """ DROP TABLE IF EXISTS ${tableName5}; """ diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy new file mode 100644 index 00000000000000..c425650c2917dd --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_new_key_policy.groovy @@ -0,0 +1,267 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_partial_update_new_key_policy", "p0") { + + def tableName = "test_partial_update_new_key_policy" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int, + `c3` int) + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="3");""" + qt_sql """select * from ${tableName} order by k;""" + + def checkVariable = { expected -> + def res = sql_return_maparray """show variables where Variable_name="partial_update_new_key_behavior";"""; + logger.info("res: ${res}") + assertTrue(res[0].Value.equalsIgnoreCase(expected)); + } + + // 1. test insert stmt + // 1.1 + sql "set enable_unique_key_partial_update=true;" + sql "sync" + + sql """set partial_update_new_key_behavior="APPEND";""" + sql "sync;" + checkVariable("APPEND") + explain { + sql "insert into ${tableName}(k,c1) values(0,10),(3,10),(4,10),(5,10);" + contains "PARTIAL_UPDATE_NEW_KEY_BEHAVIOR: APPEND" + } + sql "insert into ${tableName}(k,c1) values(0,10),(3,10),(4,10),(5,10);" + qt_insert_append """select * from ${tableName} order by k;""" + + + sql """set partial_update_new_key_behavior="ERROR";""" + sql "sync;" + checkVariable("ERROR") + explain { + sql "insert into ${tableName}(k,c2) values(1,30),(2,30);" + contains "PARTIAL_UPDATE_NEW_KEY_BEHAVIOR: ERROR" + } + sql "insert into ${tableName}(k,c2) values(1,30),(2,30);" + qt_insert_error1 """select * from ${tableName} order by k;""" + test { + sql "insert into ${tableName}(k,c2) values(1,30),(10,999),(11,999);" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. Row with key=[10] is not in table." 
+ } + qt_insert_error2 """select * from ${tableName} order by k;""" + + + sql """set partial_update_new_key_behavior=default;""" + sql "sync;" + checkVariable("APPEND") + test { + sql """set partial_update_new_key_behavior="invalid";""" + exception "partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}, but found invalid" + } + checkVariable("APPEND") + + // 1.2 partial_update_new_key_behavior will not take effect when enable_unique_key_partial_update is false + sql "set partial_update_new_key_behavior=ERROR;" + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + + sql "insert into ${tableName} values(1,9,9,9),(2,9,9,9),(100,9,9,9),(200,9,9,9);" + qt_insert3 """select * from ${tableName} order by k;""" + + + // 2. test stream load + // 2.1 APPEND + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c3' + set 'partial_columns', 'true' + set 'partial_update_new_key_behavior', 'append' + file 'row_policy1.csv' + time 10000 + } + qt_stream_load_append """select * from ${tableName} order by k;""" + // 2.2 ERROR + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c3' + set 'partial_columns', 'true' + set 'partial_update_new_key_behavior', 'error' + file 'row_policy2.csv' + time 10000 + check {result, exception, startTime, endTime -> + assertTrue(exception == null) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. 
Row with key=[13] is not in table.")) + } + } + qt_stream_load_error """select * from ${tableName} order by k;""" + // 2.3 partial_update_new_key_behavior will not take effect when enable_unique_key_partial_update is false + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k,c1,c2,c3' + set 'partial_columns', 'false' + set 'partial_update_new_key_behavior', 'error' + file 'row_policy3.csv' + time 10000 + } + qt_stream_load_ignore_property """select * from ${tableName} order by k;""" + + + // 3. test broker load + tableName = "test_partial_update_new_key_policy2" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int, + `c3` int) + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true"); """ + sql "insert into ${tableName} select number,number,number,number from numbers(\"number\"=\"6\");" + qt_sql """select * from ${tableName} order by k;""" + + // 3.1 APPEND + def label = "test_pu_new_key_policy1" + UUID.randomUUID().toString().replace("-", "_") + sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy1.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "APPEND" + ); + """ + waitForBrokerLoadDone(label) + def res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "FINISHED" + qt_broker_load_append """select * from ${tableName} order by k;""" + + // 3.2 ERROR + label = "test_pu_new_key_policy2" + UUID.randomUUID().toString().replace("-", "_") + 
sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy2.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "ERROR" + ); + """ + waitForBrokerLoadDone(label, 600) + res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "CANCELLED" && res[0].ErrorMsg.contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR. Row with key=[13] is not in table.") + qt_broker_load_error """select * from ${tableName} order by k;""" + + // 3.3 partial_update_new_key_behavior will not take effect when enable_unique_key_partial_update is false + label = "test_pu_new_key_policy3" + UUID.randomUUID().toString().replace("-", "_") + sql """ + LOAD LABEL $label ( + DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_policy3.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "," + (k,c1,c2,c3) + ) WITH S3 ( + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_REGION" = "${getS3Region()}", + "provider" = "${getS3Provider()}" + ) properties( + "partial_update_new_key_behavior" = "ERROR" + ); + """ + waitForBrokerLoadDone(label) + res = sql_return_maparray """show load where label="$label";""" + assert res[0].State == "FINISHED" + qt_broker_load_ignore_property """select * from ${tableName} order by k;""" + + + // 4. 
test this config will not affect non MOW tables + tableName = "test_partial_update_new_key_policy3" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int default "123", + `c3` int default "456") + UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by k;""" + + tableName = "test_partial_update_new_key_policy3" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int, + `c2` int default "123", + `c3` int default "456") + DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES("replication_num" = "1"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by k,c1,c2,c3;""" + + tableName = "test_partial_update_new_key_policy4" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ CREATE TABLE ${tableName} ( + `k` BIGINT NOT NULL, + `c1` int max, + `c2` int min, + `c3` int sum) + AGGREGATE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES("replication_num" = "1"); """ + sql """insert into ${tableName} select number,number,number,number from numbers("number"="5");""" + sql " insert into ${tableName}(k,c1) values(0,20),(1,20),(10,20),(11,20);" + sql "insert into ${tableName}(k,c2) values(0,30),(2,30),(10,30),(12,30);" + qt_sql """select * from ${tableName} order by 
k;""" +} diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy index 722f1fccb7643c..fac17ac542bc16 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_only_keys.groovy @@ -42,19 +42,18 @@ suite("test_partial_update_only_keys", "p0") { qt_sql """select * from ${tableName} order by k;""" // new rows will be appended sql "set enable_unique_key_partial_update=true;" - sql "set enable_insert_strict=false;" sql "sync" sql "insert into ${tableName}(k) values(0),(1),(4),(5),(6);" qt_sql """select * from ${tableName} order by k;""" - // fail if has new rows - sql "set enable_insert_strict=true;" + // fail if has new rows when partial_update_new_key_behavior=ERROR + sql """set partial_update_new_key_behavior="ERROR";""" sql "sync" sql "insert into ${tableName}(k) values(0),(1),(4),(5),(6);" qt_sql """select * from ${tableName} order by k;""" test { sql "insert into ${tableName}(k) values(0),(1),(10),(11);" - exception "Insert has filtered data in strict mode" + exception "[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR" } qt_sql """select * from ${tableName} order by k;""" } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy index 864a97e13bfdc5..ddd2436f938234 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy @@ -421,7 +421,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 
'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -434,9 +434,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1002,7 +1000,7 @@ suite("test_partial_update_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1015,9 +1013,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy index 2ce042f8b6391c..22d396124924e7 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -419,13 +419,13 @@ suite("test_partial_update_row_store_schema_change", "p0") { }); // 
test load data with all key column, should fail because - // it inserts a new row in strict mode + // it inserts a new row when partial_update_new_key_behavior=ERROR streamLoad { table "${tableName}" set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -438,9 +438,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } @@ -1015,7 +1013,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { set 'column_separator', ',' set 'partial_columns', 'true' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' set 'columns', 'c0, c1' file 'schema_change/load_with_key_column.csv' @@ -1028,9 +1026,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(1, json.NumberTotalRows) - assertEquals(1, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy index c4d26baee8f481..dc591d7b02a6c4 100644 --- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy @@ -18,6 +18,9 @@ suite("test_partial_update_strict_mode", "p0") { + // NOTE: after https://github.com/apache/doris/pull/41232, the handling of newly inserted rows in partial update + // is no longer controlled by strict mode; it is controlled by the 'partial_update_new_key_behavior' option instead + String db = context.config.getDbNameByFile(context.file) sql "select 1;" // to create database @@ -67,8 +70,8 @@ suite("test_partial_update_strict_mode", "p0") { def json = parseJson(result) assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(1, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -114,11 +117,10 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) + assertEquals("Success", json.Status) assertEquals(3, json.NumberTotalRows) - assertEquals(0, json.NumberLoadedRows) - assertEquals(2, json.NumberFilteredRows) + assertEquals(3, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) } } sql "sync" @@ -211,7 +213,7 @@ suite("test_partial_update_strict_mode", "p0") { check {result, exception, startTime, endTime -> assertTrue(exception == null) def json = parseJson(result) - assertEquals("Fail", json.Status) + assertEquals("Success", json.Status) } } @@ -253,7 +255,7 @@ suite("test_partial_update_strict_mode", "p0") { (1,600,"2023-08-03 12:00:01"), (2,500,"2023-07-03 12:00:01"), (4,23,"2023-07-03 12:00:02");""" - exception "Insert has filtered data in strict mode" + exception "[E-207]the unmentioned column `city` should have default value or be nullable for 
newly inserted rows in non-strict mode partial update" } sql "sync;" qt_sql """select * from ${tableName5} order by id;""" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy index ad2e4b2f4ff229..ba12be2b99ae2d 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy @@ -78,7 +78,7 @@ suite("test_partial_update_upsert", "p0") { `last_access_time` datetime NULL ) ENGINE = OLAP UNIQUE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) - BUCKETS AUTO PROPERTIES ( + BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "storage_format" = "V2", "enable_unique_key_merge_on_write" = "true", @@ -95,7 +95,7 @@ suite("test_partial_update_upsert", "p0") { set 'format', 'csv' set 'partial_columns', 'true' set 'columns', 'id,balance,last_access_time' - set 'strict_mode', 'true' + set 'partial_update_new_key_behavior', 'ERROR' file 'upsert.csv' time 10000 // limit inflight 10s @@ -104,6 +104,7 @@ suite("test_partial_update_upsert", "p0") { assertTrue(exception == null) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.toString().contains("[E-7003]Can't append new rows in partial update when partial_update_new_key_behavior is ERROR")) } } sql "sync"