Skip to content
Closed

test #41839

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gen_cpp/Partitions_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

#include <algorithm>
Expand Down Expand Up @@ -117,9 +118,21 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_db_id = pschema.db_id();
_table_id = pschema.table_id();
_version = pschema.version();
_is_partial_update = pschema.partial_update();
if (pschema.has_unique_key_update_mode()) {
_unique_key_update_mode = pschema.unique_key_update_mode();
if (pschema.has_sequence_map_col_unique_id()) {
_sequence_map_col_uid = pschema.sequence_map_col_unique_id();
}
} else {
// for backward compatibility
if (pschema.has_partial_update() && pschema.partial_update()) {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
}
_is_strict_mode = pschema.is_strict_mode();
if (_is_partial_update) {
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = pschema.auto_increment_column();
if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) {
return Status::InternalError(
Expand Down Expand Up @@ -155,7 +168,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
for (const auto& pcolumn_desc : p_index.columns_desc()) {
if (!_is_partial_update ||
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(pcolumn_desc.name())) {
auto it = slots_map.find(std::make_pair(
to_lower(pcolumn_desc.name()),
Expand Down Expand Up @@ -185,15 +198,51 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
return Status::OK();
}

// Resolves the unique-key update mode carried by the thrift schema into
// _unique_key_update_mode.
//
// When `unique_key_update_mode` is set in `tschema`, it is mapped onto the
// matching UniqueKeyUpdateModePB value and, if `sequence_map_col_unique_id` is
// also set, that id is stored in _sequence_map_col_uid. When it is not set,
// the legacy `is_partial_update` flag chooses between UPDATE_FIXED_COLUMNS
// and UPSERT.
//
// Returns Status::InternalError for a mode value outside the three handled
// ones (leaving the members untouched), Status::OK() otherwise.
Status OlapTableSchemaParam::init_unique_key_update_mode(const TOlapTableSchemaParam& tschema) {
    if (!tschema.__isset.unique_key_update_mode) {
        // for backward compatibility: only the legacy boolean is available
        const bool legacy_partial_update =
                tschema.__isset.is_partial_update && tschema.is_partial_update;
        _unique_key_update_mode = legacy_partial_update
                                          ? UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS
                                          : UniqueKeyUpdateModePB::UPSERT;
        return Status::OK();
    }

    switch (tschema.unique_key_update_mode) {
    case doris::TUniqueKeyUpdateMode::UPSERT:
        _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
        break;
    case doris::TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS:
        _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
        break;
    case doris::TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS:
        _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
        break;
    default:
        return Status::InternalError(
                "Unknown unique_key_update_mode: {}, should be one of "
                "UPSERT/UPDATE_FIXED_COLUMNS/UPDATE_FLEXIBLE_COLUMNS",
                tschema.unique_key_update_mode);
    }

    // The sequence-map column id is only read alongside an explicit update mode.
    if (tschema.__isset.sequence_map_col_unique_id) {
        _sequence_map_col_uid = tschema.sequence_map_col_unique_id;
    }
    return Status::OK();
}

Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_db_id = tschema.db_id;
_table_id = tschema.table_id;
_version = tschema.version;
_is_partial_update = tschema.is_partial_update;
RETURN_IF_ERROR(init_unique_key_update_mode(tschema));
if (tschema.__isset.is_strict_mode) {
_is_strict_mode = tschema.is_strict_mode;
}
if (_is_partial_update) {
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = tschema.auto_increment_column;
if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) {
return Status::InternalError(
Expand Down Expand Up @@ -221,7 +270,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (const auto& tcolumn_desc : t_index.columns_desc) {
if (!_is_partial_update ||
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(tcolumn_desc.column_name)) {
auto it = slots_map.find(
std::make_pair(to_lower(tcolumn_desc.column_name),
Expand Down Expand Up @@ -270,13 +319,18 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_db_id(_db_id);
pschema->set_table_id(_table_id);
pschema->set_version(_version);
pschema->set_partial_update(_is_partial_update);
pschema->set_unique_key_update_mode(_unique_key_update_mode);
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
// for backward compatibility
pschema->set_partial_update(true);
}
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
pschema->set_sequence_map_col_unique_id(_sequence_map_col_uid);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
20 changes: 18 additions & 2 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -88,7 +89,18 @@ class OlapTableSchemaParam {
return _proto_schema;
}

bool is_partial_update() const { return _is_partial_update; }
// Returns the unique-key update mode currently stored in this schema param.
UniqueKeyUpdateModePB unique_key_update_mode() const { return _unique_key_update_mode; }

// Returns true for every update mode other than UPSERT, i.e. for both
// UPDATE_FIXED_COLUMNS and UPDATE_FLEXIBLE_COLUMNS.
bool is_partial_update() const {
return _unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT;
}
// Returns true only when the update mode is UPDATE_FIXED_COLUMNS.
bool is_fixed_partial_update() const {
return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
}
// Returns true only when the update mode is UPDATE_FLEXIBLE_COLUMNS.
bool is_flexible_partial_update() const {
return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
}

// Returns the set of partial-update input column names.
// The set is returned by value, so every call hands back a copy.
std::set<std::string> partial_update_input_columns() const {
return _partial_update_input_columns;
}
Expand All @@ -101,8 +113,11 @@ class OlapTableSchemaParam {
// Stores `timezone` in _timezone (copy-assigned from the by-value parameter).
void set_timezone(std::string timezone) { _timezone = timezone; }
// Returns a copy of the stored timezone string.
std::string timezone() const { return _timezone; }
// Returns the stored strict-mode flag.
bool is_strict_mode() const { return _is_strict_mode; }
// Returns the stored sequence-map column unique id.
int32_t sequence_map_col_uid() const { return _sequence_map_col_uid; }
std::string debug_string() const;

Status init_unique_key_update_mode(const TOlapTableSchemaParam& tschema);

private:
int64_t _db_id;
int64_t _table_id;
Expand All @@ -112,14 +127,15 @@ class OlapTableSchemaParam {
mutable POlapTableSchemaParam* _proto_schema = nullptr;
std::vector<OlapTableIndexSchema*> _indexes;
mutable ObjectPool _obj_pool;
bool _is_partial_update = false;
UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT};
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
std::string _auto_increment_column;
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
int32_t _nano_seconds {0};
std::string _timezone;
int32_t _sequence_map_col_uid {-1};
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
68 changes: 65 additions & 3 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,75 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_enable_profile(false);
}
}
if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {

// Handle the `unique_key_update_mode` header: validate its value, enforce the
// extra constraints of flexible partial update, then forward the mode to the
// request. An unrecognized value is rejected with InvalidArgument.
if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
    static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
            {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
            {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
            {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
    std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
    auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str);
    if (iter == unique_key_update_mode_map.end()) {
        // Name the header the user actually sets (`unique_key_update_mode`) so the
        // message points at the right parameter.
        return Status::InvalidArgument(
                "Invalid unique_key_update_mode {}, must be one of 'UPSERT', "
                "'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'",
                unique_key_update_mode_str);
    }

    TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
    if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
        // check constraints when flexible partial update is enabled:
        // JSON input only, and none of the options below may be combined with it.
        if (ctx->format != TFileFormatType::FORMAT_JSON) {
            return Status::InvalidArgument(
                    "flexible partial update only support json format as input file "
                    "currently");
        }
        if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
            iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'fuzzy_parse' is enabled");
        }
        if (!http_req->header(HTTP_COLUMNS).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'columns' is specified");
        }
        if (!http_req->header(HTTP_JSONPATHS).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'jsonpaths' is specified");
        }
        if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'hidden_columns' is "
                    "specified");
        }
        if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when "
                    "'function_column.sequence_col' is specified");
        }
        if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when "
                    "'merge_type' is specified");
        }
        if (!http_req->header(HTTP_WHERE).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when "
                    "'where' is specified");
        }
    }
    request.__set_unique_key_update_mode(unique_key_update_mode);
}
if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
// only consider `partial_columns` parameter when `unique_key_update_mode` is not set
if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) {
request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS);
// for backward compatibility
request.__set_partial_update(true);
} else {
request.__set_partial_update(false);
}
}

if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_COMMENT = "comment";
static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
static const std::string HTTP_UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
_partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(),
Expand Down
Loading