Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ namespace ErrorCode {
E(KEY_NOT_FOUND, -7000, false); \
E(KEY_ALREADY_EXISTS, -7001, false); \
E(ENTRY_NOT_FOUND, -7002, false); \
E(NEW_ROWS_IN_PARTIAL_UPDATE, -7003, false); \
E(INVALID_TABLET_STATE, -7211, false); \
E(ROWSETS_EXPIRED, -7311, false); \
E(CGROUP_ERROR, -7411, false);
Expand Down
27 changes: 27 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
}
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
}
// The new-key policy is only read for partial updates, and only when the sender
// populated the field; otherwise the member keeps the value it already holds.
if (_is_partial_update && pschema.has_partial_update_new_key_policy()) {
    _partial_update_new_row_policy = pschema.partial_update_new_key_policy();
}
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
Expand Down Expand Up @@ -221,6 +226,27 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;
}

// Translate the Thrift new-key policy into its protobuf counterpart. The field is
// only consulted for partial updates that explicitly set it; any value other than
// APPEND or ERROR is rejected.
if (_is_partial_update && tschema.__isset.partial_update_new_key_policy) {
    const auto requested_policy = tschema.partial_update_new_key_policy;
    if (requested_policy == doris::TPartialUpdateNewRowPolicy::APPEND) {
        _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::APPEND;
    } else if (requested_policy == doris::TPartialUpdateNewRowPolicy::ERROR) {
        _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::ERROR;
    } else {
        return Status::InvalidArgument(
                "Unknown partial_update_new_key_behavior: {}, should be one of "
                "'APPEND' or 'ERROR'",
                requested_policy);
    }
}

for (const auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
}
Expand Down Expand Up @@ -305,6 +331,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_table_id(_table_id);
pschema->set_version(_version);
pschema->set_partial_update(_is_partial_update);
pschema->set_partial_update_new_key_policy(_partial_update_new_row_policy);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class OlapTableSchemaParam {
// Returns, by value, the set of column names recorded as partial update input columns.
std::set<std::string> partial_update_input_columns() const { return _partial_update_input_columns; }
// Returns the policy to apply when a partial update carries a key that is not in the table.
PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const { return _partial_update_new_row_policy; }
std::string auto_increment_coulumn() const { return _auto_increment_column; }
int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
Expand All @@ -113,6 +116,8 @@ class OlapTableSchemaParam {
std::vector<OlapTableIndexSchema*> _indexes;
mutable ObjectPool _obj_pool;
bool _is_partial_update = false;
PartialUpdateNewRowPolicyPB _partial_update_new_row_policy {
PartialUpdateNewRowPolicyPB::APPEND};
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
std::string _auto_increment_column;
Expand Down
20 changes: 19 additions & 1 deletion be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,25 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_partial_update(false);
}
}

// Optional header selecting what happens when a partial update carries a key that
// is not present in the table. Matching is case-insensitive; the accepted values
// are APPEND and ERROR. Any other value fails the request with InvalidArgument.
if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) {
    static const std::map<std::string, TPartialUpdateNewRowPolicy::type> policy_map {
            {"APPEND", TPartialUpdateNewRowPolicy::APPEND},
            {"ERROR", TPartialUpdateNewRowPolicy::ERROR}};

    auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY);
    // Upper-case the value so the lookup is case-insensitive. Taking the character
    // as unsigned char keeps std::toupper away from negative inputs.
    std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(),
                   [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
    auto it = policy_map.find(policy_name);
    if (it == policy_map.end()) {
        // Literal braces are doubled because the message is a `{}`-style format
        // string: a bare "{'APPEND', ...}" would be parsed as a malformed
        // replacement field instead of being printed verbatim.
        return Status::InvalidArgument(
                "Invalid partial_update_new_key_behavior {}, must be one of {{'APPEND', "
                "'ERROR'}}",
                policy_name);
    }
    request.__set_partial_update_new_key_policy(it->second);
}

if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
Expand Down Expand Up @@ -705,7 +724,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
ctx->put_result.params.__set_content_length(content_length);
}

VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
// if we not use streaming, we must download total content before we begin
// to process this load
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_COMMENT = "comment";
static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
static const std::string HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY = "partial_update_new_key_behavior";
static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_partial_update_info = std::make_shared<PartialUpdateInfo>();
RETURN_IF_ERROR(_partial_update_info->init(
_req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_new_key_policy(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
Expand Down
49 changes: 33 additions & 16 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
namespace doris {

Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
bool partial_update, const std::set<string>& partial_update_cols,
bool partial_update, PartialUpdateNewRowPolicyPB policy,
const std::set<std::string>& partial_update_cols,
bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version) {
is_partial_update = partial_update;
partial_update_new_key_policy = policy;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
Expand Down Expand Up @@ -86,6 +88,7 @@ Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSc

void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
partial_update_info_pb->set_is_partial_update(is_partial_update);
partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy);
partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
for (const auto& col : partial_update_input_columns) {
partial_update_info_pb->add_partial_update_input_columns(col);
Expand Down Expand Up @@ -113,6 +116,9 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const

void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
is_partial_update = partial_update_info_pb->is_partial_update();
if (partial_update_info_pb->has_partial_update_new_key_policy()) {
partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy();
}
max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase()
? partial_update_info_pb->max_version_in_flush_phase()
: -1;
Expand Down Expand Up @@ -152,23 +158,34 @@ std::string PartialUpdateInfo::summary() const {
update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase);
}

Status PartialUpdateInfo::handle_non_strict_mode_not_found_error(
const TabletSchema& tablet_schema) {
if (!can_insert_new_rows_in_partial_update) {
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (!col.has_default_value() && !col.is_nullable() &&
!(tablet_schema.auto_increment_column() == col.name())) {
error_column = col.name();
break;
Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema,
const std::function<std::string()>& line) {
switch (partial_update_new_key_policy) {
case doris::PartialUpdateNewRowPolicyPB::APPEND: {
if (is_partial_update) {
if (!can_insert_new_rows_in_partial_update) {
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (!col.has_default_value() && !col.is_nullable() &&
!(tablet_schema.auto_increment_column() == col.name())) {
error_column = col.name();
break;
}
}
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be nullable "
"for newly inserted rows in non-strict mode partial update",
error_column);
}
}
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be nullable "
"for "
"newly inserted rows in non-strict mode partial update",
error_column);
} break;
case doris::PartialUpdateNewRowPolicyPB::ERROR: {
return Status::Error<ErrorCode::NEW_ROWS_IN_PARTIAL_UPDATE, false>(
"Can't append new rows in partial update when partial_update_new_key_behavior is "
"ERROR. Row with key=[{}] is not in table.",
line());
} break;
}
return Status::OK();
}
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once
#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
Expand All @@ -38,20 +39,23 @@ struct RowsetId;

struct PartialUpdateInfo {
Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
bool partial_update, const std::set<std::string>& partial_update_cols,
bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone, const std::string& auto_increment_column,
bool partial_update, PartialUpdateNewRowPolicyPB policy,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
const std::string& auto_increment_column,
int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema);
Status handle_new_key(const TabletSchema& tablet_schema,
const std::function<std::string()>& line);
std::string summary() const;

private:
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema);

public:
bool is_partial_update {false};
PartialUpdateNewRowPolicyPB partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND};
int64_t max_version_in_flush_phase {-1};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
Expand Down
28 changes: 13 additions & 15 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,24 +489,16 @@ Status SegmentWriter::probe_key_for_mow(
PartialUpdateReadPlan& read_plan, const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
PartialUpdateStats& stats) {
const std::function<Status()>& not_found_cb, PartialUpdateStats& stats) {
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column,
specified_rowsets, &loc, _mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++stats.num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
} else if (!have_delete_sign) {
RETURN_IF_ERROR(
_opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
*_tablet_schema));
if (!have_delete_sign) {
RETURN_IF_ERROR(not_found_cb());
}
++stats.num_rows_new_added;
has_default_or_nullable = true;
Expand Down Expand Up @@ -678,10 +670,16 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
bool have_delete_sign =
(delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0);

RETURN_IF_ERROR(probe_key_for_mow(key, segment_pos, have_input_seq_column, have_delete_sign,
read_plan, specified_rowsets, segment_caches,
has_default_or_nullable, use_default_or_null_flag,
stats));
// Renders the sort-key columns of the current row as text. Passed on as a callable
// so the string is only built if handle_new_key actually asks for it.
auto describe_key = [&]() -> std::string {
    return block->dump_one_line(block_pos, _num_sort_key_columns);
};
// Callback handed to probe_key_for_mow; it delegates the decision about a key that
// was not found to the partial update info's new-key handling.
auto on_key_missing = [&]() {
    return _opts.rowset_ctx->partial_update_info->handle_new_key(*_tablet_schema,
                                                                 describe_key);
};
RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column,
                                  have_delete_sign, read_plan, specified_rowsets,
                                  segment_caches, has_default_or_nullable,
                                  use_default_or_null_flag, on_key_missing, stats));
}
CHECK_EQ(use_default_or_null_flag.size(), num_rows);

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class SegmentWriter {
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
bool& has_default_or_nullable,
std::vector<bool>& use_default_or_null_flag,
const std::function<Status()>& not_found_cb,
PartialUpdateStats& stats);
Status partial_update_preconditions_check(size_t row_pos);
Status append_block_with_partial_content(const vectorized::Block* block, size_t row_pos,
Expand Down
24 changes: 11 additions & 13 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,24 +359,16 @@ Status VerticalSegmentWriter::_probe_key_for_mow(
PartialUpdateReadPlan& read_plan, const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
PartialUpdateStats& stats) {
const std::function<Status()>& not_found_cb, PartialUpdateStats& stats) {
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column,
specified_rowsets, &loc, _mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++stats.num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
} else if (!have_delete_sign) {
RETURN_IF_ERROR(
_opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
*_tablet_schema));
if (!have_delete_sign) {
RETURN_IF_ERROR(not_found_cb());
}
++stats.num_rows_new_added;
has_default_or_nullable = true;
Expand Down Expand Up @@ -539,10 +531,16 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
bool have_delete_sign =
(delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0);

RETURN_IF_ERROR(_probe_key_for_mow(key, segment_pos, have_input_seq_column,
auto not_found_cb = [&]() {
return _opts.rowset_ctx->partial_update_info->handle_new_key(
*_tablet_schema, [&]() -> std::string {
return data.block->dump_one_line(block_pos, _num_sort_key_columns);
});
};
RETURN_IF_ERROR(_probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column,
have_delete_sign, read_plan, specified_rowsets,
segment_caches, has_default_or_nullable,
use_default_or_null_flag, stats));
use_default_or_null_flag, not_found_cb, stats));
}
CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class VerticalSegmentWriter {
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
bool& has_default_or_nullable,
std::vector<bool>& use_default_or_null_flag,
const std::function<Status()>& not_found_cb,
PartialUpdateStats& stats);
Status _partial_update_preconditions_check(size_t row_pos);
Status _append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ Status BaseRowsetBuilder::_build_current_tablet_schema(
RETURN_IF_ERROR(_partial_update_info->init(
tablet()->tablet_id(), _req.txn_id, *_tablet_schema,
table_schema_param->is_partial_update(),
table_schema_param->partial_update_new_key_policy(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
Expand Down
Loading
Loading