From f5475eabca0b150dbcbf3675aee14708bbbe34e9 Mon Sep 17 00:00:00 2001 From: hugoluo Date: Mon, 20 Nov 2023 20:00:13 +0800 Subject: [PATCH 1/5] [Feature](merge-on-write)Support ignore mode for merge-on-write unique table (#21773) --- be/src/exec/tablet_info.cpp | 8 +- be/src/exec/tablet_info.h | 2 + be/src/http/action/stream_load.cpp | 7 + be/src/http/http_common.h | 1 + be/src/olap/delta_writer_v2.cpp | 4 +- be/src/olap/partial_update_info.h | 4 +- be/src/olap/rowset_builder.cpp | 4 +- be/src/olap/tablet.cpp | 105 +++++++------- be/src/olap/tablet_meta.cpp | 22 +++ be/src/olap/tablet_meta.h | 16 +++ be/src/olap/tablet_schema.cpp | 2 + be/src/olap/tablet_schema.h | 5 + .../Load/STREAM-LOAD.md | 2 + .../Manipulation/INSERT.md | 4 +- .../Load/STREAM-LOAD.md | 2 + .../Manipulation/INSERT.md | 5 +- fe/fe-core/src/main/cup/sql_parser.cup | 23 ++- .../org/apache/doris/analysis/DeleteStmt.java | 3 +- .../doris/analysis/NativeInsertStmt.java | 27 +++- .../org/apache/doris/analysis/UpdateStmt.java | 3 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 2 +- .../commands/InsertIntoTableCommand.java | 3 +- .../apache/doris/planner/OlapTableSink.java | 18 ++- .../doris/planner/StreamLoadPlanner.java | 4 +- .../org/apache/doris/task/LoadTaskInfo.java | 4 + .../org/apache/doris/task/StreamLoadTask.java | 8 ++ .../doris/planner/OlapTableSinkTest.java | 8 +- gensrc/proto/descriptors.proto | 1 + gensrc/proto/olap_file.proto | 1 + gensrc/thrift/Descriptors.thrift | 1 + gensrc/thrift/FrontendService.thrift | 1 + .../data/insert_p0/insert_ignore.out | 36 +++++ .../data/unique_with_mow_p0/ignore_mode.csv | 10 ++ .../data/unique_with_mow_p0/ignore_mode2.csv | 2 + .../unique_with_mow_p0/test_ignore_mode.out | 20 +++ .../suites/insert_p0/insert_ignore.groovy | 132 ++++++++++++++++++ .../test_ignore_mode.groovy | 112 +++++++++++++++ 37 files changed, 535 insertions(+), 77 deletions(-) create mode 100644 regression-test/data/insert_p0/insert_ignore.out create mode 100644 regression-test/data/unique_with_mow_p0/ignore_mode.csv create mode 100644 regression-test/data/unique_with_mow_p0/ignore_mode2.csv create mode 100644 regression-test/data/unique_with_mow_p0/test_ignore_mode.out create mode 100644 regression-test/suites/insert_p0/insert_ignore.groovy create mode 100644 regression-test/suites/unique_with_mow_p0/test_ignore_mode.groovy diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 90d434625816ef..ac1035584086bc 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -126,6 +126,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { _version = pschema.version(); _is_partial_update = pschema.partial_update(); _is_strict_mode = pschema.is_strict_mode(); + _is_unique_key_ignore_mode = pschema.is_unique_key_ignore_mode(); for (auto& col : pschema.partial_update_input_columns()) { _partial_update_input_columns.insert(col); @@ -183,9 +184,9 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { _table_id = tschema.table_id; _version = tschema.version; _is_partial_update = tschema.is_partial_update; - if (tschema.__isset.is_strict_mode) { - _is_strict_mode = tschema.is_strict_mode; - } + _is_strict_mode = tschema.__isset.is_strict_mode && tschema.is_strict_mode; + _is_unique_key_ignore_mode = + tschema.__isset.is_unique_key_ignore_mode && tschema.is_unique_key_ignore_mode; for (auto& tcolumn : tschema.partial_update_input_columns) { _partial_update_input_columns.insert(tcolumn); @@ -255,6 +256,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_version(_version); pschema->set_partial_update(_is_partial_update); pschema->set_is_strict_mode(_is_strict_mode); + pschema->set_is_unique_key_ignore_mode(_is_unique_key_ignore_mode); for (auto col : _partial_update_input_columns) { *pschema->add_partial_update_input_columns() = col; } diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 2e5f40bec35d86..c07df1371530b4 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -93,6 +93,7 @@ class OlapTableSchemaParam { return _partial_update_input_columns; } bool is_strict_mode() const { return _is_strict_mode; } + bool is_unique_key_ignore_mode() const { return _is_unique_key_ignore_mode; } std::string debug_string() const; private: @@ -107,6 +108,7 @@ class OlapTableSchemaParam { bool _is_partial_update = false; std::set _partial_update_input_columns; bool _is_strict_mode = false; + bool _is_unique_key_ignore_mode = false; }; using OlapTableIndexTablets = TOlapTableIndexTablets; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 6ca7148b3b653b..6b3850caa1ee6b 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -601,6 +601,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); request.__set_memtable_on_sink_node(value); } + if (!http_req->header(HTTP_IGNORE_MODE).empty()) { + if (iequal(http_req->header(HTTP_IGNORE_MODE), "true")) { + request.__set_ignore_mode(true); + } else { + request.__set_ignore_mode(false); + } + } request.__set_group_commit(ctx->group_commit); #ifndef BE_TEST diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 5a1550f48fcefc..9eeab1b38a8114 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -58,6 +58,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines"; static const std::string HTTP_COMMENT = "comment"; static const std::string HTTP_ENABLE_PROFILE = "enable_profile"; static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns"; +static const std::string HTTP_IGNORE_MODE = "ignore_mode"; static const std::string HTTP_SQL = "sql"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 0a4108970a60c4..eae372c1c06305 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -230,7 +230,9 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _partial_update_info = std::make_shared(); _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), - table_schema_param->is_strict_mode()); + table_schema_param->is_strict_mode(), + table_schema_param->is_unique_key_ignore_mode()); + _tablet_schema->set_is_unique_key_ignore_mode(table_schema_param->is_unique_key_ignore_mode()); } } // namespace doris diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index cdea698b20d8a5..d792bf6c5d7fae 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -23,7 +23,7 @@ namespace doris { struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, - const std::set& partial_update_cols, bool is_strict_mode) { + const std::set& partial_update_cols, bool is_strict_mode, bool is_unique_key_ignore_mode) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; missing_cids.clear(); @@ -40,6 +40,7 @@ struct PartialUpdateInfo { } } this->is_strict_mode = is_strict_mode; + this->is_unique_key_ignore_mode = is_unique_key_ignore_mode; } bool is_partial_update {false}; @@ -50,5 +51,6 @@ struct PartialUpdateInfo { // to generate a new row, only available in non-strict mode bool can_insert_new_rows_in_partial_update {true}; bool is_strict_mode {false}; + bool is_unique_key_ignore_mode {false}; }; } // namespace doris diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 219d344f6253fc..476180051b29a3 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -369,7 +369,9 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, _partial_update_info = std::make_shared(); _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), - table_schema_param->is_strict_mode()); + table_schema_param->is_strict_mode(), + table_schema_param->is_unique_key_ignore_mode()); + } } // namespace doris diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index b9818589650181..2a809a016dfef2 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -105,6 +105,7 @@ #include "olap/txn_manager.h" #include "olap/types.h" #include "olap/utils.h" +#include "runtime/define_primitive_type.h" #include "segment_loader.h" #include "service/point_query_executor.h" #include "util/bvar_helper.h" @@ -2881,6 +2882,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, Version dummy_version(end_version + 1, end_version + 1); auto rowset_schema = rowset->tablet_schema(); bool is_partial_update = rowset_writer && rowset_writer->is_partial_update(); + bool is_unique_key_ignore_mode = rowset_schema->is_unique_key_ignore_mode(); bool have_input_seq_column = false; if (is_partial_update && rowset_schema->has_sequence_col()) { std::vector including_cids = @@ -2957,54 +2959,61 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, if (st.is()) { continue; } - - if (st.is() && (!is_partial_update || have_input_seq_column)) { - // `st.is()` means that there exists a row with the same key and larger value - // in seqeunce column. - // - If the current load is not a partial update, we just delete current row. - // - Otherwise, it means that we are doing the alignment process in publish phase due to conflicts - // during concurrent partial updates. And there exists another load which introduces a row with - // the same keys and larger sequence column value published successfully after the commit phase - // of the current load. - // - If the columns we update include sequence column, we should delete the current row becase the - // partial update on the current row has been `overwritten` by the previous one with larger sequence - // column value. - // - Otherwise, we should combine the values of the missing columns in the previous row and the values - // of the including columns in the current row into a new row. - delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, - row_id); - continue; - } - if (is_partial_update && rowset_writer != nullptr) { - // In publish version, record rows to be deleted for concurrent update - // For example, if version 5 and 6 update a row, but version 6 only see - // version 4 when write, and when publish version, version 5's value will - // be marked as deleted and it's update is losed. - // So here we should read version 5's columns and build a new row, which is - // consists of version 6's update columns and version 5's origin columns - // here we build 2 read plan for ori values and update values - prepare_to_read(loc, pos, &read_plan_ori); - prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update); - rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; - ++pos; - // delete bitmap will be calculate when memtable flush and - // publish. The two stages may see different versions. - // When there is sequence column, the currently imported data - // of rowset may be marked for deletion at memtablet flush or - // publish because the seq column is smaller than the previous - // rowset. - // just set 0 as a unified temporary version number, and update to - // the real version number later. - delete_bitmap->add( - {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - loc.row_id); - delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, - row_id); - continue; + if (UNLIKELY(is_unique_key_ignore_mode)) { + if (st.is() || st.is()) { + delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, + row_id); + delete_bitmap->add_ignore({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}); + } + } else { + if (st.is() && (!is_partial_update || have_input_seq_column)) { + // `st.is()` means that there exists a row with the same key and larger value + // in seqeunce column. + // - If the current load is not a partial update, we just delete current row. + // - Otherwise, it means that we are doing the alignment process in publish phase due to conflicts + // during concurrent partial updates. And there exists another load which introduces a row with + // the same keys and larger sequence column value published successfully after the commit phase + // of the current load. + // - If the columns we update include sequence column, we should delete the current row becase the + // partial update on the current row has been `overwritten` by the previous one with larger sequence + // column value. + // - Otherwise, we should combine the values of the missing columns in the previous row and the values + // of the including columns in the current row into a new row. + delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, + row_id); + continue; + } + if (is_partial_update && rowset_writer != nullptr) { + // In publish version, record rows to be deleted for concurrent update + // For example, if version 5 and 6 update a row, but version 6 only see + // version 4 when write, and when publish version, version 5's value will + // be marked as deleted and it's update is losed. + // So here we should read version 5's columns and build a new row, which is + // consists of version 6's update columns and version 5's origin columns + // here we build 2 read plan for ori values and update values + prepare_to_read(loc, pos, &read_plan_ori); + prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update); + rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; + ++pos; + // delete bitmap will be calculate when memtable flush and + // publish. The two stages may see different versions. + // When there is sequence column, the currently imported data + // of rowset may be marked for deletion at memtablet flush or + // publish because the seq column is smaller than the previous + // rowset. + // just set 0 as a unified temporary version number, and update to + // the real version number later. + delete_bitmap->add( + {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, + loc.row_id); + delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, + row_id); + continue; + } + // when st = ok + delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, + loc.row_id); } - // when st = ok - delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - loc.row_id); } remaining -= num_read; } @@ -3396,7 +3405,7 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap( for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { src.segment_id = seg_id; DeleteBitmap subset_map(tablet_id()); - input_delete_bitmap.subset({rowset->rowset_id(), seg_id, start_version}, + input_delete_bitmap.subset_ignore({rowset->rowset_id(), seg_id, start_version}, {rowset->rowset_id(), seg_id, end_version}, &subset_map); // traverse all versions and convert rowid for (auto iter = subset_map.delete_bitmap.begin(); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 76cf14a75ba74d..cd769c33d15df8 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -916,6 +916,11 @@ void DeleteBitmap::add(const BitmapKey& bmk, uint32_t row_id) { delete_bitmap[bmk].add(row_id); } +void DeleteBitmap::add_ignore(const BitmapKey& bmk) { + std::lock_guard l(lock); + delete_bitmap_ignore.insert(bmk); +} + int DeleteBitmap::remove(const BitmapKey& bmk, uint32_t row_id) { std::lock_guard l(lock); auto it = delete_bitmap.find(bmk); @@ -1001,6 +1006,23 @@ void DeleteBitmap::subset(const BitmapKey& start, const BitmapKey& end, } } +void DeleteBitmap::subset_ignore(const BitmapKey& start, const BitmapKey& end, + DeleteBitmap* subset_rowset_map) const { + roaring::Roaring roaring; + DCHECK(start < end); + std::shared_lock l(lock); + for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) { + auto& [k, bm] = *it; + if (k >= end) { + break; + } + if (delete_bitmap_ignore.find(k) == delete_bitmap_ignore.end()) { + break; + } + subset_rowset_map->set(k, bm); + } +} + void DeleteBitmap::merge(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) { std::lock_guard l(lock); auto [iter, succ] = delete_bitmap.emplace(bmk, segment_delete_bitmap); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 60cc485882aee7..909f871054a129 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -327,6 +327,7 @@ class DeleteBitmap { using Version = uint64_t; using BitmapKey = std::tuple; std::map delete_bitmap; // Ordered map + std::set delete_bitmap_ignore; constexpr static inline uint32_t INVALID_SEGMENT_ID = std::numeric_limits::max() - 1; constexpr static inline uint32_t ROWSET_SENTINEL_MARK = std::numeric_limits::max() - 1; @@ -371,6 +372,11 @@ class DeleteBitmap { */ void add(const BitmapKey& bmk, uint32_t row_id); + /** + * Marks the specific ignore deleted + */ + void add_ignore(const BitmapKey& bmk); + /** * Clears the deletetion mark specific row * @@ -429,6 +435,16 @@ class DeleteBitmap { void subset(const BitmapKey& start, const BitmapKey& end, DeleteBitmap* subset_delete_map) const; + /** + * Gets subset without ignore of delete_bitmap with given range [start, end) + * + * @parma start start + * @parma end end + * @parma subset_delete_map output param + */ + void subset_ignore(const BitmapKey& start, const BitmapKey& end, + DeleteBitmap* subset_delete_map) const; + /** * Merges the given segment delete bitmap into *this * diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index d2b16a907a0b3b..d703dd0c502339 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -775,6 +775,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _sort_col_num = schema.sort_col_num(); _compression_type = schema.compression_type(); _schema_version = schema.schema_version(); + _is_unique_key_ignore_mode = schema.is_unique_key_ignore_mode(); } void TabletSchema::copy_from(const TabletSchema& tablet_schema) { @@ -920,6 +921,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const { tablet_schema_pb->set_schema_version(_schema_version); tablet_schema_pb->set_compression_type(_compression_type); tablet_schema_pb->set_version_col_idx(_version_col_idx); + tablet_schema_pb->set_is_unique_key_ignore_mode(_is_unique_key_ignore_mode); } size_t TabletSchema::row_size() const { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 2fe6ea45581582..65308ffb157f82 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -356,6 +356,10 @@ class TabletSchema { } vectorized::Block create_block_by_cids(const std::vector& cids); + void set_is_unique_key_ignore_mode(bool is_unique_key_ignore_mode) { + _is_unique_key_ignore_mode = is_unique_key_ignore_mode; + } + bool is_unique_key_ignore_mode() const { return _is_unique_key_ignore_mode; } private: friend bool operator==(const TabletSchema& a, const TabletSchema& b); @@ -393,6 +397,7 @@ class TabletSchema { int64_t _mem_size = 0; bool _store_row_column = false; bool _skip_write_index_on_load = false; + bool _is_unique_key_ignore_mode = false; }; bool operator==(const TabletSchema& a, const TabletSchema& b); diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index 2431a824d77bab..655f6dfc699f7d 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -160,6 +160,8 @@ separated by commas. 30. escape Used to escape characters that appear in a csv field identical to the enclosing characters. For example, if the data is "a,'b,'c'", enclose is "'", and you want "b,'c to be parsed as a field, you need to specify a single-byte escape character, such as "\", and then modify the data to "a,' b,\'c'". +29. ignore_mode: Ignore mode, only effective when the target table is a unique table with merge-on-write enabled. When insert ignore mode is enabled, for the inserted rows, if the key of the row does not exist in the table, the row will be inserted. If the key already exists in the table, the row will be discarded. When sequence columns exists in the target table, the ignore mode can't be enabled in stream load. + ### Example 1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md index 4346aede1a5924..7b6d13563042f6 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md @@ -35,7 +35,7 @@ INSERT The change statement is to complete the data insertion operation. ```sql -INSERT INTO table_name +INSERT [IGNORE] INTO table_name [ PARTITION (p1, ...) ] [ WITH LABEL label] [ (column [, ...]) ] @@ -44,7 +44,7 @@ INSERT INTO table_name ```` Parameters - +> IGNORE: insert ignore mode, only effective when the target table is a unique table with merge-on-write enabled. When insert ignore mode is enabled, for the inserted rows, if the key of the row does not exist in the table, the row will be inserted. If the key already exists in the table, the row will be discarded. When sequence column exists in the target table, the `insert ignore` statements are forbidden. > tablet_name: The destination table for importing data. Can be of the form `db_name.table_name` > > partitions: Specify the partitions to be imported, which must be partitions that exist in `table_name`. Multiple partition names are separated by commas diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index 3149002f2a0b79..0c92cf2b02c1f5 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -156,6 +156,8 @@ curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_h 30. escape 转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c被作为一个字段解析,则需要指定单字节转义符,例如"\",然后将数据修改为"a,'b,\'c'"。 +29. ignore_mode: ignore模式,仅当目标表为开启merge-on-write的unique表时有效。开启后,对于插入的行,如果该行的key在表中不存在,则插入该行数据。如果key在表中不存在,则丢弃这行数据。当目标表中存在sequence列时stream无法开启ignore mode。 + ### Example 1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒 diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md index d1355748fb35cf..327ef1b456ddf8 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md @@ -35,7 +35,7 @@ INSERT 该语句是完成数据插入操作。 ```sql -INSERT INTO table_name +INSERT [IGNORE] INTO table_name [ PARTITION (p1, ...) ] [ WITH LABEL label] [ (column [, ...]) ] @@ -44,7 +44,8 @@ INSERT INTO table_name ``` Parameters - +> IGNORE: insert ignore模式,仅当目标表为开启merge-on-write的unique表时有效。开启后,对于插入的行,如果该行的key在表中不存在,则插入该行数据。如果key在表中不存在,则丢弃这行数据。当目标表中存在sequence列时无法通过insert ignore语句进行插入操作。 +> > tablet_name: 导入数据的目的表。可以是 `db_name.table_name` 形式 > > partitions: 指定待导入的分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔 diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index ee65d926e85bc2..4cada3fc8872bf 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -898,7 +898,7 @@ nonterminal ParseNode load_property; nonterminal List opt_load_property_list; // Boolean -nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local, opt_is_auto_inc; +nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local, opt_is_auto_inc, opt_is_insert_ignore; nonterminal String opt_from_rollup, opt_to_rollup; nonterminal ColumnPosition opt_col_pos; @@ -3104,7 +3104,7 @@ opt_partition ::= | KW_AUTO KW_PARTITION KW_BY KW_RANGE function_name:functionName LPAREN expr_list:l COMMA KW_INTERVAL expr:v ident:u RPAREN LPAREN opt_all_partition_desc_list:list RPAREN {: - Expr fnExpr = FunctionCallExpr.functionWithIntervalConvert(functionName.getFunction().toLowerCase(), l.get(0), v, u); + Expr fnExpr = FunctionCallExpr.functionWithIntervalConvert(functionName.getFunction().toLowerCase(), l.get(0), v, u); ArrayList exprs = new ArrayList(); exprs.add(fnExpr); RESULT = RangePartitionDesc.createRangePartitionDesc(exprs, list); @@ -3590,6 +3590,17 @@ opt_is_auto_inc ::= RESULT = true; :} ; + +opt_is_insert_ignore ::= + {: + RESULT = false; + :} + | KW_IGNORE + {: + RESULT = true; + :} + ; + opt_comment ::= /* empty */ {: @@ -4685,16 +4696,16 @@ insert_overwrite_stmt ::= // Insert statement insert_stmt ::= - KW_INSERT KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source + KW_INSERT opt_is_insert_ignore:is_insert_ignore KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source {: - RESULT = new NativeInsertStmt(target, label, cols, source, hints); + RESULT = new NativeInsertStmt(target, label, cols, source, hints, is_insert_ignore); :} // TODO(zc) add default value for SQL-2003 // | KW_INSERT KW_INTO insert_target:target KW_DEFAULT KW_VALUES | /* used for group commit */ - KW_INSERT KW_INTO KW_DORIS_INTERNAL_TABLE_ID LPAREN INTEGER_LITERAL:table_id RPAREN opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source + KW_INSERT opt_is_insert_ignore:is_insert_ignore KW_INTO INTEGER_LITERAL:table_id opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source {: - RESULT = new NativeInsertStmt(table_id, label, cols, source, hints); + RESULT = new NativeInsertStmt(table_id, label, cols, source, hints, is_insert_ignore); :} ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java index 24c54ae8bcf7d6..2c2c4fecb57b35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java @@ -202,7 +202,8 @@ private void constructInsertStmt() throws AnalysisException { new InsertSource(selectStmt), null, isPartialUpdate, - NativeInsertStmt.InsertType.DELETE); + NativeInsertStmt.InsertType.DELETE, + false); ((NativeInsertStmt) insertStmt).setIsFromDeleteOrUpdateStmt(true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index ef7acb484a1e86..e5ca4a0096471a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -160,6 +160,8 @@ public class NativeInsertStmt extends InsertStmt { private boolean isFromDeleteOrUpdateStmt = false; + private boolean isInsertIgnore = false; + private InsertType insertType = InsertType.NATIVE_INSERT; enum InsertType { @@ -192,6 +194,26 @@ public NativeInsertStmt(long tableId, String label, List cols, InsertSou this.tableId = tableId; } + public NativeInsertStmt(long tableId, String label, List cols, InsertSource source, + List hints, boolean isInsertIgnore) { + this(new InsertTarget(new TableName(null, null, null), null), label, cols, source, hints, isInsertIgnore); + this.tableId = tableId; + } + + public NativeInsertStmt(InsertTarget target, String label, List cols, InsertSource source, + List hints, boolean isInsertIgnore) { + super(new LabelName(null, label), null, null); + this.tblName = target.getTblName(); + this.targetPartitionNames = target.getPartitionNames(); + this.label = new LabelName(null, label); + this.queryStmt = source.getQueryStmt(); + this.planHints = hints; + this.isInsertIgnore = isInsertIgnore; + this.targetColumnNames = cols; + this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt + && ((SelectStmt) queryStmt).getTableRefs().isEmpty()); + } + // Ctor for CreateTableAsSelectStmt and InsertOverwriteTableStmt public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, LabelName label, QueryStmt queryStmt, List planHints, List targetColumnNames) { @@ -206,11 +228,12 @@ public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, Lab } public NativeInsertStmt(InsertTarget target, String label, List cols, InsertSource source, - List hints, boolean isPartialUpdate, InsertType insertType) { + List hints, boolean isPartialUpdate, InsertType insertType, boolean isInsertIgnore) { this(target, label, cols, source, hints); this.isPartialUpdate = isPartialUpdate; this.partialUpdateCols.addAll(cols); this.insertType = insertType; + this.isInsertIgnore = isInsertIgnore; } public boolean isValuesOrConstantSelect() { @@ -399,7 +422,7 @@ public void analyze(Analyzer analyzer) throws UserException { boolean isInsertStrict = analyzer.getContext().getSessionVariable().getEnableInsertStrict() && !isFromDeleteOrUpdateStmt; sink.init(loadId, transactionId, db.getId(), timeoutSecond, - sendBatchParallelism, false, isInsertStrict); + sendBatchParallelism, false, isInsertStrict, isInsertIgnore); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java index c3afcca68a391c..830515ac9c1a3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java @@ -125,7 +125,8 @@ private void constructInsertStmt() { cols, new InsertSource(selectStmt), null, - isPartialUpdate, NativeInsertStmt.InsertType.UPDATE); + isPartialUpdate, NativeInsertStmt.InsertType.UPDATE, + false); ((NativeInsertStmt) insertStmt).setIsFromDeleteOrUpdateStmt(true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index e2f3bdcbe9e251..af818eb223ca21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -205,7 +205,7 @@ public void plan(TUniqueId loadId, List> fileStatusesLis List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds, Config.enable_single_replica_load); - olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode); + olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode, false); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index 45411204524193..960efce2f43d2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -185,7 +185,8 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { ctx.getExecTimeout(), ctx.getSessionVariable().getSendBatchParallelism(), false, - isStrictMode); + isStrictMode, + false); sink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState( diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index e2cc67d992a9c5..cfa992545a2044 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.Index; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; @@ -111,6 +112,8 @@ public class OlapTableSink extends DataSink { private boolean isStrictMode = false; + private boolean isUniqueKeyIgnoreMode = false; + public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, boolean singleReplicaLoad) { this.dstTable = dstTable; @@ -120,7 +123,7 @@ public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List hiddenColumns; private boolean trimDoubleQuotes = false; private boolean isPartialUpdate = false; + private boolean isIgnoreMode = false; private int skipLines = 0; private boolean enableProfile = false; @@ -319,6 +320,10 @@ public void setStreamPerNode(int streamPerNode) { this.streamPerNode = streamPerNode; } + public boolean isIgnoreMode() { + return isIgnoreMode; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType(), @@ -457,6 +462,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws if (request.isSetMemtableOnSinkNode()) { this.memtableOnSinkNode = request.isMemtableOnSinkNode(); } + if (request.isSetIgnoreMode()) { + isIgnoreMode = request.isIgnoreMode(); + } if (request.isSetStreamPerNode()) { this.streamPerNode = request.getStreamPerNode(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index 0ebea0e3b35c68..2b81266bf60b6f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -109,7 +109,7 @@ public void testSinglePartition() throws UserException { new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM)); dstTable.getPartitionInfo().setIsMutable(partition.getId(), true); OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); sink.complete(null); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -148,7 +148,7 @@ public void testRangePartition( }; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); try { sink.complete(null); } catch (UserException e) { @@ -173,7 +173,7 @@ public void testRangeUnknownPartition( }; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); sink.complete(null); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -212,7 +212,7 @@ public void testListPartition( }; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); try { sink.complete(null); } catch (UserException e) { diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 270199cc0204c7..e4374052f6dd4b 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -68,5 +68,6 @@ message POlapTableSchemaParam { optional bool partial_update = 7 [default = false]; repeated string partial_update_input_columns = 8; optional bool is_strict_mode = 9 [default = false]; + optional bool is_unique_key_ignore_mode = 10 [default = false]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 4c49e31a7f47d2..55656358e9b73c 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -254,6 +254,7 @@ message TabletSchemaPB { repeated string partial_update_input_columns = 21; // deprecated optional bool enable_single_replica_compaction = 22 [default=false]; optional bool skip_write_index_on_load = 23 [default=false]; + optional bool is_unique_key_ignore_mode = 24 [default=false]; } enum TabletStatePB { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index abaf8f8967daad..d750be902dfe85 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -234,6 +234,7 @@ struct TOlapTableSchemaParam { 8: optional bool is_partial_update 9: optional list partial_update_input_columns 10: optional bool is_strict_mode = false; + 11: optional bool is_unique_key_ignore_mode = false; } struct TTabletLocation { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f7c94b37e91320..819d5a04353250 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -642,6 +642,7 @@ struct TStreamLoadPutRequest { 53: optional bool memtable_on_sink_node; 54: optional bool group_commit 55: optional i32 stream_per_node; + 56: optional bool ignore_mode = false } struct TStreamLoadPutResult { diff --git a/regression-test/data/insert_p0/insert_ignore.out b/regression-test/data/insert_p0/insert_ignore.out new file mode 100644 index 00000000000000..bb38f60c9cc4be --- /dev/null +++ b/regression-test/data/insert_p0/insert_ignore.out @@ -0,0 +1,36 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !origin_data -- +1 kevin 18 shenzhen 400 +2 bob 20 beijing 500 +3 alice 22 shanghai 600 +4 jack 24 hangzhou 700 +5 tom 26 guanzhou 800 + +-- !after_insert_ignore -- +1 kevin 18 shenzhen 400 +2 bob 20 beijing 500 +3 alice 22 shanghai 600 +4 jack 24 hangzhou 700 +5 tom 26 guanzhou 800 +10 alex 28 shenzhen 1111 +20 leo 30 beijing 2222 +30 sam 32 shanghai 3333 +40 Ruth 34 hangzhou 4444 +50 cynthia 36 guanzhou 8000 + +-- !origin_data -- +1 1 + +-- !delete_a_row -- + +-- !after_insert_ignore -- +1 3 + +-- !sql -- +1 10 + +-- !delete_a_row -- + +-- !after_insert_ignore -- +1 1 + diff --git a/regression-test/data/unique_with_mow_p0/ignore_mode.csv b/regression-test/data/unique_with_mow_p0/ignore_mode.csv new file mode 100644 index 00000000000000..693c484172459c --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/ignore_mode.csv @@ -0,0 +1,10 @@ +1,"kevin",18,"shenzhen",4000 +10,"alex",28,"shenzhen",1111 +2,"bob",20,"beijing",5000 +20,"leo",30,"beijing",2222 +30,"sam",32,"shanghai",3333 +3,"alice",22,"shanghai",6000 +4,"jack",24,"hangzhou",7000 +40,"Ruth",34,"hangzhou",4444 +5,"tom",26,"guanzhou",8000 +50,"cynthia",36,"guanzhou",8000 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/ignore_mode2.csv b/regression-test/data/unique_with_mow_p0/ignore_mode2.csv new file mode 100644 index 00000000000000..5711f63304b74a --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/ignore_mode2.csv @@ -0,0 +1,2 @@ +1,4000 +2,5000 diff --git a/regression-test/data/unique_with_mow_p0/test_ignore_mode.out b/regression-test/data/unique_with_mow_p0/test_ignore_mode.out new file mode 100644 index 00000000000000..5c72f099d99f41 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/test_ignore_mode.out @@ -0,0 +1,20 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !origin_data -- +1 kevin 18 shenzhen 400 +2 bob 20 beijing 500 +3 alice 22 shanghai 600 +4 jack 24 hangzhou 700 +5 tom 26 guanzhou 800 + +-- !after_ignore_mode_stream_load -- +1 kevin 18 shenzhen 400 +2 bob 20 beijing 500 +3 alice 22 shanghai 600 +4 jack 24 hangzhou 700 +5 tom 26 guanzhou 800 +10 "alex" 28 "shenzhen" 1111 +20 "leo" 30 "beijing" 2222 +30 "sam" 32 "shanghai" 3333 +40 "Ruth" 34 "hangzhou" 4444 +50 "cynthia" 36 "guanzhou" 8000 + diff --git a/regression-test/suites/insert_p0/insert_ignore.groovy b/regression-test/suites/insert_p0/insert_ignore.groovy new file mode 100644 index 00000000000000..37cd7707f2e4eb --- /dev/null +++ b/regression-test/suites/insert_p0/insert_ignore.groovy @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_insert_ignore") { + + def tableName = "test_insert_ignore1" + sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(10) NULL, + `age` int(11) NULL DEFAULT "20", + `city` varchar(10) NOT NULL DEFAULT "beijing", + `balance` decimalv3(9, 0) NULL + ) ENGINE = OLAP UNIQUE KEY(`id`) + COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "disable_auto_compaction" = "false", + "enable_single_replica_compaction" = "false" + ); + """ + sql """insert into ${tableName} values + (1,"kevin",18,"shenzhen",400), + (2,"bob",20,"beijing",500), + (3,"alice",22,"shanghai",600), + (4,"jack",24,"hangzhou",700), + (5,"tom",26,"guanzhou",800);""" + qt_origin_data "select * from ${tableName} order by id;" + + // some rows are with existing keys, some are not + sql """insert ignore into ${tableName} values + (1,"kevin",18,"shenzhen",4000), + (10,"alex",28,"shenzhen",1111), + (2,"bob",20,"beijing",5000), + (20,"leo",30,"beijing",2222), + (30,"sam",32,"shanghai",3333), + (3,"alice",22,"shanghai",6000), + (4,"jack",24,"hangzhou",7000), + (40,"Ruth",34,"hangzhou",4444), + (5,"tom",26,"guanzhou",8000), + (50,"cynthia",36,"guanzhou",8000);""" + + qt_after_insert_ignore "select * from ${tableName} order by id;" + sql """ DROP TABLE IF EXISTS ${tableName};""" + + def tableName2 = "test_insert_ignore2" + sql """ DROP TABLE IF EXISTS ${tableName2} FORCE; """ + sql """CREATE TABLE IF NOT EXISTS ${tableName2} ( + `uid` BIGINT NULL, + `v1` BIGINT NULL + ) + UNIQUE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + );""" + + sql "insert into ${tableName2} values(1,1);" + qt_origin_data "select * from ${tableName2} order by uid;" + + sql "insert into ${tableName2}(uid, v1, __DORIS_DELETE_SIGN__) values(1, 2, 1);" + qt_delete_a_row "select * from ${tableName2} order by uid;" + + sql "insert ignore into ${tableName2} values(1,3);" + qt_after_insert_ignore "select * from ${tableName2} order by uid;" + + sql "insert into ${tableName2} values(1,10);" + qt_sql "select * from ${tableName2} order by uid;" + + sql "insert into ${tableName2}(uid, v1, __DORIS_DELETE_SIGN__) values(1, 1, 1);" + qt_delete_a_row "select * from ${tableName2} order by uid;" + + sql "insert ignore into ${tableName2} values(1,1);" + qt_after_insert_ignore "select * from ${tableName2} order by uid;" + sql """ DROP TABLE IF EXISTS ${tableName2}; """ + + + // test illigal cases + def tableName3 = "test_insert_ignore3" + sql """ DROP TABLE IF EXISTS ${tableName3} FORCE; """ + sql """CREATE TABLE IF NOT EXISTS ${tableName3} ( + `uid` BIGINT NULL, + `v1` BIGINT NULL + ) UNIQUE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "false", + "replication_num" = "1" + );""" + sql "insert into ${tableName3} values(1,1);" + test { + sql "insert ignore into ${tableName3} values(1,3);" + exception "ignore mode can only be enabled if the target table is a unique table with merge-on-write enabled." + } + + def tableName4 = "test_insert_ignore4" + sql """ DROP TABLE IF EXISTS ${tableName4} FORCE; """ + sql """CREATE TABLE IF NOT EXISTS ${tableName4} ( + `uid` BIGINT NULL, + `v1` BIGINT NULL + ) UNIQUE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1", + "function_column.sequence_col" = 'v1' + );""" + sql "insert into ${tableName4} values(1,1);" + test { + sql "insert ignore into ${tableName4} values(1,3);" + exception "ignore mode can't be used if the target table has sequence column, but table[${tableName4}] has sequnce column." + } +} diff --git a/regression-test/suites/unique_with_mow_p0/test_ignore_mode.groovy b/regression-test/suites/unique_with_mow_p0/test_ignore_mode.groovy new file mode 100644 index 00000000000000..94afd0be6b0715 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/test_ignore_mode.groovy @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_table_ignore_mode") { + + def tableName = "test_mow_table_ignore_mode1" + sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(10) NULL, + `age` int(11) NULL DEFAULT "20", + `city` varchar(10) NOT NULL DEFAULT "beijing", + `balance` decimalv3(9, 0) NULL + ) ENGINE = OLAP UNIQUE KEY(`id`) + COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "disable_auto_compaction" = "false", + "enable_single_replica_compaction" = "false" + ); + """ + sql """insert into ${tableName} values + (1,"kevin",18,"shenzhen",400), + (2,"bob",20,"beijing",500), + (3,"alice",22,"shanghai",600), + (4,"jack",24,"hangzhou",700), + (5,"tom",26,"guanzhou",800);""" + qt_origin_data "select * from ${tableName} order by id;" + + // some rows are with existing keys, some are not + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'id,name,age,city,balance' + set 'ignore_mode', 'true' + + file 'ignore_mode.csv' + time 10000 // limit inflight 10s + } + sql "sync" + + qt_after_ignore_mode_stream_load "select * from ${tableName} order by id;" + sql """ DROP TABLE IF EXISTS ${tableName};""" + + + // test illegal case + def tableName2 = "test_mow_table_ignore_mode2" + sql """ DROP TABLE IF EXISTS ${tableName2} FORCE;""" + sql """ + CREATE TABLE ${tableName2} ( + `id` int(11) NULL, + `name` varchar(10) NULL, + `age` int(11) NULL DEFAULT "20", + `city` varchar(10) NOT NULL DEFAULT "beijing", + `balance` decimalv3(9, 0) NULL + ) ENGINE = OLAP UNIQUE KEY(`id`) + COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "disable_auto_compaction" = "false", + "enable_single_replica_compaction" = "false" + );""" + sql """insert into ${tableName2} values + (1,"kevin",18,"shenzhen",400), + (2,"bob",20,"beijing",500), + (3,"alice",22,"shanghai",600), + (4,"jack",24,"hangzhou",700), + (5,"tom",26,"guanzhou",800);""" + // some rows are with existing keys, some are not + streamLoad { + table "${tableName2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'id,balance' + set 'partial_columns', 'true' + set 'ignore_mode', 'true' + + file 'ignore_mode.csv' + time 10000 // limit inflight 10s + + check {result, exception, startTime, endTime -> + assertTrue(exception == null) + def json = parseJson(result) + assertEquals("Fail", json.Status) + assertTrue(json.Message.contains("ignore mode can't be used in partial update.")) + } + } +} From c5b8139b507a3aafd274e35dc3efb51ba2e69da7 Mon Sep 17 00:00:00 2001 From: hugoluo Date: Wed, 22 Nov 2023 16:00:53 +0800 Subject: [PATCH 2/5] [Feature](merge-on-write)Support ignore mode for merge-on-write unique table (#21773) --- be/src/olap/partial_update_info.h | 3 ++- be/src/olap/tablet.cpp | 14 +++++++++----- be/src/olap/tablet_meta.cpp | 2 +- be/src/olap/tablet_meta.h | 2 +- .../apache/doris/analysis/NativeInsertStmt.java | 3 ++- .../nereids/trees/plans/PartitionTopnPhase.java | 1 + .../apache/doris/planner/StreamLoadPlanner.java | 3 ++- 7 files changed, 18 insertions(+), 10 deletions(-) diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index d792bf6c5d7fae..3b3e37a9f8d1d3 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -23,7 +23,8 @@ namespace doris { struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, - const std::set& partial_update_cols, bool is_strict_mode, bool is_unique_key_ignore_mode) { + const std::set& partial_update_cols, bool is_strict_mode, + bool is_unique_key_ignore_mode) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; missing_cids.clear(); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 2a809a016dfef2..c26d5ed4d4672a 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2963,7 +2963,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, if (st.is() || st.is()) { delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, row_id); - delete_bitmap->add_ignore({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}); + delete_bitmap->add_ignore( + {rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}); } } else { if (st.is() && (!is_partial_update || have_input_seq_column)) { @@ -2992,7 +2993,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, // consists of version 6's update columns and version 5's origin columns // here we build 2 read plan for ori values and update values prepare_to_read(loc, pos, &read_plan_ori); - prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update); + prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, + &read_plan_update); rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; ++pos; // delete bitmap will be calculate when memtable flush and @@ -3011,8 +3013,9 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, continue; } // when st = ok - delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, - loc.row_id); + delete_bitmap->add( + {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, + loc.row_id); } } remaining -= num_read; @@ -3406,7 +3409,8 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap( src.segment_id = seg_id; DeleteBitmap subset_map(tablet_id()); input_delete_bitmap.subset_ignore({rowset->rowset_id(), seg_id, start_version}, - {rowset->rowset_id(), seg_id, end_version}, &subset_map); + {rowset->rowset_id(), seg_id, end_version}, + &subset_map); // traverse all versions and convert rowid for (auto iter = subset_map.delete_bitmap.begin(); iter != subset_map.delete_bitmap.end(); ++iter) { diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index cd769c33d15df8..a7e24786a0d070 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1007,7 +1007,7 @@ void DeleteBitmap::subset(const BitmapKey& start, const BitmapKey& end, } void DeleteBitmap::subset_ignore(const BitmapKey& start, const BitmapKey& end, - DeleteBitmap* subset_rowset_map) const { + DeleteBitmap* subset_rowset_map) const { roaring::Roaring roaring; DCHECK(start < end); std::shared_lock l(lock); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 909f871054a129..25f21915164c9d 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -443,7 +443,7 @@ class DeleteBitmap { * @parma subset_delete_map output param */ void subset_ignore(const BitmapKey& start, const BitmapKey& end, - DeleteBitmap* subset_delete_map) const; + DeleteBitmap* subset_delete_map) const; /** * Merges the given segment delete bitmap into *this diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index e5ca4a0096471a..b272e8cdeea9e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -228,7 +228,8 @@ public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, Lab } public NativeInsertStmt(InsertTarget target, String label, List cols, InsertSource source, - List hints, boolean isPartialUpdate, InsertType insertType, boolean isInsertIgnore) { + List hints, boolean isPartialUpdate, InsertType insertType, + boolean isInsertIgnore) { this(target, label, cols, source, hints); this.isPartialUpdate = isPartialUpdate; this.partialUpdateCols.addAll(cols); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PartitionTopnPhase.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PartitionTopnPhase.java index 34520ae6c14c62..b726b2b3b49e20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PartitionTopnPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PartitionTopnPhase.java @@ -26,6 +26,7 @@ public enum PartitionTopnPhase { TWO_PHASE_LOCAL_PTOPN("TwoPhaseLocalPartitionTopn"), TWO_PHASE_GLOBAL_PTOPN("TwoPhaseGlobalPartitionTopn"); private final String name; + PartitionTopnPhase(String name) { this.name = name; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 7136eeb1dc4e95..670871f6f2f4b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -480,7 +480,8 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); } olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, - taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), taskInfo.isIgnoreMode()); + taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), + taskInfo.isIgnoreMode()); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer); From d2e2ce4ae1d0b5a736e2f9d7d0cc3ef9fac03428 Mon Sep 17 00:00:00 2001 From: hugoluo Date: Wed, 22 Nov 2023 16:03:34 +0800 Subject: [PATCH 3/5] [Feature](merge-on-write)Support ignore mode for merge-on-write unique table (#21773) --- be/src/olap/tablet.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index c26d5ed4d4672a..973a7c1f827aed 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3409,8 +3409,8 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap( src.segment_id = seg_id; DeleteBitmap subset_map(tablet_id()); input_delete_bitmap.subset_ignore({rowset->rowset_id(), seg_id, start_version}, - {rowset->rowset_id(), seg_id, end_version}, - &subset_map); + {rowset->rowset_id(), seg_id, end_version}, + &subset_map); // traverse all versions and convert rowid for (auto iter = subset_map.delete_bitmap.begin(); iter != subset_map.delete_bitmap.end(); ++iter) { From 7759ca40b6ae19d55be5807beec645c76de5104c Mon Sep 17 00:00:00 2001 From: hugoluo Date: Wed, 22 Nov 2023 16:05:14 +0800 Subject: [PATCH 4/5] [Feature](merge-on-write)Support ignore mode for merge-on-write unique table (#21773) --- be/src/olap/rowset_builder.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 476180051b29a3..f8ec446a8ebb27 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -371,7 +371,6 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->is_unique_key_ignore_mode()); - } } // namespace doris From 680d47deff2c44ae1cae89d57378ae36ab21261c Mon Sep 17 00:00:00 2001 From: hugoluo Date: Wed, 22 Nov 2023 17:41:39 +0800 Subject: [PATCH 5/5] [Feature](merge-on-write)Support ignore mode for merge-on-write unique table (#21773) --- .../java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index af818eb223ca21..17beadc4540468 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -205,7 +205,8 @@ public void plan(TUniqueId loadId, List> fileStatusesLis List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds, Config.enable_single_replica_load); - olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode, false); + olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode, + false); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer);