diff --git a/be/src/agent/status.h b/be/src/agent/status.h
index 14fe4c91aeb986..f31b5b09c8eee3 100644
--- a/be/src/agent/status.h
+++ b/be/src/agent/status.h
@@ -41,6 +41,7 @@ enum AgentStatus {
     DORIS_PUSH_HAD_LOADED = -504,
     DORIS_TIMEOUT = -901,
     DORIS_INTERNAL_ERROR = -902,
+    DORIS_DISK_REACH_CAPACITY_LIMIT = -903,
 };
 }  // namespace doris
 #endif  // DORIS_BE_SRC_AGENT_STATUS_H
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index ba4d7677c65d78..3bba1abcd03903 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -433,8 +433,8 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) {
         vector<string> error_msgs;
         TStatus task_status;

-        OLAPStatus create_status =
-                worker_pool_this->_env->storage_engine()->create_tablet(create_tablet_req);
+        std::vector<TTabletInfo> finish_tablet_infos;
+        OLAPStatus create_status = worker_pool_this->_env->storage_engine()->create_tablet(create_tablet_req);
         if (create_status != OLAPStatus::OLAP_SUCCESS) {
             OLAP_LOG_WARNING("create table failed. status: %d, signature: %ld",
                              create_status, agent_task_req.signature);
@@ -442,12 +442,26 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) {
             status_code = TStatusCode::RUNTIME_ERROR;
         } else {
             ++_s_report_version;
+            // get path hash of the created tablet
+            TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
+                    create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash);
+            DCHECK(tablet != nullptr);
+            TTabletInfo tablet_info;
+            tablet_info.tablet_id = tablet->tablet_id();
+            tablet_info.schema_hash = tablet->schema_hash();
+            tablet_info.version = create_tablet_req.version;
+            tablet_info.version_hash = create_tablet_req.version_hash;
+            tablet_info.row_count = 0;
+            tablet_info.data_size = 0;
+            tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
+            finish_tablet_infos.push_back(tablet_info);
         }

         task_status.__set_status_code(status_code);
         task_status.__set_error_msgs(error_msgs);

         TFinishTaskRequest finish_task_request;
+        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
         finish_task_request.__set_backend(worker_pool_this->_backend);
         finish_task_request.__set_report_version(_s_report_version);
         finish_task_request.__set_task_type(agent_task_req.task_type);
@@ -1252,7 +1266,7 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
         }
 #endif
         vector<DataDirInfo> data_dir_infos;
-        worker_pool_this->_env->storage_engine()->get_all_data_dir_info(&data_dir_infos);
+        worker_pool_this->_env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */);

         map<string, TDisk> disks;
         for (auto& root_path_info : data_dir_infos) {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 54072d901837f5..c5ffeff6a9be06 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -212,12 +212,11 @@ namespace config {
     // inc_rowset expired interval
     CONF_Int32(inc_rowset_expired_sec, "1800");
     // garbage sweep policy
-    CONF_Int32(max_garbage_sweep_interval, "14400");
+    CONF_Int32(max_garbage_sweep_interval, "3600");
     CONF_Int32(min_garbage_sweep_interval, "180");
     CONF_Int32(snapshot_expire_time_sec, "172800");
     // 仅仅是建议值,当磁盘空间不足时,trash下的文件保存期可不遵守这个参数
     CONF_Int32(trash_file_expire_time_sec, "259200");
-    CONF_Int32(disk_capacity_insufficient_percentage, "90");
     // check row nums for BE/CE and schema change. true is open, false is closed.
    CONF_Bool(row_nums_check, "true")
     // file descriptors cache, by default, cache 30720 descriptors
@@ -439,6 +438,13 @@ namespace config {
     CONF_Int32(path_gc_check_step, "1000");
     CONF_Int32(path_gc_check_step_interval_ms, "10");
     CONF_Int32(path_scan_interval_second, "86400");
+
+    // The following two configs limit the max disk capacity usage of a data dir.
+    // If both of these thresholds are reached, no more data can be written into that data dir.
+    // The percent of max used capacity of a data dir
+    CONF_Int32(storage_flood_stage_usage_percent, "95");    // 95%
+    // The minimum bytes that should be left in a data dir
+    CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824");    // 1GB
 } // namespace config
 } // namespace doris
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index ebe77de40dc37a..40d80d62ccbc55 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -32,6 +32,7 @@
 namespace doris {

+class DataDir;
 class Merger;

 // This class is a base class for compaction.
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index ec94e13cb09451..c186c5927c5730 100755
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -59,14 +59,14 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
                  TabletManager* tablet_manager, TxnManager* txn_manager)
         : _path(path),
           _capacity_bytes(capacity_bytes),
+          _available_bytes(0),
+          _disk_capacity_bytes(0),
+          _is_used(false),
           _tablet_manager(tablet_manager),
           _txn_manager(txn_manager),
           _cluster_id(-1),
-          _available_bytes(0),
-          _used_bytes(0),
-          _current_shard(0),
-          _is_used(false),
           _to_be_deleted(false),
+          _current_shard(0),
           _test_file_read_buf(nullptr),
           _test_file_write_buf(nullptr),
           _meta(nullptr) {
@@ -100,6 +100,7 @@ Status DataDir::init() {
         return Status::InternalError("invalid root path: ");
     }

+    RETURN_IF_ERROR(update_capacity());
     RETURN_IF_ERROR(_init_cluster_id());
     RETURN_IF_ERROR(_init_extension_and_capacity());
     RETURN_IF_ERROR(_init_file_system());
@@ -1057,4 +1058,35 @@ void DataDir::_remove_check_paths_no_lock(const std::set<std::string>& paths) {
     }
 }

+Status DataDir::update_capacity() {
+    try {
+        boost::filesystem::path path_name(_path);
+        boost::filesystem::space_info path_info = boost::filesystem::space(path_name);
+        _available_bytes = path_info.available;
+        if (_disk_capacity_bytes == 0) {
+            // disk capacity only needs to be set once
+            _disk_capacity_bytes = path_info.capacity;
+        }
+    } catch (boost::filesystem::filesystem_error& e) {
+        LOG(WARNING) << "get space info failed. path: " << _path << " error: " << e.what();
+        return Status::InternalError("get path available capacity failed");
+    }
+    LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes
+              << ", available capacity: " << _available_bytes;
+
+    return Status::OK();
+}
+
+bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
+    double used_pct = (_disk_capacity_bytes - _available_bytes + incoming_data_size) / (double) _disk_capacity_bytes;
+    int64_t left_bytes = _available_bytes - incoming_data_size;
+
+    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0
+            && left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
+        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
+                     << ", left bytes: " << left_bytes << ", path: " << _path;
+        return true;
+    }
+    return false;
+}
 } // namespace doris
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index 44528279cf19ce..e5b9a914f34f69 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -48,12 +48,15 @@ class DataDir {
     bool is_used() const { return _is_used; }
     void set_is_used(bool is_used) { _is_used = is_used; }
     int32_t cluster_id() const { return _cluster_id; }
+
     DataDirInfo get_dir_info() {
         DataDirInfo info;
         info.path = _path;
         info.path_hash = _path_hash;
-        info.is_used = _is_used;
         info.capacity = _capacity_bytes;
+        info.available = _available_bytes;
+        info.is_used = _is_used;
+        info.storage_medium = _storage_medium;
         return info;
     }
@@ -121,6 +124,16 @@ class DataDir {

     OLAPStatus set_convert_finished();

+    // Check if the capacity reaches the limit after adding the incoming data.
+    // Return true if the limit is reached, otherwise return false.
+    // TODO(cmy): for now we can not precisely calculate the capacity Doris used,
+    // so in order to avoid running out of disk capacity, we currently use the actual
+    // available disk capacity and total capacity to do the calculation.
+    // As a result, the capacity Doris actually uses may exceed the user-specified capacity.
+    bool reach_capacity_limit(int64_t incoming_data_size);
+
+    Status update_capacity();
+
 private:
     std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
     Status _init_cluster_id();
@@ -146,23 +159,30 @@ class DataDir {
 private:
     std::string _path;
-    size_t _path_hash;
+    int64_t _path_hash;
+    // user specified capacity
+    int64_t _capacity_bytes;
+    // the actual available capacity of the disk of this data dir.
+    // NOTICE that _available_bytes may be larger than _capacity_bytes, if capacity is set
+    // by user, not the disk's actual capacity
+    int64_t _available_bytes;
+    // the actual capacity of the disk of this data dir
+    int64_t _disk_capacity_bytes;
+    TStorageMedium::type _storage_medium;
+    bool _is_used;
+
     uint32_t _rand_seed;

     std::string _file_system;
-    int64_t _capacity_bytes;
     TabletManager* _tablet_manager;
     TxnManager* _txn_manager;
     int32_t _cluster_id;
-    int64_t _available_bytes;
-    int64_t _used_bytes;
-    uint64_t _current_shard;
-    bool _is_used;

     // This flag will be set true if this store was not in root path when reloading
     bool _to_be_deleted;

+    // used to protect _current_shard and _tablet_set
     std::mutex _mutex;
-    TStorageMedium::type _storage_medium; // 存储介质类型:SSD|HDD

+    uint64_t _current_shard;
     std::set<TabletInfo> _tablet_set;

     static const size_t TEST_FILE_BUF_SIZE = 4096;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index d3f63478c4e967..35c4d18fc49691 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -137,10 +137,7 @@ OLAPStatus DeltaWriter::init() {

     // TODO: new RowsetBuilder according to tablet storage type
     _rowset_writer.reset(new AlphaRowsetWriter());
-    status = _rowset_writer->init(writer_context);
-    if (status != OLAP_SUCCESS) {
-        return OLAP_ERR_ROWSET_WRITER_INIT;
-    }
+    RETURN_NOT_OK(_rowset_writer->init(writer_context));

     const std::vector<SlotDescriptor*>& slots = _req.tuple_desc->slots();
     const TabletSchema& schema = _tablet->tablet_schema();
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 8de070442a4ba1..f3575ed547d50c 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -164,6 +164,7 @@ enum OLAPStatus {
     OLAP_ERR_TRANSACTION_ALREADY_VISIBLE = -229,
     OLAP_ERR_VERSION_ALREADY_MERGED = -230,
     OLAP_ERR_LZO_DISABLED = -231,
+    OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232,

     // CommandExecutor
     // [-300, -400)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index b637e1d077d430..71cdb0346d57cd 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -157,7 +157,9 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_dir) {
         // cgroup is not initialized at this time
         // add tid to cgroup
         CgroupsMgr::apply_system_cgroup();
-        perform_base_compaction(data_dir);
+        if (!data_dir->reach_capacity_limit(0)) {
+            perform_base_compaction(data_dir);
+        }

         usleep(interval * 1000000);
     }
@@ -249,7 +251,9 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* data_dir) {
         // cgroup is not initialized at this time
         // add tid to cgroup
         CgroupsMgr::apply_system_cgroup();
-        perform_cumulative_compaction(data_dir);
+        if (!data_dir->reach_capacity_limit(0)) {
+            perform_cumulative_compaction(data_dir);
+        }

         usleep(interval * 1000000);
     }
diff --git a/be/src/olap/rowset/column_data_writer.cpp b/be/src/olap/rowset/column_data_writer.cpp
index c8da480e974b43..7037f87241cb1b 100644
--- a/be/src/olap/rowset/column_data_writer.cpp
+++ b/be/src/olap/rowset/column_data_writer.cpp
@@ -280,7 +280,7 @@ OLAPStatus ColumnDataWriter::_flush_segment_with_verfication() {
     OLAPStatus res = _finalize_segment();
     if (OLAP_SUCCESS != res) {
         OLAP_LOG_WARNING("fail to finalize segment. [res=%d]", res);
-        return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
+        return res;
     }

     _new_segment_created = false;
@@ -292,12 +292,12 @@ OLAPStatus ColumnDataWriter::_finalize_segment() {
     OLAPStatus res = OLAP_SUCCESS;
     uint32_t data_segment_size;

-    if (OLAP_SUCCESS != _segment_writer->finalize(&data_segment_size)) {
+    if ((res = _segment_writer->finalize(&data_segment_size)) != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to finish segment from olap_data.");
-        return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
+        return res;
     }

-    if (OLAP_SUCCESS != _segment_group->finalize_segment(data_segment_size, _num_rows)) {
+    if ((res = _segment_group->finalize_segment(data_segment_size, _num_rows)) != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to finish segment from olap_index.");
         return OLAP_ERR_WRITER_INDEX_WRITE_ERROR;
     }
diff --git a/be/src/olap/rowset/segment_writer.cpp b/be/src/olap/rowset/segment_writer.cpp
index 09a77ecd367c19..70ffccb53f9e94 100644
--- a/be/src/olap/rowset/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_writer.cpp
@@ -213,12 +213,6 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) {
     boost::filesystem::path data_dir_path =
             tablet_path.parent_path().parent_path().parent_path().parent_path();
     std::string data_dir_string = data_dir_path.string();
     DataDir* data_dir = StorageEngine::instance()->get_store(data_dir_string);
-    data_dir->add_pending_ids(ROWSET_ID_PREFIX + std::to_string(_segment_group->rowset_id()));
-    if (OLAP_SUCCESS != (res = file_handle.open_with_mode(
-            _file_name, O_CREAT | O_EXCL | O_WRONLY , S_IRUSR | S_IWUSR))) {
-        LOG(WARNING) << "fail to open file. [file_name=" << _file_name << "]";
-        return res;
-    }

     res = _make_file_header(file_header.mutable_message());
     if (OLAP_SUCCESS != res) {
@@ -226,6 +220,18 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) {
         return res;
     }

+    // check disk capacity
+    if (data_dir->reach_capacity_limit((int64_t) file_header.file_length())) {
+        return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT;
+    }
+
+    data_dir->add_pending_ids(ROWSET_ID_PREFIX + std::to_string(_segment_group->rowset_id()));
+    if (OLAP_SUCCESS != (res = file_handle.open_with_mode(
+            _file_name, O_CREAT | O_EXCL | O_WRONLY , S_IRUSR | S_IWUSR))) {
+        LOG(WARNING) << "fail to open file. [file_name=" << _file_name << "]";
+        return res;
+    }
+
     res = file_header.prepare(&file_handle);
     if (OLAP_SUCCESS != res) {
         OLAP_LOG_WARNING("write file header error. [err=%m]");
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 340e74940f934e..6e7f7bb263fbbb 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1892,7 +1892,6 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) {
     // set status for monitor
     // 只要有一个new_table为running,ref table就设置为running
     // NOTE 如果第一个sub_table先fail,这里会继续按正常走
-    RowsetId rowset_id = 0;
     TabletSharedPtr new_tablet = sc_params.new_tablet;

     res = sc_params.new_tablet->next_rowset_id(&rowset_id);
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 8fff00dcb9597d..caba4108028c17 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -287,7 +287,7 @@ std::vector<DataDir*> StorageEngine::get_stores() {
 template std::vector<DataDir*> StorageEngine::get_stores<false>();
 template std::vector<DataDir*> StorageEngine::get_stores<true>();

-OLAPStatus StorageEngine::get_all_data_dir_info(vector<DataDirInfo>* data_dir_infos) {
+OLAPStatus StorageEngine::get_all_data_dir_info(vector<DataDirInfo>* data_dir_infos, bool need_update) {
     OLAPStatus res = OLAP_SUCCESS;
     data_dir_infos->clear();
@@ -295,28 +295,21 @@ OLAPStatus StorageEngine::get_all_data_dir_info(vector<DataDirInfo>* data_dir_infos,
     timer.start();
     int tablet_counter = 0;

+    // 1. update available capacity of each data dir
     // get all root path info and construct a path map.
     // path -> DataDirInfo
     std::map<std::string, DataDirInfo> path_map;
     {
         std::lock_guard<std::mutex> l(_store_lock);
         for (auto& it : _store_map) {
-            std::string path = it.first;
-            path_map.emplace(path, it.second->get_dir_info());
-            // if this path is not used, init it's info
-            if (!path_map[path].is_used) {
-                path_map[path].capacity = 1;
-                path_map[path].data_used_capacity = 0;
-                path_map[path].available = 0;
-                path_map[path].storage_medium = TStorageMedium::HDD;
-            } else {
-                path_map[path].storage_medium = it.second->storage_medium();
+            if (need_update) {
+                it.second->update_capacity();
             }
+            path_map.emplace(it.first, it.second->get_dir_info());
         }
     }

-    // for each tablet, get it's data size, and accumulate the path 'data_used_capacity'
-    // which the tablet belongs to.
+    // 2. get total tablets' size of each data dir
     _tablet_manager->update_root_path_info(&path_map, &tablet_counter);

     // add path info to data_dir_infos
@@ -324,12 +317,6 @@ OLAPStatus StorageEngine::get_all_data_dir_info(vector<DataDirInfo>* data_dir_infos,
         data_dir_infos->emplace_back(entry.second);
     }

-    // get available capacity of each path
-    for (auto& info: *data_dir_infos) {
-        if (info.is_used) {
-            _get_path_available_capacity(info.path, &info.available);
-        }
-    }
     timer.stop();
     LOG(INFO) << "get root path info cost: " << timer.elapsed_time() / 1000000
               << " ms. tablet counter: " << tablet_counter;
@@ -429,7 +416,7 @@ std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(
 }

 DataDir* StorageEngine::get_store(const std::string& path) {
-    std::lock_guard<std::mutex> l(_store_lock);
+    // _store_map is unchanged after init, no need to lock
     auto it = _store_map.find(path);
     if (it == std::end(_store_map)) {
         return nullptr;
@@ -470,23 +457,6 @@ void StorageEngine::_delete_tablets_on_unused_root_path() {
     _tablet_manager->drop_tablets_on_error_root_path(tablet_info_vec);
 }

-OLAPStatus StorageEngine::_get_path_available_capacity(
-        const string& root_path,
-        int64_t* disk_available) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    try {
-        boost::filesystem::path path_name(root_path);
-        boost::filesystem::space_info path_info = boost::filesystem::space(path_name);
-        *disk_available = path_info.available;
-    } catch (boost::filesystem::filesystem_error& e) {
-        LOG(WARNING) << "get space info failed. path: " << root_path << " erro:" << e.what();
-        return OLAP_ERR_STL_ERROR;
-    }
-
-    return res;
-}
-
 OLAPStatus StorageEngine::clear() {
     // 删除lru中所有内容,其实进程退出这么做本身意义不大,但对单测和更容易发现问题还是有很大意义的
     delete FileHandler::get_fd_cache();
@@ -597,9 +567,9 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) {

     const int32_t snapshot_expire = config::snapshot_expire_time_sec;
     const int32_t trash_expire = config::trash_file_expire_time_sec;
-    const double guard_space = config::disk_capacity_insufficient_percentage / 100.0;
+    const double guard_space = config::storage_flood_stage_usage_percent / 100.0;
     std::vector<DataDirInfo> data_dir_infos;
-    res = get_all_data_dir_info(&data_dir_infos);
+    res = get_all_data_dir_info(&data_dir_infos, false);
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "failed to get root path stat info when sweep trash.";
         return res;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 1332065b95391a..dad4ce34529655 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -120,7 +120,7 @@ class StorageEngine {
     void set_store_used_flag(const std::string& root_path, bool is_used);

     // @brief 获取所有root_path信息
-    OLAPStatus get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos);
+    OLAPStatus get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, bool need_update);

     // 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位,
     // 当检测到有unused标识时,从内存中删除对应表信息,磁盘数据不动。
@@ -201,10 +201,6 @@ class StorageEngine {

     bool _used_disk_not_enough(uint32_t unused_num, uint32_t total_num);

-    OLAPStatus _get_path_available_capacity(
-            const std::string& root_path,
-            int64_t* disk_available);
-
     OLAPStatus _config_root_path_unused_flag_file(
             const std::string& root_path,
             std::string* unused_flag_file);
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index 6126919d59275f..d2076a4097165e 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -117,10 +117,17 @@ AgentStatus EngineBatchLoadTask::_init() {
     }

     // Empty remote_path
-    if (!_push_req.__isset.http_file_path) {
+    if (!_push_req.__isset.http_file_path || !_push_req.__isset.http_file_size) {
         _is_init = true;
         return status;
     }
+
+    // check disk capacity
+    if (_push_req.push_type == TPushType::LOAD) {
+        if (tablet->data_dir()->reach_capacity_limit(_push_req.http_file_size)) {
+            return DORIS_DISK_REACH_CAPACITY_LIMIT;
+        }
+    }

     // Check remote path
     _remote_file_path = _push_req.http_file_path;
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index c70d27489f6456..13300f5b318207 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -468,6 +468,12 @@ AgentStatus EngineCloneTask::_clone_copy(
                 break;
             }

+            // check disk capacity
+            if (data_dir.reach_capacity_limit(file_size)) {
+                status = DORIS_DISK_REACH_CAPACITY_LIMIT;
+                break;
+            }
+
             total_file_size += file_size;
             uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
             if (estimate_timeout < config::download_low_speed_time) {
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index f65a5281fc8a01..0c2faccf833f8c 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -118,6 +118,14 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
             break;
         }

+        // check disk capacity
+        int64_t tablet_size = tablet->tablet_footprint();
+        if (stores[0]->reach_capacity_limit(tablet_size)) {
+            res = OLAP_ERR_DISK_REACH_CAPACITY_LIMIT;
+            break;
+        }
+
+        // get shard
         uint64_t shard = 0;
         res = stores[0]->get_shard(&shard);
         if (res != OLAP_SUCCESS) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 08e9c8bc19a8aa..4a4f64f9348b9f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -500,7 +500,7 @@ void FragmentMgr::cancel_worker() {
             }
         }
         for (auto& id : to_delete) {
-            LOG(INFO) << "FragmentMgr cancel worker going to cancel fragment " << id;
+            LOG(INFO) << "FragmentMgr cancel worker going to cancel fragment " << print_id(id);
             cancel(id);
         }
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index f3b48822648195..daf947e08b7842 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -318,6 +318,15 @@ Status SnapshotLoader::download(
         return Status::InternalError(ss.str());
     }

+    TabletSharedPtr tablet = _env->storage_engine()->tablet_manager()->get_tablet(local_tablet_id, schema_hash);
+    if (tablet == nullptr) {
+        std::stringstream ss;
+        ss << "failed to get local tablet: " << local_tablet_id;
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    DataDir* data_dir = tablet->data_dir();
+
     for (auto& iter : remote_files) {

         RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
@@ -367,6 +376,12 @@ Status SnapshotLoader::download(
             LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file;
             size_t file_len = file_stat.size;
+
+            // check disk capacity
+            if (data_dir->reach_capacity_limit(file_len)) {
+                return Status::InternalError("capacity limit reached");
+            }
+
             {
                 // 1. open remote file for read
                 std::unique_ptr broker_reader;
diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp
index 1d1c282a57ffeb..8b9bf6487fdcb8 100644
--- a/be/src/runtime/tablet_writer_mgr.cpp
+++ b/be/src/runtime/tablet_writer_mgr.cpp
@@ -155,9 +155,9 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
         if (st != OLAP_SUCCESS) {
             std::stringstream ss;
             ss << "tablet writer write failed, tablet_id=" << it->first
-                << ", transaction_id=" << _txn_id << ", status=" << st;
+                << ", transaction_id=" << _txn_id << ", err=" << st;
             LOG(WARNING) << ss.str();
-            return Status::InternalError(ss.str() + ", be: " + BackendOptions::get_localhost());
+            return Status::InternalError(ss.str());
         }
     }
     _next_seqs[params.sender_id()]++;
@@ -187,9 +187,11 @@ Status TabletsChannel::close(int sender_id, bool* finished,
         if (_partition_ids.count(it.second->partition_id()) > 0) {
             auto st = it.second->close(tablet_vec);
             if (st != OLAP_SUCCESS) {
-                LOG(WARNING) << "close tablet writer failed, tablet_id=" << it.first
-                    << ", transaction_id=" << _txn_id;
-                _close_status = Status::InternalError("close tablet writer failed");
+                std::stringstream ss;
+                ss << "close tablet writer failed, tablet_id=" << it.first
+                    << ", transaction_id=" << _txn_id << ", err=" << st;
+                LOG(WARNING) << ss.str();
+                _close_status = Status::InternalError(ss.str());
                 return _close_status;
             }
         } else {
@@ -233,11 +235,13 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) {
         DeltaWriter* writer = nullptr;
         auto st = DeltaWriter::open(&request, &writer);
         if (st != OLAP_SUCCESS) {
-            LOG(WARNING) << "open delta writer failed, tablet_id=" << tablet.tablet_id()
+            std::stringstream ss;
+            ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
                 << ", txn_id=" << _txn_id << ", partition_id=" << tablet.partition_id()
-                << ", status=" << st;
-            return Status::InternalError("open tablet writer failed");
+                << ", err=" << st;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
         }
         _tablet_writers.emplace(tablet.tablet_id(), writer);
     }
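Before the new admin doc below: the FE applies the same two-threshold idea with different strictness, OR at the high watermark and AND at the flood stage (see `DiskInfo.exceedLimit` later in this patch). A hedged C++ restatement with the new default values inlined; the function names here are illustrative, not part of the patch:

```cpp
#include <cstdint>

// High watermark (balance/relocation/decommission destinations): 'OR' - the
// path is excluded if EITHER threshold is crossed (strict).
static bool exceed_high_watermark(int64_t capacity, int64_t available) {
    return available < 2LL * (1 << 30)                                  // storage_min_left_capacity_bytes, 2GB
            || (capacity - available) / (double) capacity > 85 / 100.0; // storage_high_watermark_usage_percent
}

// Flood stage (loads, restores, compactions, ...): 'AND' - both thresholds
// must be crossed before operations are rejected (loose, a last resort).
static bool exceed_flood_stage(int64_t capacity, int64_t available) {
    return available < (1LL << 30)                                      // storage_flood_stage_left_capacity_bytes, 1GB
            && (capacity - available) / (double) capacity > 95 / 100.0; // storage_flood_stage_usage_percent
}
```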
diff --git a/docs/documentation/cn/administrator-guide/operation/disk-capacity.md b/docs/documentation/cn/administrator-guide/operation/disk-capacity.md
new file mode 100644
index 00000000000000..502305fd71f5ce
--- /dev/null
+++ b/docs/documentation/cn/administrator-guide/operation/disk-capacity.md
@@ -0,0 +1,122 @@
+# 磁盘空间管理
+
+本文档主要介绍和磁盘存储空间有关的系统参数和处理策略。
+
+Doris 的数据磁盘空间如果不加以控制,会因磁盘写满而导致进程挂掉。因此我们监测磁盘的使用率和剩余空间,通过设置不同的警戒水位,来控制 Doris 系统中的各项操作,尽量避免发生磁盘被写满的情况。
+
+## 名词解释
+
+* FE:Frontend,Doris 的前端节点。负责元数据管理和请求接入。
+* BE:Backend,Doris 的后端节点。负责查询执行和数据存储。
+* Data Dir:数据目录,在 BE 配置文件 `be.conf` 的 `storage_root_path` 中指定的各个数据目录。通常一个数据目录对应一个磁盘,因此下文中 **磁盘** 也指代一个数据目录。
+
+## 基本原理
+
+BE 定期(每隔一分钟)会向 FE 汇报一次磁盘使用情况。FE 记录这些统计值,并根据这些统计值,限制不同的操作请求。
+
+在 FE 中分别设置了 **高水位(High Watermark)** 和 **危险水位(Flood Stage)** 两级阈值。危险水位高于高水位。当磁盘使用率高于高水位时,Doris 会限制某些操作的执行(如副本均衡等)。而如果高于危险水位,则会禁止某些操作的执行(如导入)。
+
+同时,在 BE 上也设置了 **危险水位(Flood Stage)**。考虑到 FE 并不能完全及时的检测到 BE 上的磁盘使用情况,以及无法控制某些 BE 自身运行的操作(如 Compaction),因此 BE 上的危险水位用于 BE 主动拒绝和停止某些操作,达到自我保护的目的。
+
+## FE 参数
+
+**高水位:**
+
+```
+storage_high_watermark_usage_percent 默认 85 (85%)。
+storage_min_left_capacity_bytes 默认 2GB。
+```
+
+当磁盘空间使用率**大于** `storage_high_watermark_usage_percent`,**或者** 磁盘空间剩余大小**小于** `storage_min_left_capacity_bytes` 时,该磁盘不会再被作为以下操作的目的路径:
+
+* Tablet 均衡操作(Balance)
+* Colocation 表数据分片的重分布(Relocation)
+* Decommission
+
+**危险水位:**
+
+```
+storage_flood_stage_usage_percent 默认 95 (95%)。
+storage_flood_stage_left_capacity_bytes 默认 1GB。
+```
+
+当磁盘空间使用率**大于** `storage_flood_stage_usage_percent`,**并且** 磁盘空间剩余大小**小于** `storage_flood_stage_left_capacity_bytes` 时,该磁盘不会再被作为以下操作的目的路径,并禁止某些操作:
+
+* Tablet 均衡操作(Balance)
+* Colocation 表数据分片的重分布(Relocation)
+* 副本补齐
+* 恢复操作(Restore)
+* 数据导入(Load/Insert)
+
+## BE 参数
+
+**危险水位:**
+
+```
+storage_flood_stage_usage_percent 默认 95 (95%)。
+storage_flood_stage_left_capacity_bytes 默认 1GB。
+```
+
+当磁盘空间使用率**大于** `storage_flood_stage_usage_percent`,**并且** 磁盘空间剩余大小**小于** `storage_flood_stage_left_capacity_bytes` 时,该磁盘上的以下操作会被禁止:
+
+* Base/Cumulative Compaction。
+* 数据写入。包括各种导入操作。
+* Clone Task。通常发生于副本修复或均衡时。
+* Push Task。发生在 Hadoop 导入的 Loading 阶段,下载文件。
+* Alter Task。Schema Change 或 Rollup 任务。
+* Download Task。恢复操作的 Downloading 阶段。
+
+## 磁盘空间释放
+
+当磁盘空间高于高水位甚至危险水位后,很多操作都会被禁止。此时可以尝试通过以下方式减少磁盘使用率,恢复系统。
+
+* 删除表或分区
+
+    通过删除表或分区的方式,能够快速降低磁盘空间使用率,恢复集群。**注意:只有 `DROP` 操作可以达到快速降低磁盘空间使用率的目的,`DELETE` 操作不可以。**
+
+    ```
+    DROP TABLE tbl;
+    ALTER TABLE tbl DROP PARTITION p1;
+    ```
+
+* 扩容 BE
+
+    扩容后,数据分片会自动均衡到磁盘使用率较低的 BE 节点上。扩容操作会根据数据量及节点数量不同,在数小时或数天后使集群到达均衡状态。
+
+* 修改表或分区的副本
+
+    可以将表或分区的副本数降低。比如默认 3 副本可以降低为 2 副本。该方法虽然降低了数据的可靠性,但是能够快速的降低磁盘使用率,使集群恢复正常。该方法通常用于紧急恢复系统。请在恢复后,通过扩容或删除数据等方式,降低磁盘使用率后,将副本数恢复为 3。
+
+    修改副本操作为瞬间生效,后台会自动异步的删除多余的副本。
+
+    ```
+    ALTER TABLE tbl MODIFY PARTITION p1 SET("replication_num" = "2");
+    ```
+
+* 删除多余文件
+
+    当 BE 进程已经因为磁盘写满而挂掉并无法启动时(此现象可能因 FE 或 BE 检测不及时而发生),需要通过删除数据目录下的一些临时文件,保证 BE 进程能够启动。以下目录中的文件可以直接删除:
+
+    * log/:日志目录下的日志文件。
+    * snapshot/:快照目录下的快照文件。
+    * trash/:回收站中的文件。
+
+* 删除数据文件(危险!!!)
+
+    当以上操作都无法释放空间时,需要通过删除数据文件来释放空间。数据文件在指定数据目录的 `data/` 目录下。删除数据分片(Tablet)必须先确保该 Tablet 至少有一个副本是正常的,否则**删除唯一副本会导致数据丢失**。假设我们要删除 id 为 12345 的 Tablet:
+
+    * 找到 Tablet 对应的目录,通常位于 `data/shard_id/tablet_id/` 下。如:
+
+        ```data/0/12345/```
+
+    * 记录 tablet id 和 schema hash。其中 schema hash 为上一步目录的下一级目录名。如下为 352781111:
+
+        ```data/0/12345/352781111```
+
+    * 删除数据目录:
+
+        ```rm -rf data/0/12345/```
+
+    * 删除 Tablet 元数据(具体参考 [Tablet 元数据管理工具](./tablet-meta-tool.md))
+
+        ```./lib/meta_tool --operation=delete_header --root_path=/path/to/root_path --tablet_id=12345 --schema_hash=352781111```
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java
index e951f3439a4424..470488e628a79b 100644
--- a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -268,7 +268,7 @@
-        if (totalNeededCapacityB > totalAvailableCapacityB * Config.storage_high_watermark_usage_percent) {
+        if (totalNeededCapacityB > totalAvailableCapacityB * (Config.storage_high_watermark_usage_percent / 100.0)) {
             throw new DdlException("No available capacity for decommission in cluster: " + clusterName
                     + ", needed: " + totalNeededCapacityB + ", available: " + totalAvailableCapacityB
                     + ", threshold: " + Config.storage_high_watermark_usage_percent);
diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java
index fc6365dc6995e2..2c894a42886292 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java
@@ -54,5 +54,14 @@ public ShowResultSetMetaData getMetaData() {
         }
         return builder.build();
     }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
+            return RedirectStatus.FORWARD_NO_SYNC;
+        } else {
+            return RedirectStatus.NO_FORWARD;
+        }
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
index 83642a2c88f93c..4f78e53ff9f4c7 100644
--- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -755,6 +755,7 @@ private void checkAndPrepareMeta() {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
+        Map<Long, Long> pathBeMap = Maps.newHashMap();
         batchTask = new AgentBatchTask();
         db.readLock();
         try {
@@ -773,10 +774,18 @@ private void checkAndPrepareMeta() {
                             true /* is restore task*/);
                     batchTask.addTask(task);
                     unfinishedSignatureToId.put(signature, tablet.getId());
+                    pathBeMap.put(replica.getPathHash(), replica.getBackendId());
                 }
             } finally {
                 db.readUnlock();
             }
+
+            // check disk capacity
+            org.apache.doris.common.Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(pathBeMap, true);
+            if (!st.ok()) {
+                status = new Status(ErrCode.COMMON_ERROR, st.getErrorMsg());
+                return;
+            }

             // send tasks
             for (AgentTask task : batchTask.getAllTasks()) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java
index c4f4162db651f1..8470c0cea5b908 100644
--- a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -17,16 +17,22 @@
 package org.apache.doris.catalog;

+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.thrift.TStorageMedium;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;

 public class DiskInfo implements Writable {
+    private static final Logger LOG = LogManager.getLogger(DiskInfo.class);
+
     public enum DiskState {
         ONLINE,
         OFFLINE
@@ -123,6 +129,23 @@ public void setStorageMedium(TStorageMedium storageMedium) {
         this.storageMedium = storageMedium;
     }

+    /*
+     * Check if this disk's capacity reaches the limit. Return true if yes.
+     * If floodStage is true, check against the flood stage threshold.
+     * The flood stage threshold is a looser limit, and we use 'AND' to make it even looser.
+     */
+    public boolean exceedLimit(boolean floodStage) {
+        LOG.debug("flood stage: {}, diskAvailableCapacityB: {}, totalCapacityB: {}",
+                floodStage, diskAvailableCapacityB, totalCapacityB);
+        if (floodStage) {
+            return diskAvailableCapacityB < Config.storage_flood_stage_left_capacity_bytes &&
+                    (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > (Config.storage_flood_stage_usage_percent / 100.0);
+        } else {
+            return diskAvailableCapacityB < Config.storage_min_left_capacity_bytes ||
+                    (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > (Config.storage_high_watermark_usage_percent / 100.0);
+        }
+    }
+
     @Override
     public String toString() {
         return "DiskInfo [rootPath=" + rootPath + "(" + pathHash + "), totalCapacityB=" + totalCapacityB
diff --git a/fe/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/src/main/java/org/apache/doris/catalog/Tablet.java
index f3d73f83a011d7..fa4fa8c497c6ee 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -27,6 +27,7 @@
 import org.apache.doris.system.SystemInfoService;

 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;

 import org.apache.logging.log4j.LogManager;
@@ -39,6 +40,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;

 /**
@@ -184,6 +186,24 @@ public List<Long> getNormalReplicaBackendIds() {
         return beIds;
     }

+    // return map of (path hash -> BE id) of normal replicas
+    public Map<Long, Long> getNormalReplicaBackendPathMap() {
+        Map<Long, Long> map = Maps.newHashMap();
+        SystemInfoService infoService = Catalog.getCurrentSystemInfo();
+        for (Replica replica : replicas) {
+            if (replica.isBad()) {
+                continue;
+            }
+
+            ReplicaState state = replica.getState();
+            if (infoService.checkBackendAlive(replica.getBackendId())
+                    && (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE)) {
+                map.put(replica.getPathHash(), replica.getBackendId());
+            }
+        }
+        return map;
+    }
+
     // for query
     public void getQueryableReplicas(List<Replica> allQuerableReplica, List<Replica> localReplicas,
             long visibleVersion, long visibleVersionHash, long localBeId, int schemaHash) {
diff --git a/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
index 2c54d9b05f7602..2b5fee92cca8cd 100644
--- a/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
+++ b/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
@@ -24,9 +24,6 @@
 import org.apache.doris.thrift.TStorageMedium;

 public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic> {
-    // Even if for tablet recovery, we can not exceed these 2 limitations.
-    public static final double MAX_USAGE_PERCENT_LIMIT = 0.95;
-    public static final double MIN_LEFT_CAPACITY_BYTES_LIMIT = 100 * 1024 * 1024; // 100MB

     private long beId;
     private String path;
@@ -96,8 +93,8 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) {
         }

         if (isSupplement) {
-            if ((usedCapacityB + tabletSize) / (double) capacityB > MAX_USAGE_PERCENT_LIMIT
-                    && capacityB - usedCapacityB - tabletSize < MIN_LEFT_CAPACITY_BYTES_LIMIT) {
+            if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_flood_stage_usage_percent / 100.0)
+                    && capacityB - usedCapacityB - tabletSize < Config.storage_flood_stage_left_capacity_bytes) {
                 return new BalanceStatus(ErrCode.COMMON_ERROR,
                         toString() + " does not fit tablet with size: " + tabletSize + ", limitation reached");
             } else {
@@ -105,7 +102,7 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) {
             }
         }

-        if ((usedCapacityB + tabletSize) / (double) capacityB > Config.storage_high_watermark_usage_percent
+        if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_high_watermark_usage_percent / 100.0)
                 || capacityB - usedCapacityB - tabletSize < Config.storage_min_left_capacity_bytes) {
             return new BalanceStatus(ErrCode.COMMON_ERROR,
                     toString() + " does not fit tablet with size: " + tabletSize);
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index 39858e2a3eecf0..5e02f988275b69 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -662,15 +662,26 @@ public class Config extends ConfigBase {
     public static int backup_job_default_timeout_ms = 86400 * 1000; // 1 day

     /*
-     * storage_high_watermark_usage_percent limit the max capacity usage percent of a Backend storage path.
-     * storage_min_left_capacity_bytes limit the minimum left capacity of a Backend storage path.
+     * 'storage_high_watermark_usage_percent' limits the max capacity usage percent of a Backend storage path.
+     * 'storage_min_left_capacity_bytes' limits the minimum left capacity of a Backend storage path.
     * If both limitations are reached, this storage path can not be chose as tablet balance destination.
     * But for tablet recovery, we may exceed these limit for keeping data integrity as much as possible.
     */
    @ConfField(mutable = true, masterOnly = true)
-    public static double storage_high_watermark_usage_percent = 0.85;
+    public static int storage_high_watermark_usage_percent = 85;
    @ConfField(mutable = true, masterOnly = true)
-    public static double storage_min_left_capacity_bytes = 1000 * 1024 * 1024; // 1G
+    public static long storage_min_left_capacity_bytes = 2L * 1024 * 1024 * 1024; // 2GB
+
+    /*
+     * If the capacity of a disk reaches both 'storage_flood_stage_usage_percent' and
+     * 'storage_flood_stage_left_capacity_bytes', the following operations will be rejected:
+     * 1. load job
+     * 2. restore job
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int storage_flood_stage_usage_percent = 95;
+    @ConfField(mutable = true, masterOnly = true)
+    public static long storage_flood_stage_left_capacity_bytes = 1 * 1024 * 1024 * 1024; // 1GB

     // update interval of tablet stat
     // All frontends will get tablet stat from all backends at each interval
diff --git a/fe/src/main/java/org/apache/doris/common/Status.java b/fe/src/main/java/org/apache/doris/common/Status.java
index 13107b3233c63c..93b655e7012f1e 100644
--- a/fe/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/src/main/java/org/apache/doris/common/Status.java
@@ -22,10 +22,9 @@
 import org.apache.doris.thrift.TStatusCode;

 public class Status {
-    public static final Status OK = new Status();
-    public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled");
-    public static final Status THRIFT_RPC_ERROR =
-            new Status(TStatusCode.THRIFT_RPC_ERROR, "Thrift RPC failed");
+    public static final Status OK = new Status();
+    public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled");
+    public static final Status THRIFT_RPC_ERROR = new Status(TStatusCode.THRIFT_RPC_ERROR, "Thrift RPC failed");

     public TStatusCode getErrorCode() {
         return errorCode;
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index 3da5a9ac74dbe0..a94b4e22485c82 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -143,7 +143,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) throws TException {
             switch (taskType) {
                 case CREATE:
                     Preconditions.checkState(request.isSetReport_version());
-                    finishCreateReplica(task, request.getReport_version());
+                    finishCreateReplica(task, request);
                     break;
                 case PUSH:
                     checkHasTabletInfo(request);
@@ -224,20 +224,27 @@ private void checkHasTabletInfo(TFinishTaskRequest request) throws Exception {
         }
     }

-    private void finishCreateReplica(AgentTask task, long reportVersion) {
+    private void finishCreateReplica(AgentTask task, TFinishTaskRequest request) {
         // if we get here, this task will be removed from AgentTaskQueue for certain.
         // because in this function, the only problem that cause failure is meta missing.
         // and if meta is missing, we no longer need to resend this task
         CreateReplicaTask createReplicaTask = (CreateReplicaTask) task;
         long tabletId = createReplicaTask.getTabletId();
+
+        if (request.isSetFinish_tablet_infos()) {
+            Replica replica = Catalog.getCurrentInvertedIndex().getReplica(createReplicaTask.getTabletId(),
+                    createReplicaTask.getBackendId());
+            replica.setPathHash(request.getFinish_tablet_infos().get(0).getPath_hash());
+        }

         // this should be called before 'countDownLatch()'
-        Catalog.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(), reportVersion, task.getDbId());
+        Catalog.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(), request.getReport_version(),
+                task.getDbId());

         createReplicaTask.countDownLatch(task.getBackendId(), task.getSignature());
         LOG.debug("finish create replica. tablet id: {}, be: {}, report version: {}",
-                tabletId, task.getBackendId(), reportVersion);
+                tabletId, task.getBackendId(), request.getReport_version());

         AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CREATE, task.getSignature());
     }
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
index f69754afe6c9e5..465da2ac62412b 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -31,8 +31,10 @@
 import org.apache.doris.catalog.RangePartitionInfo;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -54,6 +56,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;

@@ -292,20 +295,29 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) throws UserException {

     private TOlapTableLocationParam createLocation(OlapTable table) throws UserException {
         TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
+        Map<Long, Long> allPathBeMap = Maps.newHashMap();
         for (Partition partition : table.getPartitions()) {
             int quorum = table.getPartitionInfo().getReplicationNum(partition.getId()) / 2 + 1;
             for (MaterializedIndex index : partition.getMaterializedIndices()) {
                 // we should ensure the replica backend is alive
                 // otherwise, there will be a 'unknown node id, id=xxx' error for stream load
                 for (Tablet tablet : index.getTablets()) {
-                    List<Long> beIds = tablet.getNormalReplicaBackendIds();
-                    if (beIds.size() < quorum) {
-                        throw new UserException("tablet " + tablet.getId() + " has few replicas: " + beIds.size());
+                    Map<Long, Long> pathBeMap = tablet.getNormalReplicaBackendPathMap();
+                    if (pathBeMap.size() < quorum) {
+                        throw new UserException("tablet " + tablet.getId() + " has few replicas: " + pathBeMap.size());
                     }
-                    locationParam.addToTablets(new TTabletLocation(tablet.getId(), beIds));
+                    locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(pathBeMap.values())));
+                    allPathBeMap.putAll(pathBeMap);
                 }
             }
         }
+
+        // check if disk capacity reaches the limit.
+        // this is for the load process, so check against the flood stage threshold.
+        Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(allPathBeMap, true);
+        if (!st.ok()) {
+            throw new DdlException(st.getErrorMsg());
+        }
         return locationParam;
     }
diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java b/fe/src/main/java/org/apache/doris/system/Backend.java
index 63c46f7132c904..961a5cb488aea5 100644
--- a/fe/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/src/main/java/org/apache/doris/system/Backend.java
@@ -28,6 +28,7 @@
 import org.apache.doris.thrift.TDisk;

 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

 import org.apache.logging.log4j.LogManager;
@@ -36,11 +37,13 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;

 /**
  * This class extends the primary identifier of a Backend with ephemeral state,
@@ -80,6 +83,10 @@ public enum BackendState {
     private AtomicReference<ImmutableMap<String, DiskInfo>> disksRef;

     private String heartbeatErrMsg = "";
+
+    // This is used for the first time we init pathHashToDiskInfo in SystemInfoService.
+    // After init, this variable is set to true.
+    private boolean initPathInfo = false;

     public Backend() {
         this.host = "";
@@ -338,9 +345,26 @@ public String getPathByPathHash(long pathHash) {
     }

     public void updateDisks(Map<String, TDisk> backendDisks) {
-        // update status or add new diskInfo
+
         ImmutableMap<String, DiskInfo> disks = disksRef.get();
-        Map<String, DiskInfo> newDisks = Maps.newHashMap();
+        // The very first time to init the path info
+        if (!initPathInfo) {
+            boolean allPathHashUpdated = true;
+            for (DiskInfo diskInfo : disks.values()) {
+                if (diskInfo.getPathHash() == 0) {
+                    allPathHashUpdated = false;
+                }
+            }
+            if (allPathHashUpdated) {
+                initPathInfo = true;
+                Catalog.getCurrentSystemInfo().updatePathInfo(disks.values().stream().collect(Collectors.toList()), Lists.newArrayList());
+            }
+        }
+
+        // update status or add new diskInfo
+        Map<String, DiskInfo> newDiskInfos = Maps.newHashMap();
+        List<DiskInfo> addedDisks = Lists.newArrayList();
+        List<DiskInfo> removedDisks = Lists.newArrayList();
        /*
         * set isChanged to true only if new disk is added or old disk is dropped.
         * we ignore the change of capacity, because capacity info is only used in master FE.
@@ -356,10 +380,11 @@ public void updateDisks(Map<String, TDisk> backendDisks) {
             DiskInfo diskInfo = disks.get(rootPath);
             if (diskInfo == null) {
                 diskInfo = new DiskInfo(rootPath);
+                addedDisks.add(diskInfo);
                 isChanged = true;
                 LOG.info("add new disk info. backendId: {}, rootPath: {}", id, rootPath);
             }
-            newDisks.put(rootPath, diskInfo);
+            newDiskInfos.put(rootPath, diskInfo);

             diskInfo.setTotalCapacityB(totalCapacityB);
             diskInfo.setDataUsedCapacityB(dataUsedCapacityB);
@@ -388,6 +413,7 @@ public void updateDisks(Map<String, TDisk> backendDisks) {
         for (DiskInfo diskInfo : disks.values()) {
             String rootPath = diskInfo.getRootPath();
             if (!backendDisks.containsKey(rootPath)) {
+                removedDisks.add(diskInfo);
                 isChanged = true;
                 LOG.warn("remove not exist rootPath. backendId: {}, rootPath: {}", id, rootPath);
             }
@@ -395,10 +421,13 @@ public void updateDisks(Map<String, TDisk> backendDisks) {
         if (isChanged) {
             // update disksRef
-            disksRef.set(ImmutableMap.copyOf(newDisks));
+            disksRef.set(ImmutableMap.copyOf(newDiskInfos));
+            Catalog.getCurrentSystemInfo().updatePathInfo(addedDisks, removedDisks);
             // log disk changing
             Catalog.getInstance().getEditLog().logBackendStateChange(this);
         }
+
+
     }

     public static Backend read(DataInput in) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
index 81688bc86a50f4..2202bb76d12214 100644
--- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -19,13 +19,16 @@

 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.cluster.Cluster;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.Status;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.system.Backend.BackendState;
+import org.apache.doris.thrift.TStatusCode;

 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -73,6 +76,8 @@ public class SystemInfoService {
     private long lastBackendIdForCreation = -1;
     private long lastBackendIdForOther = -1;

+    private AtomicReference<ImmutableMap<Long, DiskInfo>> pathHashToDiskInfoRef;
+
     // sort host backends list by num of backends, descending
     private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() {
         @Override
@@ -92,6 +97,7 @@ public SystemInfoService() {

         lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
         lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
+        pathHashToDiskInfoRef = new AtomicReference<ImmutableMap<Long, DiskInfo>>(ImmutableMap.of());
     }

     // for deploy manager
@@ -1107,5 +1113,40 @@ public Set<String> getClusterNames() {
         }
         return clusterNames;
     }
+
+    /*
+     * Check if the specified disks' capacity has reached the limit.
+     * pathBeMap is (path hash -> BE id).
+     * If floodStage is true, it will check with the flood stage threshold.
+     *
+     * return Status.OK if the limit is not reached.
+     */
+    public Status checkExceedDiskCapacityLimit(Map<Long, Long> pathBeMap, boolean floodStage) {
+        LOG.debug("pathBeMap: {}", pathBeMap);
+        ImmutableMap<Long, DiskInfo> pathHashToDiskInfo = pathHashToDiskInfoRef.get();
+        for (Long pathHash : pathBeMap.keySet()) {
+            DiskInfo diskInfo = pathHashToDiskInfo.get(pathHash);
+            if (diskInfo != null && diskInfo.exceedLimit(floodStage)) {
+                return new Status(TStatusCode.CANCELLED,
+                        "disk " + pathHash + " on backend " + pathBeMap.get(pathHash) + " exceeds usage limit");
+            }
+        }
+        return Status.OK;
+    }
+
+    // update the path info when disks are reported.
+    // there is only one thread that can update path info, so no need to worry about concurrency control
+    public void updatePathInfo(List<DiskInfo> addedDisks, List<DiskInfo> removedDisks) {
+        Map<Long, DiskInfo> copiedPathInfos = Maps.newHashMap(pathHashToDiskInfoRef.get());
+        for (DiskInfo diskInfo : addedDisks) {
+            copiedPathInfos.put(diskInfo.getPathHash(), diskInfo);
+        }
+        for (DiskInfo diskInfo : removedDisks) {
+            copiedPathInfos.remove(diskInfo.getPathHash());
+        }
+        ImmutableMap<Long, DiskInfo> newPathInfos = ImmutableMap.copyOf(copiedPathInfos);
+        pathHashToDiskInfoRef.set(newPathInfos);
+        LOG.debug("update path infos: {}", newPathInfos);
+    }
 }
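`updatePathInfo` publishes a rebuilt ImmutableMap through a single AtomicReference, so readers such as `checkExceedDiskCapacityLimit` take a consistent snapshot without locking. A minimal C++ analogue of this copy-on-write pattern, assuming the same single-writer report thread; all names here are illustrative:

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <vector>

struct DiskInfo { int64_t available_bytes; int64_t capacity_bytes; };
using PathMap = std::map<int64_t, DiskInfo>; // path hash -> disk info

static std::shared_ptr<const PathMap> g_path_map = std::make_shared<PathMap>();

// single writer: copy the current map, mutate the copy, then publish atomically
void update_path_info(const PathMap& added, const std::vector<int64_t>& removed) {
    auto copied = std::make_shared<PathMap>(*std::atomic_load(&g_path_map));
    for (const auto& kv : added) (*copied)[kv.first] = kv.second;
    for (int64_t path_hash : removed) copied->erase(path_hash);
    std::atomic_store(&g_path_map, std::shared_ptr<const PathMap>(std::move(copied)));
}

// readers take an immutable snapshot; no lock needed
std::shared_ptr<const PathMap> get_path_info() {
    return std::atomic_load(&g_path_map);
}
```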
diff --git a/fe/src/test/java/org/apache/doris/catalog/BackendTest.java b/fe/src/test/java/org/apache/doris/catalog/BackendTest.java
index 131a1bdbc33261..48f323dc06ff4c 100644
--- a/fe/src/test/java/org/apache/doris/catalog/BackendTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/BackendTest.java
@@ -21,6 +21,7 @@
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TDisk;

 import org.easymock.EasyMock;
@@ -57,12 +58,17 @@ public class BackendTest {

     private Catalog catalog;

+    private SystemInfoService systemInfoService;
+
     @Before
     public void setUp() {
+        systemInfoService = new SystemInfoService();
+
         catalog = AccessTestUtil.fetchAdminCatalog();

         PowerMock.mockStatic(Catalog.class);
         EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes();
         EasyMock.expect(Catalog.getCurrentCatalogJournalVersion()).andReturn(FeConstants.meta_version).anyTimes();
+        EasyMock.expect(Catalog.getCurrentSystemInfo()).andReturn(systemInfoService).anyTimes();
         PowerMock.replay(Catalog.class);

         backend = new Backend(backendId, host, heartbeatPort);
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index bd0ae6232c318c..c308e2f72d8ffc 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -113,7 +113,6 @@ public void testFromLoadStmt(@Injectable LoadStmt loadStmt,
         List<DataDescription> dataDescriptionList = Lists.newArrayList();
         dataDescriptionList.add(dataDescription);
-
         new Expectations() {
             {
                 loadStmt.getLabel();
diff --git a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
index e6e3a3b1d0383b..891c78de6c2490 100644
--- a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
+++ b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
@@ -33,7 +33,6 @@
 import org.apache.doris.catalog.SinglePartitionInfo;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.UserException;
-import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TUniqueId;

@@ -42,20 +41,22 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import org.junit.Before;
 import org.junit.Test;

 import mockit.Expectations;
 import mockit.Injectable;
-import mockit.Mocked;

 public class OlapTableSinkTest {
     private static final Logger LOG = LogManager.getLogger(OlapTableSinkTest.class);

     @Injectable
-    OlapTable dstTable;
+    public OlapTable dstTable;

-    @Mocked
-    SystemInfoService systemInfoService;
+    @Before
+    public void setUp() {
+
+    }

     private TupleDescriptor getTuple() {
         DescriptorTable descTable = new DescriptorTable();