Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_version = pschema.version();
_is_partial_update = pschema.partial_update();
_is_strict_mode = pschema.is_strict_mode();
_is_unique_key_ignore_mode = pschema.is_unique_key_ignore_mode();

for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
Expand Down Expand Up @@ -183,9 +184,9 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_table_id = tschema.table_id;
_version = tschema.version;
_is_partial_update = tschema.is_partial_update;
if (tschema.__isset.is_strict_mode) {
_is_strict_mode = tschema.is_strict_mode;
}
_is_strict_mode = tschema.__isset.is_strict_mode && tschema.is_strict_mode;
_is_unique_key_ignore_mode =
tschema.__isset.is_unique_key_ignore_mode && tschema.is_unique_key_ignore_mode;

for (auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
Expand Down Expand Up @@ -255,6 +256,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_version(_version);
pschema->set_partial_update(_is_partial_update);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_is_unique_key_ignore_mode(_is_unique_key_ignore_mode);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class OlapTableSchemaParam {
return _partial_update_input_columns;
}
bool is_strict_mode() const { return _is_strict_mode; }
bool is_unique_key_ignore_mode() const { return _is_unique_key_ignore_mode; }
std::string debug_string() const;

private:
Expand All @@ -107,6 +108,7 @@ class OlapTableSchemaParam {
bool _is_partial_update = false;
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
bool _is_unique_key_ignore_mode = false;
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
7 changes: 7 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
}
if (!http_req->header(HTTP_IGNORE_MODE).empty()) {
if (iequal(http_req->header(HTTP_IGNORE_MODE), "true")) {
request.__set_ignore_mode(true);
} else {
request.__set_ignore_mode(false);
}
}
request.__set_group_commit(ctx->group_commit);

#ifndef BE_TEST
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_COMMENT = "comment";
static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
static const std::string HTTP_IGNORE_MODE = "ignore_mode";
static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
table_schema_param->is_strict_mode(),
table_schema_param->is_unique_key_ignore_mode());
_tablet_schema->set_is_unique_key_ignore_mode(table_schema_param->is_unique_key_ignore_mode());
}

} // namespace doris
5 changes: 4 additions & 1 deletion be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace doris {

struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode) {
const std::set<string>& partial_update_cols, bool is_strict_mode,
bool is_unique_key_ignore_mode) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
missing_cids.clear();
Expand All @@ -40,6 +41,7 @@ struct PartialUpdateInfo {
}
}
this->is_strict_mode = is_strict_mode;
this->is_unique_key_ignore_mode = is_unique_key_ignore_mode;
}

