Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_version = pschema.version();
_is_partial_update = pschema.partial_update();
_is_strict_mode = pschema.is_strict_mode();
_is_unique_key_ignore_mode = pschema.is_unique_key_ignore_mode();

for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
Expand Down Expand Up @@ -176,9 +177,9 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_table_id = tschema.table_id;
_version = tschema.version;
_is_partial_update = tschema.is_partial_update;
if (tschema.__isset.is_strict_mode) {
_is_strict_mode = tschema.is_strict_mode;
}
_is_strict_mode = tschema.__isset.is_strict_mode && tschema.is_strict_mode;
_is_unique_key_ignore_mode =
tschema.__isset.is_unique_key_ignore_mode && tschema.is_unique_key_ignore_mode;

for (auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
Expand Down Expand Up @@ -246,6 +247,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_version(_version);
pschema->set_partial_update(_is_partial_update);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_is_unique_key_ignore_mode(_is_unique_key_ignore_mode);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class OlapTableSchemaParam {
return _partial_update_input_columns;
}
bool is_strict_mode() const { return _is_strict_mode; }
bool is_unique_key_ignore_mode() const { return _is_unique_key_ignore_mode; }
std::string debug_string() const;

private:
Expand All @@ -104,6 +105,7 @@ class OlapTableSchemaParam {
bool _is_partial_update = false;
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
bool _is_unique_key_ignore_mode = false;
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
7 changes: 7 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
}
if (!http_req->header(HTTP_IGNORE_MODE).empty()) {
if (iequal(http_req->header(HTTP_IGNORE_MODE), "true")) {
request.__set_ignore_mode(true);
} else {
request.__set_ignore_mode(false);
}
}

#ifndef BE_TEST
// plan this load
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_COMMENT = "comment";
static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
static const std::string HTTP_IGNORE_MODE = "ignore_mode";
static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
OlapStopWatch watch;
RETURN_IF_ERROR(_context.tablet->calc_delete_bitmap(
rowset, segments, specified_rowsets, _context.mow_context->delete_bitmap,
_context.mow_context->max_version, nullptr));
_context.mow_context->max_version, nullptr, nullptr));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
_tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
_tablet_schema->set_is_unique_key_ignore_mode(table_schema_param->is_unique_key_ignore_mode());
}

} // namespace doris
74 changes: 42 additions & 32 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
#include "olap/txn_manager.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/define_primitive_type.h"
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "util/bvar_helper.h"
Expand Down Expand Up @@ -2884,6 +2885,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
Version dummy_version(end_version + 1, end_version + 1);
auto rowset_schema = rowset->tablet_schema();
bool is_partial_update = rowset_schema->is_partial_update();
bool is_unique_key_ignore_mode = rowset_schema->is_unique_key_ignore_mode();
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
Expand Down Expand Up @@ -2951,42 +2953,50 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
if (st.is<KEY_NOT_FOUND>()) {
continue;
}

// sequence id smaller than the previous one, so delete current row
if (st.is<KEY_ALREADY_EXISTS>()) {
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
} else if (is_partial_update && rowset_writer != nullptr) {
// In publish version, record rows to be deleted for concurrent update
// For example, if version 5 and 6 update a row, but version 6 only see
// version 4 when write, and when publish version, version 5's value will
// be marked as deleted and it's update is losed.
// So here we should read version 5's columns and build a new row, which is
// consists of version 6's update columns and version 5's origin columns
// here we build 2 read plan for ori values and update values
prepare_to_read(loc, pos, &read_plan_ori);
prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update);
rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
++pos;
// delete bitmap will be calculate when memtable flush and
// publish. The two stages may see different versions.
// When there is sequence column, the currently imported data
// of rowset may be marked for deletion at memtablet flush or
// publish because the seq column is smaller than the previous
// rowset.
// just set 0 as a unified temporary version number, and update to
// the real version number later.
if (UNLIKELY(is_unique_key_ignore_mode)) {
if (st.is<OK>() || st.is<KEY_ALREADY_EXISTS>()) {
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
}
} else {
// sequence id smaller than the previous one, so delete current row
if (st.is<KEY_ALREADY_EXISTS>()) {
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
} else if (is_partial_update && rowset_writer != nullptr) {
// In publish version, record rows to be deleted for concurrent update
// For example, if version 5 and 6 update a row, but version 6 only see
// version 4 when write, and when publish version, version 5's value will
// be marked as deleted and it's update is losed.
// So here we should read version 5's columns and build a new row, which is
// consists of version 6's update columns and version 5's origin columns
// here we build 2 read plan for ori values and update values
prepare_to_read(loc, pos, &read_plan_ori);
prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos,
&read_plan_update);
rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
++pos;
// delete bitmap will be calculate when memtable flush and
// publish. The two stages may see different versions.
// When there is sequence column, the currently imported data
// of rowset may be marked for deletion at memtablet flush or
// publish because the seq column is smaller than the previous
// rowset.
// just set 0 as a unified temporary version number, and update to
// the real version number later.
delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
}
// when st = ok
delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
}
// when st = ok
delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
}
remaining -= num_read;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_compression_type = schema.compression_type();
_schema_version = schema.schema_version();
_is_partial_update = schema.is_partial_update();
_is_unique_key_ignore_mode = schema.is_unique_key_ignore_mode();
for (auto& col_name : schema.partial_update_input_columns()) {
_partial_update_input_columns.emplace(col_name);
}
Expand Down Expand Up @@ -917,6 +918,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
tablet_schema_pb->set_is_partial_update(_is_partial_update);
tablet_schema_pb->set_is_unique_key_ignore_mode(_is_unique_key_ignore_mode);
for (auto& col : _partial_update_input_columns) {
*tablet_schema_pb->add_partial_update_input_columns() = col;
}
Expand Down
9 changes: 7 additions & 2 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,12 @@ class TabletSchema {
}
void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode = is_strict_mode; }
bool is_strict_mode() const { return _is_strict_mode; }
std::vector<uint32_t> get_missing_cids() const { return _missing_cids; }
std::vector<uint32_t> get_update_cids() const { return _update_cids; }
void set_is_unique_key_ignore_mode(bool is_unique_key_ignore_mode) {
_is_unique_key_ignore_mode = is_unique_key_ignore_mode;
}
bool is_unique_key_ignore_mode() const { return _is_unique_key_ignore_mode; }
std::vector<uint32_t> get_missing_cids() { return _missing_cids; }
std::vector<uint32_t> get_update_cids() { return _update_cids; }

