diff --git a/be/src/agent/status.h b/be/src/agent/status.h index f31b5b09c8eee3..e5174aed8daa21 100644 --- a/be/src/agent/status.h +++ b/be/src/agent/status.h @@ -20,6 +20,7 @@ namespace doris { +// TODO: this enum should be replaced by Status enum AgentStatus { DORIS_SUCCESS = 0, DORIS_ERROR = -1, @@ -41,7 +42,7 @@ enum AgentStatus { DORIS_PUSH_HAD_LOADED = -504, DORIS_TIMEOUT = -901, DORIS_INTERNAL_ERROR = -902, - DORIS_DISK_REACH_CAPACITY_LIMIT = -903, + DORIS_DISK_REACH_CAPACITY_LIMIT = -903 }; } // namespace doris #endif // DORIS_BE_SRC_AGENT_STATUS_H diff --git a/be/src/common/config.h b/be/src/common/config.h index 6d245d99503201..c763811a0fc2db 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -585,6 +585,11 @@ namespace config { // Maximum number of cache partitions corresponding to a SQL CONF_Int32(query_cache_max_partition_count, "1024"); + + // Maximum number of version of a tablet. If the version num of a tablet exceed limit, + // the load process will reject new incoming load job of this tablet. + // This is to avoid too many version num. + CONF_mInt32(max_tablet_version_num, "500"); } // namespace config diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 50d2652f7cb996..bdf2c2fa060cff 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -268,7 +268,9 @@ Status OlapScanner::get_batch( auto res = _reader->next_row_with_aggregation(&_read_row_cursor, mem_pool.get(), batch->agg_object_pool(), eof); if (res != OLAP_SUCCESS) { std::stringstream ss; - ss << "Internal Error: read storage fail. res=" << res; + ss << "Internal Error: read storage fail. res=" << res + << ", tablet=" << _tablet->full_name() + << ", backend=" << BackendOptions::get_localhost(); return Status::InternalError(ss.str()); } // If we reach end of this scanner, break diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index cd27ad7968a8e2..7ef6b58abae762 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -163,6 +163,14 @@ Status NodeChannel::open_wait() { LOG(WARNING) << name() << " add batch req success but status isn't ok, " << print_load_info() << ", node=" << node_info()->host << ":" << node_info()->brpc_port << ", errmsg=" << status.get_error_msg(); + { + std::lock_guard l(_cancel_msg_lock); + if (_cancel_msg == "") { + std::stringstream ss; + ss << "node=" << node_info()->host << ":" << node_info()->brpc_port << ", errmsg=" << status.get_error_msg(); + _cancel_msg = ss.str(); + } + } } if (result.has_execution_time_us()) { @@ -261,7 +269,15 @@ Status NodeChannel::close_wait(RuntimeState* state) { return Status::OK(); } - return Status::InternalError("close wait failed coz rpc error"); + std::stringstream ss; + ss << "close wait failed coz rpc error"; + { + std::lock_guard l(_cancel_msg_lock); + if (_cancel_msg != "") { + ss << ". " << _cancel_msg; + } + } + return Status::InternalError(ss.str()); } void NodeChannel::cancel() { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 9143f9799cedd3..e41c4827cd8d52 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -35,6 +35,7 @@ #include "util/ref_count_closure.h" #include "util/thrift_util.h" #include "util/countdown_latch.h" +#include "util/spinlock.h" #include "util/thread.h" namespace doris { @@ -207,6 +208,8 @@ class NodeChannel { // user cancel or get some errors std::atomic _cancelled{false}; + SpinLock _cancel_msg_lock; + std::string _cancel_msg = ""; // send finished means the consumer thread which send the rpc can exit std::atomic _send_finished{false}; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index b24a3410a5db3e..9d42e6e1607da6 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -101,6 +101,14 @@ OLAPStatus DeltaWriter::init() { return OLAP_ERR_TABLE_NOT_FOUND; } + // check tablet version number + if (_tablet->version_count() > config::max_tablet_version_num) { + LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count() + << ", exceed limit: " << config::max_tablet_version_num + << ". tablet: " << _tablet->full_name(); + return OLAP_ERR_TABLE_NOT_FOUND; + } + { ReadLock base_migration_rlock(_tablet->get_migration_lock_ptr(), TRY_LOCK); if (!base_migration_rlock.own_lock()) { diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 1820117fd45a4a..cde102ca7afec2 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -164,6 +164,7 @@ enum OLAPStatus { OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232, OLAP_ERR_TOO_MANY_TRANSACTIONS = -233, OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234, + OLAP_ERR_TOO_MANY_VERSION = -235, // CommandExecutor // [-300, -400) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index cfd221decda710..07801d217c6824 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -187,6 +187,14 @@ OLAPStatus PushHandler::_do_streaming_ingestion( } } + // check if version number exceed limit + if (tablet_vars->at(0).tablet->version_count() > config::max_tablet_version_num) { + LOG(WARNING) << "failed to push data. version count: " << tablet_vars->at(0).tablet->version_count() + << ", exceed limit: " << config::max_tablet_version_num + << ". tablet: " << tablet_vars->at(0).tablet->full_name(); + return OLAP_ERR_TOO_MANY_VERSION; + } + // write if (push_type == PUSH_NORMAL_V2) { res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, diff --git a/be/src/olap/rowset/column_data.cpp b/be/src/olap/rowset/column_data.cpp index b1b15cba5ae6f1..7fec759e9ff1e5 100644 --- a/be/src/olap/rowset/column_data.cpp +++ b/be/src/olap/rowset/column_data.cpp @@ -345,7 +345,8 @@ const RowCursor* ColumnData::seek_and_get_current_row(const RowBlockPosition& po res = _get_block(true, 1); if (res != OLAP_SUCCESS) { LOG(WARNING) << "Fail to get block in seek_and_get_current_row, res=" << res - << ", segment:" << position.segment << ", block:" << position.data_offset; + << ", segment:" << position.segment << ", block:" << position.data_offset + << ", tablet: " << _segment_group->get_tablet_id(); return nullptr; } return _current_row(); diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index a2c2af6f658bb6..37f46b4b512c31 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -410,6 +410,12 @@ Indicates how many tablets in this data directory failed to load. At the same ti ### `max_tablet_num_per_shard` +### `max_tablet_version_num` + +* Type: int +* Description: Limit the number of versions of a single tablet. It is used to prevent a large number of version accumulation problems caused by too frequent import or untimely compaction. When the limit is exceeded, the import task will be rejected. +* Default value: 500 + ### `mem_limit` ### `memory_limitation_per_thread_for_schema_change` diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index dd90c2fd854a9e..22bab78864d83c 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -409,6 +409,12 @@ load tablets from header failed, failed tablets size: xxx, path=xxx ### `max_tablet_num_per_shard` +### `max_tablet_version_num` + +* 类型:int +* 描述:限制单个 tablet 最大 version 的数量。用于防止导入过于频繁,或 compaction 不及时导致的大量 version 堆积问题。当超过限制后,导入任务将被拒绝。 +* 默认值:500 + ### `mem_limit` ### `memory_limitation_per_thread_for_schema_change`