Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/agent/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

namespace doris {

// TODO: this enum should be replaced by Status
enum AgentStatus {
DORIS_SUCCESS = 0,
DORIS_ERROR = -1,
Expand All @@ -41,7 +42,7 @@ enum AgentStatus {
DORIS_PUSH_HAD_LOADED = -504,
DORIS_TIMEOUT = -901,
DORIS_INTERNAL_ERROR = -902,
DORIS_DISK_REACH_CAPACITY_LIMIT = -903,
DORIS_DISK_REACH_CAPACITY_LIMIT = -903
};
} // namespace doris
#endif // DORIS_BE_SRC_AGENT_STATUS_H
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,11 @@ namespace config {

// Maximum number of cache partitions corresponding to a SQL
CONF_Int32(query_cache_max_partition_count, "1024");

// Maximum number of version of a tablet. If the version num of a tablet exceed limit,
// the load process will reject new incoming load job of this tablet.
// This is to avoid too many version num.
CONF_mInt32(max_tablet_version_num, "500");

} // namespace config

Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ Status OlapScanner::get_batch(
auto res = _reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(), batch->agg_object_pool(), eof);
if (res != OLAP_SUCCESS) {
std::stringstream ss;
ss << "Internal Error: read storage fail. res=" << res;
ss << "Internal Error: read storage fail. res=" << res
<< ", tablet=" << _tablet->full_name()
<< ", backend=" << BackendOptions::get_localhost();
return Status::InternalError(ss.str());
}
// If we reach end of this scanner, break
Expand Down
18 changes: 17 additions & 1 deletion be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ Status NodeChannel::open_wait() {
LOG(WARNING) << name() << " add batch req success but status isn't ok, "
<< print_load_info() << ", node=" << node_info()->host << ":"
<< node_info()->brpc_port << ", errmsg=" << status.get_error_msg();
{
std::lock_guard<SpinLock> l(_cancel_msg_lock);
if (_cancel_msg == "") {
std::stringstream ss;
ss << "node=" << node_info()->host << ":" << node_info()->brpc_port << ", errmsg=" << status.get_error_msg();
_cancel_msg = ss.str();
}
}
}

if (result.has_execution_time_us()) {
Expand Down Expand Up @@ -261,7 +269,15 @@ Status NodeChannel::close_wait(RuntimeState* state) {
return Status::OK();
}

return Status::InternalError("close wait failed coz rpc error");
std::stringstream ss;
ss << "close wait failed coz rpc error";
{
std::lock_guard<SpinLock> l(_cancel_msg_lock);
if (_cancel_msg != "") {
ss << ". " << _cancel_msg;
}
}
return Status::InternalError(ss.str());
}

void NodeChannel::cancel() {
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "util/ref_count_closure.h"
#include "util/thrift_util.h"
#include "util/countdown_latch.h"
#include "util/spinlock.h"
#include "util/thread.h"

namespace doris {
Expand Down Expand Up @@ -207,6 +208,8 @@ class NodeChannel {

// user cancel or get some errors
std::atomic<bool> _cancelled{false};
SpinLock _cancel_msg_lock;
std::string _cancel_msg = "";

// send finished means the consumer thread which send the rpc can exit
std::atomic<bool> _send_finished{false};
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ OLAPStatus DeltaWriter::init() {
return OLAP_ERR_TABLE_NOT_FOUND;
}

// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
<< ", exceed limit: " << config::max_tablet_version_num
<< ". tablet: " << _tablet->full_name();
return OLAP_ERR_TABLE_NOT_FOUND;
}

{
ReadLock base_migration_rlock(_tablet->get_migration_lock_ptr(), TRY_LOCK);
if (!base_migration_rlock.own_lock()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ enum OLAPStatus {
OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232,
OLAP_ERR_TOO_MANY_TRANSACTIONS = -233,
OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234,
OLAP_ERR_TOO_MANY_VERSION = -235,

// CommandExecutor
// [-300, -400)
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
}
}

// check if version number exceed limit
if (tablet_vars->at(0).tablet->version_count() > config::max_tablet_version_num) {
LOG(WARNING) << "failed to push data. version count: " << tablet_vars->at(0).tablet->version_count()
<< ", exceed limit: " << config::max_tablet_version_num
<< ". tablet: " << tablet_vars->at(0).tablet->full_name();
return OLAP_ERR_TOO_MANY_VERSION;
}

// write
if (push_type == PUSH_NORMAL_V2) {
res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ const RowCursor* ColumnData::seek_and_get_current_row(const RowBlockPosition& po
res = _get_block(true, 1);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "Fail to get block in seek_and_get_current_row, res=" << res
<< ", segment:" << position.segment << ", block:" << position.data_offset;
<< ", segment:" << position.segment << ", block:" << position.data_offset
<< ", tablet: " << _segment_group->get_tablet_id();
return nullptr;
}
return _current_row();
Expand Down
6 changes: 6 additions & 0 deletions docs/en/administrator-guide/config/be_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ Indicates how many tablets in this data directory failed to load. At the same ti

### `max_tablet_num_per_shard`

### `max_tablet_version_num`

* Type: int
* Description: Limit the number of versions of a single tablet. It is used to prevent a large number of version accumulation problems caused by too frequent import or untimely compaction. When the limit is exceeded, the import task will be rejected.
* Default value: 500

### `mem_limit`

### `memory_limitation_per_thread_for_schema_change`
Expand Down
6 changes: 6 additions & 0 deletions docs/zh-CN/administrator-guide/config/be_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,12 @@ load tablets from header failed, failed tablets size: xxx, path=xxx

### `max_tablet_num_per_shard`

### `max_tablet_version_num`

* 类型:int
* 描述:限制单个 tablet 最大 version 的数量。用于防止导入过于频繁,或 compaction 不及时导致的大量 version 堆积问题。当超过限制后,导入任务将被拒绝。
* 默认值:500

### `mem_limit`

### `memory_limitation_per_thread_for_schema_change`
Expand Down