private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down Expand Up @@ -411,6 +415,7 @@ class TabletSchema {
// to generate a new row, only available in non-strict mode
bool _can_insert_new_rows_in_partial_update = true;
bool _is_strict_mode = false;
bool _is_unique_key_ignore_mode = false;
};

bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ separated by commas.

28. comment: <version since="1.2.3" type="inline"> String type, the default value is "". </version>

29. ignore_mode: <version since="dev" type="inline"> Ignore mode, only effective when the target table is a unique table with merge-on-write enabled. When insert ignore mode is enabled, for the inserted rows, if the key of the row does not exist in the table, the row will be inserted. If the key already exists in the table, the row will be discarded. When sequence columns exists in the target table, the ignore mode can't be enabled in stream load.</version>

### Example

1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ INSERT
The change statement is to complete the data insertion operation.

```sql
INSERT INTO table_name
INSERT [IGNORE] INTO table_name
[ PARTITION (p1, ...) ]
[ WITH LABEL label]
[ (column [, ...]) ]
Expand All @@ -44,7 +44,7 @@ INSERT INTO table_name
````

Parameters

> IGNORE: insert ignore mode, only effective when the target table is a unique table with merge-on-write enabled. When insert ignore mode is enabled, for the inserted rows, if the key of the row does not exist in the table, the row will be inserted. If the key already exists in the table, the row will be discarded. When sequence column exists in the target table, the `insert ignore` statements are forbidden.
> tablet_name: The destination table for importing data. Can be of the form `db_name.table_name`
>
> partitions: Specify the partitions to be imported, which must be partitions that exist in `table_name`. Multiple partition names are separated by commas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_h

28. comment: <version since="1.2.3" type="inline"> 字符串类型, 默认值为空. 给任务增加额外的信息. </version>

29. ignore_mode: <version since="dev" type="inline"> ignore模式,仅当目标表为开启merge-on-write的unique表时有效。开启后,对于插入的行,如果该行的key在表中不存在,则插入该行数据。如果key在表中不存在,则丢弃这行数据。当目标表中存在sequence列时stream无法开启ignore mode。</version>

### Example

1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ INSERT
该语句是完成数据插入操作。

```sql
INSERT INTO table_name
INSERT [IGNORE] INTO table_name
[ PARTITION (p1, ...) ]
[ WITH LABEL label]
[ (column [, ...]) ]
Expand All @@ -44,7 +44,8 @@ INSERT INTO table_name
```

Parameters

> IGNORE: insert ignore模式,仅当目标表为开启merge-on-write的unique表时有效。开启后,对于插入的行,如果该行的key在表中不存在,则插入该行数据。如果key在表中不存在,则丢弃这行数据。当目标表中存在sequence列时无法通过insert ignore语句进行插入操作。
>
> tablet_name: 导入数据的目的表。可以是 `db_name.table_name` 形式
>
> partitions: 指定待导入的分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔
Expand Down
21 changes: 16 additions & 5 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ nonterminal ParseNode load_property;
nonterminal List<ParseNode> opt_load_property_list;

// Boolean
nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local, opt_is_auto_inc;
nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local, opt_is_auto_inc, opt_is_insert_ignore;
nonterminal String opt_from_rollup, opt_to_rollup;
nonterminal ColumnPosition opt_col_pos;

Expand Down Expand Up @@ -3718,6 +3718,17 @@ opt_is_auto_inc ::=
RESULT = true;
:}
;

opt_is_insert_ignore ::=
{:
RESULT = false;
:}
| KW_IGNORE
{:
RESULT = true;
:}
;

opt_comment ::=
/* empty */
{:
Expand Down Expand Up @@ -4801,16 +4812,16 @@ insert_overwrite_stmt ::=

// Insert statement
insert_stmt ::=
KW_INSERT KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
KW_INSERT opt_is_insert_ignore:is_insert_ignore KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
{:
RESULT = new NativeInsertStmt(target, label, cols, source, hints);
RESULT = new NativeInsertStmt(target, label, cols, source, hints, is_insert_ignore);
:}
// TODO(zc) add default value for SQL-2003
// | KW_INSERT KW_INTO insert_target:target KW_DEFAULT KW_VALUES
| /* used for group commit */
KW_INSERT KW_INTO INTEGER_LITERAL:table_id opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
KW_INSERT opt_is_insert_ignore:is_insert_ignore KW_INTO INTEGER_LITERAL:table_id opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
{:
RESULT = new NativeInsertStmt(table_id, label, cols, source, hints);
RESULT = new NativeInsertStmt(table_id, label, cols, source, hints, is_insert_ignore);
:}
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ private void constructInsertStmt() throws AnalysisException {
cols,
new InsertSource(selectStmt),
null,
isPartialUpdate);
isPartialUpdate,
false);
}

private void analyzeTargetTable(Analyzer analyzer) throws UserException {
Expand Down
Loading