bool is_partial_update {false};
Expand All @@ -50,5 +52,6 @@ struct PartialUpdateInfo {
// to generate a new row, only available in non-strict mode
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
bool is_unique_key_ignore_mode {false};
};
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
table_schema_param->is_strict_mode(),
table_schema_param->is_unique_key_ignore_mode());
}

} // namespace doris
105 changes: 59 additions & 46 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
#include "olap/txn_manager.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/define_primitive_type.h"
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "util/bvar_helper.h"
Expand Down Expand Up @@ -2881,6 +2882,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
Version dummy_version(end_version + 1, end_version + 1);
auto rowset_schema = rowset->tablet_schema();
bool is_partial_update = rowset_writer && rowset_writer->is_partial_update();
bool is_unique_key_ignore_mode = rowset_schema->is_unique_key_ignore_mode();
bool have_input_seq_column = false;
if (is_partial_update && rowset_schema->has_sequence_col()) {
std::vector<uint32_t> including_cids =
Expand Down Expand Up @@ -2957,54 +2959,64 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
if (st.is<KEY_NOT_FOUND>()) {
continue;
}

if (st.is<KEY_ALREADY_EXISTS>() && (!is_partial_update || have_input_seq_column)) {
// `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row with the same key and larger value
// in seqeunce column.
// - If the current load is not a partial update, we just delete current row.
// - Otherwise, it means that we are doing the alignment process in publish phase due to conflicts
// during concurrent partial updates. And there exists another load which introduces a row with
// the same keys and larger sequence column value published successfully after the commit phase
// of the current load.
// - If the columns we update include sequence column, we should delete the current row becase the
// partial update on the current row has been `overwritten` by the previous one with larger sequence
// column value.
// - Otherwise, we should combine the values of the missing columns in the previous row and the values
// of the including columns in the current row into a new row.
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
}
if (is_partial_update && rowset_writer != nullptr) {
// In publish version, record rows to be deleted for concurrent update
// For example, if version 5 and 6 update a row, but version 6 only see
// version 4 when write, and when publish version, version 5's value will
// be marked as deleted and it's update is losed.
// So here we should read version 5's columns and build a new row, which is
// consists of version 6's update columns and version 5's origin columns
// here we build 2 read plan for ori values and update values
prepare_to_read(loc, pos, &read_plan_ori);
prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update);
rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
++pos;
// delete bitmap will be calculate when memtable flush and
// publish. The two stages may see different versions.
// When there is sequence column, the currently imported data
// of rowset may be marked for deletion at memtablet flush or
// publish because the seq column is smaller than the previous
// rowset.
// just set 0 as a unified temporary version number, and update to
// the real version number later.
if (UNLIKELY(is_unique_key_ignore_mode)) {
if (st.is<OK>() || st.is<KEY_ALREADY_EXISTS>()) {
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
delete_bitmap->add_ignore(
{rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON});
}
} else {
if (st.is<KEY_ALREADY_EXISTS>() && (!is_partial_update || have_input_seq_column)) {
// `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row with the same key and larger value
// in seqeunce column.
// - If the current load is not a partial update, we just delete current row.
// - Otherwise, it means that we are doing the alignment process in publish phase due to conflicts
// during concurrent partial updates. And there exists another load which introduces a row with
// the same keys and larger sequence column value published successfully after the commit phase
// of the current load.
// - If the columns we update include sequence column, we should delete the current row becase the
// partial update on the current row has been `overwritten` by the previous one with larger sequence
// column value.
// - Otherwise, we should combine the values of the missing columns in the previous row and the values
// of the including columns in the current row into a new row.
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
}
if (is_partial_update && rowset_writer != nullptr) {
// In publish version, record rows to be deleted for concurrent update
// For example, if version 5 and 6 update a row, but version 6 only see
// version 4 when write, and when publish version, version 5's value will
// be marked as deleted and it's update is losed.
// So here we should read version 5's columns and build a new row, which is
// consists of version 6's update columns and version 5's origin columns
// here we build 2 read plan for ori values and update values
prepare_to_read(loc, pos, &read_plan_ori);
prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos,
&read_plan_update);
rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
++pos;
// delete bitmap will be calculate when memtable flush and
// publish. The two stages may see different versions.
// When there is sequence column, the currently imported data
// of rowset may be marked for deletion at memtablet flush or
// publish because the seq column is smaller than the previous
// rowset.
// just set 0 as a unified temporary version number, and update to
// the real version number later.
delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
}
// when st = ok
delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
}
// when st = ok
delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
}
remaining -= num_read;
}
Expand Down Expand Up @@ -3396,8 +3408,9 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap(
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
src.segment_id = seg_id;
DeleteBitmap subset_map(tablet_id());
input_delete_bitmap.subset({rowset->rowset_id(), seg_id, start_version},
{rowset->rowset_id(), seg_id, end_version}, &subset_map);
input_delete_bitmap.subset_ignore({rowset->rowset_id(), seg_id, start_version},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't resolve the compaction issue, since the compaction use merge-on-read process to dedup keys, it can't identify which keys should be ignored.

{rowset->rowset_id(), seg_id, end_version},
&subset_map);
// traverse all versions and convert rowid
for (auto iter = subset_map.delete_bitmap.begin();
iter != subset_map.delete_bitmap.end(); ++iter) {
Expand Down
22 changes: 22 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,11 @@ void DeleteBitmap::add(const BitmapKey& bmk, uint32_t row_id) {
delete_bitmap[bmk].add(row_id);
}

void DeleteBitmap::add_ignore(const BitmapKey& bmk) {
std::lock_guard l(lock);
delete_bitmap_ignore.insert(bmk);
}

int DeleteBitmap::remove(const BitmapKey& bmk, uint32_t row_id) {
std::lock_guard l(lock);
auto it = delete_bitmap.find(bmk);
Expand Down Expand Up @@ -1001,6 +1006,23 @@ void DeleteBitmap::subset(const BitmapKey& start, const BitmapKey& end,
}
}

void DeleteBitmap::subset_ignore(const BitmapKey& start, const BitmapKey& end,
DeleteBitmap* subset_rowset_map) const {
Comment on lines +1009 to +1010
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'subset_ignore' can be made static [readability-convert-member-functions-to-static]

Suggested change
void DeleteBitmap::subset_ignore(const BitmapKey& start, const BitmapKey& end,
DeleteBitmap* subset_rowset_map) const {
static void DeleteBitmap::subset_ignore(const BitmapKey& start, const BitmapKey& end,
DeleteBitmap* subset_rowset_map) {

roaring::Roaring roaring;
DCHECK(start < end);
std::shared_lock l(lock);
for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
auto& [k, bm] = *it;
if (k >= end) {
break;
}
if (delete_bitmap_ignore.find(k) == delete_bitmap_ignore.end()) {
break;
}
subset_rowset_map->set(k, bm);
}
}

void DeleteBitmap::merge(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) {
std::lock_guard l(lock);
auto [iter, succ] = delete_bitmap.emplace(bmk, segment_delete_bitmap);
Expand Down
16 changes: 16 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class DeleteBitmap {
using Version = uint64_t;
using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;
std::map<BitmapKey, roaring::Roaring> delete_bitmap; // Ordered map
std::set<BitmapKey> delete_bitmap_ignore;
constexpr static inline uint32_t INVALID_SEGMENT_ID = std::numeric_limits<uint32_t>::max() - 1;
constexpr static inline uint32_t ROWSET_SENTINEL_MARK =
std::numeric_limits<uint32_t>::max() - 1;
Expand Down Expand Up @@ -371,6 +372,11 @@ class DeleteBitmap {
*/
void add(const BitmapKey& bmk, uint32_t row_id);

/**
* Marks the specific ignore deleted
*/
void add_ignore(const BitmapKey& bmk);

/**
* Clears the deletetion mark specific row
*
Expand Down Expand Up @@ -429,6 +435,16 @@ class DeleteBitmap {
void subset(const BitmapKey& start, const BitmapKey& end,
DeleteBitmap* subset_delete_map) const;

/**
* Gets subset without ignore of delete_bitmap with given range [start, end)
*
* @parma start start
* @parma end end
* @parma subset_delete_map output param
*/
void subset_ignore(const BitmapKey& start, const BitmapKey& end,
DeleteBitmap* subset_delete_map) const;

/**
* Merges the given segment delete bitmap into *this
*
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_sort_col_num = schema.sort_col_num();
_compression_type = schema.compression_type();
_schema_version = schema.schema_version();
_is_unique_key_ignore_mode = schema.is_unique_key_ignore_mode();
}

void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
Expand Down Expand Up @@ -920,6 +921,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_schema_version(_schema_version);
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
tablet_schema_pb->set_is_unique_key_ignore_mode(_is_unique_key_ignore_mode);
}

size_t TabletSchema::row_size() const {
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ class TabletSchema {
}

vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
void set_is_unique_key_ignore_mode(bool is_unique_key_ignore_mode) {
_is_unique_key_ignore_mode = is_unique_key_ignore_mode;
}
bool is_unique_key_ignore_mode() const { return _is_unique_key_ignore_mode; }

private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down Expand Up @@ -393,6 +397,7 @@ class TabletSchema {
int64_t _mem_size = 0;
bool _store_row_column = false;
bool _skip_write_index_on_load = false;
bool _is_unique_key_ignore_mode = false;
};

bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ separated by commas.

30. escape <version since="dev" type="inline"> Used to escape characters that appear in a csv field identical to the enclosing characters. For example, if the data is "a,'b,'c'", enclose is "'", and you want "b,'c to be parsed as a field, you need to specify a single-byte escape character, such as "\", and then modify the data to "a,' b,\'c'". </version>

29. ignore_mode: <version since="dev" type="inline"> Ignore mode, only effective when the target table is a unique table with merge-on-write enabled. When insert ignore mode is enabled, for the inserted rows, if the key of the row does not exist in the table, the row will be inserted. If the key already exists in the table, the row will be discarded. When sequence columns exists in the target table, the ignore mode can't be enabled in stream load.</version>

### Example

1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds
Expand Down
Loading