7 changes: 7 additions & 0 deletions be/src/common/config.h
@@ -720,6 +720,13 @@ CONF_String(function_service_protocol, "h2:grpc");
// use which load balancer to select server to connect
CONF_String(rpc_load_balancer, "rr");

// A soft limit on the length of the string type. The hard limit is 2GB - 4 bytes, but overly long
// strings cause very poor performance, so we set a soft limit. The default is 1MB.
CONF_mInt32(string_type_length_soft_limit_bytes, "1048576");

CONF_Validator(string_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });

} // namespace config

} // namespace doris
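For context, the validator added above only constrains the configured value's range. Below is a minimal standalone sketch of that range check; `kStringTypeHardLimitBytes` and `is_valid_string_soft_limit` are illustrative names, not the real `CONF_Validator` machinery.

```cpp
#include <cstdint>
#include <iostream>

// The hard limit for the string type is 2GB - 4 bytes (2147483643); the
// validator accepts any soft limit in the range (0, 2147483643].
constexpr int32_t kStringTypeHardLimitBytes = 2147483643;

bool is_valid_string_soft_limit(int32_t value) {
    return value > 0 && value <= kStringTypeHardLimitBytes;
}

int main() {
    std::cout << is_valid_string_soft_limit(1048576) << "\n";    // 1: the 1MB default
    std::cout << is_valid_string_soft_limit(0) << "\n";          // 0: must be positive
    std::cout << is_valid_string_soft_limit(2147483647) << "\n"; // 0: above the hard limit
    return 0;
}
```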
55 changes: 33 additions & 22 deletions be/src/exec/tablet_sink.cpp
@@ -50,7 +50,8 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int
if (_parent->_transfer_data_by_brpc_attachment) {
_tuple_data_buffer_ptr = &_tuple_data_buffer;
}
_node_channel_tracker = MemTracker::create_tracker(-1, "NodeChannel" + thread_local_ctx.get()->thread_id_str());
_node_channel_tracker =
MemTracker::create_tracker(-1, "NodeChannel" + thread_local_ctx.get()->thread_id_str());
}

NodeChannel::~NodeChannel() noexcept {
@@ -193,7 +194,8 @@ Status NodeChannel::open_wait() {
return;
}
// If rpc failed, mark all tablets on this node channel as failed
_index_channel->mark_as_failed(this->node_id(), this->host(), _add_batch_closure->cntl.ErrorText(), -1);
_index_channel->mark_as_failed(this->node_id(), this->host(),
_add_batch_closure->cntl.ErrorText(), -1);
Status st = _index_channel->check_intolerable_failure();
if (!st.ok()) {
_cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
@@ -216,7 +218,8 @@ Status NodeChannel::open_wait() {
if (status.ok()) {
// if has error tablet, handle them first
for (auto& error : result.tablet_errors()) {
_index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(), error.tablet_id());
_index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(),
error.tablet_id());
}

Status st = _index_channel->check_intolerable_failure();
@@ -262,8 +265,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
while (!_cancelled &&
_pending_batches_num > 0 &&
while (!_cancelled && _pending_batches_num > 0 &&
(_pending_batches_bytes > _max_pending_batches_bytes ||
_parent->_mem_tracker->any_limit_exceeded())) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
@@ -314,8 +316,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
while (!_cancelled &&
_pending_batches_num > 0 &&
while (!_cancelled && _pending_batches_num > 0 &&
(_pending_batches_bytes > _max_pending_batches_bytes ||
_parent->_mem_tracker->any_limit_exceeded())) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
@@ -393,7 +394,7 @@ Status NodeChannel::close_wait(RuntimeState* state) {
while (!_add_batches_finished && !_cancelled) {
SleepFor(MonoDelta::FromMilliseconds(1));
}
_close_time_ms = UnixMillis() - _close_time_ms;
_close_time_ms = UnixMillis() - _close_time_ms;

if (_add_batches_finished) {
{
@@ -685,7 +686,8 @@ OlapTableSink::~OlapTableSink() {
// OlapTableSink::_mem_tracker and its parents.
// But their destructions are after OlapTableSink's.
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->clear_all_batches(); });
index_channel->for_each_node_channel(
[](const std::shared_ptr<NodeChannel>& ch) { ch->clear_all_batches(); });
}
}

@@ -846,11 +848,13 @@ Status OlapTableSink::open(RuntimeState* state) {
RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));

for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->open(); });
index_channel->for_each_node_channel(
[](const std::shared_ptr<NodeChannel>& ch) { ch->open(); });
}

for (auto index_channel : _channels) {
index_channel->for_each_node_channel([&index_channel](const std::shared_ptr<NodeChannel>& ch) {
index_channel->for_each_node_channel([&index_channel](
const std::shared_ptr<NodeChannel>& ch) {
auto st = ch->open_wait();
if (!st.ok()) {
// The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel.
@@ -940,13 +944,13 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
uint32_t tablet_index = 0;
if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
if (_partition_to_tablet_map.find(partition->id) == _partition_to_tablet_map.end()) {
tablet_index = _partition->find_tablet(tuple,*partition);
tablet_index = _partition->find_tablet(tuple, *partition);
_partition_to_tablet_map.emplace(partition->id, tablet_index);
} else {
tablet_index = _partition_to_tablet_map[partition->id];
}
} else {
tablet_index = _partition->find_tablet(tuple,*partition);
tablet_index = _partition->find_tablet(tuple, *partition);
}
_partition_ids.emplace(partition->id);
for (int j = 0; j < partition->indexes.size(); ++j) {
@@ -984,7 +988,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
{
SCOPED_TIMER(_close_timer);
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->mark_close(); });
index_channel->for_each_node_channel(
[](const std::shared_ptr<NodeChannel>& ch) { ch->mark_close(); });
num_node_channels += index_channel->num_node_channels();
}

@@ -997,7 +1002,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
&total_add_batch_num](const std::shared_ptr<NodeChannel>& ch) {
auto s = ch->close_wait(state);
if (!s.ok()) {
index_channel->mark_as_failed(ch->node_id(), ch->host(), s.get_error_msg(), -1);
index_channel->mark_as_failed(ch->node_id(), ch->host(),
s.get_error_msg(), -1);
LOG(WARNING)
<< ch->channel_info()
<< ", close channel failed, err: " << s.get_error_msg();
@@ -1047,7 +1053,8 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
// print log of add batch time of all node, for tracing load performance easily
std::stringstream ss;
ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: ";
<< ", txn_id=" << _txn_id
<< ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: ";
for (auto const& pair : node_add_batch_counter_map) {
ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
<< ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")("
@@ -1056,8 +1063,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
LOG(INFO) << ss.str();
} else {
for (auto channel : _channels) {
channel->for_each_node_channel(
[&status](const std::shared_ptr<NodeChannel>& ch) { ch->cancel(status.get_error_msg()); });
channel->for_each_node_channel([&status](const std::shared_ptr<NodeChannel>& ch) {
ch->cancel(status.get_error_msg());
});
}
}

@@ -1189,13 +1197,14 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma
}
case TYPE_STRING: {
StringValue* str_val = (StringValue*)slot;
if (str_val->len > OLAP_STRING_MAX_LENGTH) {
if (str_val->len > config::string_type_length_soft_limit_bytes) {
fmt::format_to(error_msg, "{}",
"the length of input is too long than schema. ");
fmt::format_to(error_msg, "column_name: {}; ", desc->col_name());
fmt::format_to(error_msg, "first 128 bytes of input str: [{}] ",
std::string(str_val->ptr, 128));
fmt::format_to(error_msg, "schema length: {}; ", OLAP_STRING_MAX_LENGTH);
fmt::format_to(error_msg, "schema length: {}; ",
config::string_type_length_soft_limit_bytes);
fmt::format_to(error_msg, "actual length: {}; ", str_val->len);
row_valid = false;
continue;
@@ -1259,15 +1268,17 @@ void OlapTableSink::_send_batch_process(RuntimeState* state) {
do {
int running_channels_num = 0;
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([&running_channels_num, this, state](const std::shared_ptr<NodeChannel>& ch) {
index_channel->for_each_node_channel([&running_channels_num, this,
state](const std::shared_ptr<NodeChannel>& ch) {
running_channels_num +=
ch->try_send_and_fetch_status(state, this->_send_batch_thread_pool_token);
});
}

if (running_channels_num == 0) {
LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), "
"sender thread exit. " << print_id(_load_id);
"sender thread exit. "
<< print_id(_load_id);
return;
}
} while (!_stop_background_threads_latch.wait_for(
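The `TYPE_STRING` branch above now compares each value's length against `config::string_type_length_soft_limit_bytes` instead of `OLAP_STRING_MAX_LENGTH` and reports the offending column. A rough, self-contained sketch of that check follows; `check_string_length` is a hypothetical helper, and the real code additionally marks the row in a filter bitmap and appends the message to the load error file.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <sstream>
#include <string>

// Stand-in for config::string_type_length_soft_limit_bytes (1MB default).
constexpr int32_t kSoftLimitBytes = 1048576;

// Returns an error message when a STRING value is longer than the soft limit,
// mirroring the shape of the check in OlapTableSink::_validate_data.
std::optional<std::string> check_string_length(const std::string& column_name,
                                               const std::string& value) {
    if (value.size() <= static_cast<size_t>(kSoftLimitBytes)) {
        return std::nullopt;  // the row stays valid
    }
    std::ostringstream msg;
    msg << "the length of input is too long than schema. "
        << "column_name: " << column_name << "; "
        << "first 128 bytes of input str: [" << value.substr(0, 128) << "] "
        << "schema length: " << kSoftLimitBytes << "; "
        << "actual length: " << value.size() << "; ";
    return msg.str();
}

int main() {
    auto err = check_string_length("c1", std::string(2 * 1024 * 1024, 'x'));
    if (err) {
        std::cout << *err << "\n";  // the 2MB value exceeds the 1MB soft limit
    }
    return 0;
}
```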
4 changes: 2 additions & 2 deletions be/src/olap/delete_handler.cpp
@@ -143,7 +143,7 @@ bool DeleteConditionHandler::is_condition_value_valid(const TabletColumn& column
case OLAP_FIELD_TYPE_VARCHAR:
return value_str.size() <= column.length();
case OLAP_FIELD_TYPE_STRING:
return value_str.size() <= OLAP_STRING_MAX_LENGTH;
return value_str.size() <= config::string_type_length_soft_limit_bytes;
case OLAP_FIELD_TYPE_DATE:
case OLAP_FIELD_TYPE_DATETIME:
return valid_datetime(value_str);
@@ -303,7 +303,7 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema,
}

bool DeleteHandler::is_filter_data(const int64_t data_version, const RowCursor& row) const {
// According to semantics, the delete condition stored in _del_conds should be an OR relationship,
// According to semantics, the delete condition stored in _del_conds should be an OR relationship,
// so as long as the data matches one of the _del_conds, it will return true.
for (const auto& del_cond : _del_conds) {
if (data_version <= del_cond.filter_version &&
3 changes: 0 additions & 3 deletions be/src/olap/olap_define.h
@@ -56,9 +56,6 @@ static const uint16_t OLAP_VARCHAR_MAX_LENGTH = 65535;
// the max length supported for string type 2GB
static const uint32_t OLAP_STRING_MAX_LENGTH = 2147483647;

// the max length supported for vec string type 1MB
static constexpr size_t MAX_SIZE_OF_VEC_STRING = 1024 * 1024;

// the max length supported for array
static const uint16_t OLAP_ARRAY_MAX_LENGTH = 65535;

9 changes: 5 additions & 4 deletions be/src/olap/row_block2.cpp
@@ -17,6 +17,7 @@

#include "olap/row_block2.h"

#include <algorithm>
#include <sstream>
#include <utility>

@@ -190,16 +191,16 @@ Status RowBlockV2::_copy_data_to_column(int cid,
}
case OLAP_FIELD_TYPE_STRING: {
auto column_string = assert_cast<vectorized::ColumnString*>(column);

size_t limit = config::string_type_length_soft_limit_bytes;
for (uint16_t j = 0; j < _selected_size; ++j) {
if (!nullable_mark_array[j]) {
uint16_t row_idx = _selection_vector[j];
auto slice = reinterpret_cast<const Slice*>(column_block(cid).cell_ptr(row_idx));
if (LIKELY(slice->size <= MAX_SIZE_OF_VEC_STRING)) {
if (LIKELY(slice->size <= limit)) {
column_string->insert_data(slice->data, slice->size);
} else {
return Status::NotSupported(
"Not support string len over than 1MB in vec engine.");
return Status::NotSupported(fmt::format(
"Not support string len over than {} in vec engine.", limit));
}
} else {
column_string->insert_default();
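On the read path, `RowBlockV2::_copy_data_to_column` now refuses to materialize a string longer than the soft limit into a vectorized column. Below is a simplified sketch of that guard, using `std::vector<std::string>` as a stand-in for `vectorized::ColumnString` and an illustrative `copy_strings` helper rather than the real API.

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>
#include <vector>

// Stand-in for config::string_type_length_soft_limit_bytes (1MB default).
constexpr size_t kSoftLimitBytes = 1048576;

// Stand-in for vectorized::ColumnString in this sketch.
using ColumnStringLike = std::vector<std::string>;

// Mirrors the shape of the OLAP_FIELD_TYPE_STRING branch in
// RowBlockV2::_copy_data_to_column: values within the soft limit are appended,
// a longer value aborts the copy with an error.
bool copy_strings(const std::vector<std::string_view>& slices, ColumnStringLike* column,
                  std::string* error) {
    for (std::string_view slice : slices) {
        if (slice.size() > kSoftLimitBytes) {
            *error = "Not support string len over than " + std::to_string(kSoftLimitBytes) +
                     " in vec engine.";
            return false;
        }
        column->emplace_back(slice);
    }
    return true;
}

int main() {
    ColumnStringLike column;
    std::string error;
    const std::string too_long(2 * 1024 * 1024, 'x');  // 2MB, over the 1MB soft limit
    bool ok = copy_strings({"short", too_long}, &column, &error);
    std::cout << ok << " " << error << "\n";  // prints 0 plus the error text
    return 0;
}
```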
4 changes: 2 additions & 2 deletions be/src/olap/types.h
@@ -1137,9 +1137,9 @@ template <>
struct FieldTypeTraits<OLAP_FIELD_TYPE_STRING> : public FieldTypeTraits<OLAP_FIELD_TYPE_CHAR> {
static OLAPStatus from_string(void* buf, const std::string& scan_key) {
size_t value_len = scan_key.length();
if (value_len > OLAP_STRING_MAX_LENGTH) {
if (value_len > config::string_type_length_soft_limit_bytes) {
LOG(WARNING) << "the len of value string is too long, len=" << value_len
<< ", max_len=" << OLAP_STRING_MAX_LENGTH;
<< ", max_len=" << config::string_type_length_soft_limit_bytes;
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}

5 changes: 3 additions & 2 deletions be/src/olap/wrapper_field.cpp
@@ -28,8 +28,9 @@ WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) {
(column.type() == OLAP_FIELD_TYPE_CHAR || column.type() == OLAP_FIELD_TYPE_VARCHAR ||
column.type() == OLAP_FIELD_TYPE_HLL || column.type() == OLAP_FIELD_TYPE_OBJECT ||
column.type() == OLAP_FIELD_TYPE_STRING);
size_t max_length = column.type() == OLAP_FIELD_TYPE_STRING ? OLAP_STRING_MAX_LENGTH
: OLAP_VARCHAR_MAX_LENGTH;
size_t max_length = column.type() == OLAP_FIELD_TYPE_STRING
? config::string_type_length_soft_limit_bytes
: OLAP_VARCHAR_MAX_LENGTH;
if (is_string_type && len > max_length) {
OLAP_LOG_WARNING("length of string parameter is too long[len=%lu, max_len=%lu].", len,
max_length);
3 changes: 1 addition & 2 deletions be/src/runtime/types.h
@@ -46,7 +46,6 @@ struct TypeDescriptor {
/// Only set if type == TYPE_CHAR or type == TYPE_VARCHAR
int len;
static const int MAX_VARCHAR_LENGTH = OLAP_VARCHAR_MAX_LENGTH;
static const int MAX_STRING_LENGTH = OLAP_STRING_MAX_LENGTH;
static const int MAX_CHAR_LENGTH = 255;
static const int MAX_CHAR_INLINE_LENGTH = 128;

@@ -100,7 +99,7 @@ struct TypeDescriptor {
static TypeDescriptor create_string_type() {
TypeDescriptor ret;
ret.type = TYPE_STRING;
ret.len = MAX_STRING_LENGTH;
ret.len = config::string_type_length_soft_limit_bytes;
return ret;
}

29 changes: 14 additions & 15 deletions be/src/vec/sink/vtablet_sink.cpp
@@ -116,10 +116,11 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
fmt::memory_buffer buf;
fmt::format_to(buf, "no partition for this tuple. tuple=[]");
return fmt::to_string(buf);
}, &stop_processing));
fmt::memory_buffer buf;
fmt::format_to(buf, "no partition for this tuple. tuple=[]");
return fmt::to_string(buf);
},
&stop_processing));
_number_filtered_rows++;
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop processing");
@@ -163,9 +164,11 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* bl
const auto num_rows = block->rows();
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
filter_bitmap->Set(row, true);
return state->append_error_msg_to_file([]() -> std::string { return ""; },
[&error_msg]() -> std::string { return fmt::to_string(error_msg); }, stop_processing);
filter_bitmap->Set(row, true);
return state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&error_msg]() -> std::string { return fmt::to_string(error_msg); },
stop_processing);
};

for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
@@ -200,12 +203,8 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* bl
const auto column_string =
assert_cast<const vectorized::ColumnString*>(real_column_ptr.get());

size_t limit = MAX_SIZE_OF_VEC_STRING;
if (desc->type().type != TYPE_STRING) {
DCHECK(desc->type().len >= 0);
limit = std::min(limit, (size_t)desc->type().len);
}

size_t limit =
std::min(config::string_type_length_soft_limit_bytes, desc->type().len);
for (int j = 0; j < num_rows; ++j) {
if (!filter_bitmap->Get(j)) {
auto str_val = column_string->get_data_at(j);
@@ -219,14 +218,14 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* bl
fmt::format_to(error_msg, "input str: [{}] ", str_val.to_prefix(10));
fmt::format_to(error_msg, "schema length: {}; ", desc->type().len);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
} else if (str_val.size > MAX_SIZE_OF_VEC_STRING) {
} else if (str_val.size > limit) {
fmt::format_to(
error_msg, "{}",
"the length of input string is too long than vec schema. ");
fmt::format_to(error_msg, "column_name: {}; ", desc->col_name());
fmt::format_to(error_msg, "input str: [{}] ", str_val.to_prefix(10));
fmt::format_to(error_msg, "schema length: {}; ", desc->type().len);
fmt::format_to(error_msg, "limit length: {}; ", MAX_SIZE_OF_VEC_STRING);
fmt::format_to(error_msg, "limit length: {}; ", limit);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
}

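In the vectorized sink the effective limit is now the smaller of the soft limit and the column's declared length, and the two failure cases produce different messages. A minimal sketch of that selection logic is given below; `check_vec_string` and the enum are illustrative names only, not part of this PR.

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>

// Stand-in for config::string_type_length_soft_limit_bytes (1MB default).
constexpr int kSoftLimitBytes = 1048576;

enum class StringCheck { kOk = 0, kLongerThanSchema = 1, kOverSoftLimit = 2 };

// Mirrors the limit selection in VOlapTableSink::_validate_data: the effective
// limit is the smaller of the soft limit and the column's declared length.
StringCheck check_vec_string(size_t actual_len, int schema_len) {
    const size_t limit =
            std::min(static_cast<size_t>(kSoftLimitBytes), static_cast<size_t>(schema_len));
    if (actual_len > static_cast<size_t>(schema_len)) {
        return StringCheck::kLongerThanSchema;
    }
    if (actual_len > limit) {
        return StringCheck::kOverSoftLimit;
    }
    return StringCheck::kOk;
}

int main() {
    std::cout << static_cast<int>(check_vec_string(100, 65535)) << "\n";       // 0: ok
    std::cout << static_cast<int>(check_vec_string(70000, 65535)) << "\n";     // 1: longer than schema
    std::cout << static_cast<int>(check_vec_string(2 << 20, 4 << 20)) << "\n"; // 2: over the soft limit
    return 0;
}
```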
5 changes: 5 additions & 0 deletions docs/en/administrator-guide/config/be_config.md
@@ -1519,3 +1519,8 @@ Translated with www.DeepL.com/Translator (free version)
* Type: int32
* Description: The maximum amount of data read by each OlapScanner.
* Default: 1024

### `string_type_length_soft_limit_bytes`
* Type: int32
* Description: A soft limit on the length of string type values, in bytes; values longer than this are rejected even though the hard limit is 2GB - 4 bytes (see the `be.conf` example below).
* Default: 1048576
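
For example, assuming the standard `key = value` format of `be.conf`, the soft limit could be raised to 10MB as follows; the value must stay within the validator's accepted range of 1 to 2147483643:

```
string_type_length_soft_limit_bytes = 10485760
```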