diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index c9d222114e0727..59971f01a4df24 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -1586,7 +1587,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) && published_count % 20 == 0) { auto st = _engine.submit_compaction_task( - tablet, CompactionType::CUMULATIVE_COMPACTION, true); + tablet, CompactionType::CUMULATIVE_COMPACTION, true, false); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id << ", published=" << published_count << " : " << st; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 038a5f2cd45dc1..173fc2278928f4 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -854,12 +854,12 @@ int StorageEngine::_get_executing_compaction_num( return num; } -bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type, - bool all_base) { - if (count >= thread_per_disk) { +bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk, + CompactionType compaction_type, bool all_base) { + if (task_cnt_per_disk >= thread_per_disk) { // Return if no available slot return false; - } else if (count >= thread_per_disk - 1) { + } else if (task_cnt_per_disk >= thread_per_disk - 1) { // Only one slot left, check if it can be assigned to base compaction task. if (compaction_type == CompactionType::BASE_COMPACTION) { if (all_base) { @@ -912,7 +912,7 @@ std::vector StorageEngine::_generate_compaction_tasks( copied_cumu_map = _tablet_submitted_cumu_compaction; copied_base_map = _tablet_submitted_base_compaction; } - for (auto data_dir : data_dirs) { + for (auto* data_dir : data_dirs) { bool need_pick_tablet = true; // We need to reserve at least one Slot for cumulative compaction. // So when there is only one Slot, we have to judge whether there is a cumulative compaction @@ -1091,7 +1091,36 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, } Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force) { + bool force, bool eager) { + if (!eager) { + DCHECK(compaction_type == CompactionType::BASE_COMPACTION || + compaction_type == CompactionType::CUMULATIVE_COMPACTION); + std::map> copied_cumu_map; + std::map> copied_base_map; + { + std::unique_lock lock(_tablet_submitted_compaction_mutex); + copied_cumu_map = _tablet_submitted_cumu_compaction; + copied_base_map = _tablet_submitted_base_compaction; + } + auto stores = get_stores(); + + auto busy_pred = [&copied_cumu_map, &copied_base_map, compaction_type, + this](auto* data_dir) { + int count = _get_executing_compaction_num(copied_base_map[data_dir]) + + _get_executing_compaction_num(copied_cumu_map[data_dir]); + int paral = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk + : config::compaction_task_num_per_disk; + bool all_base = copied_cumu_map[data_dir].empty(); + return need_generate_compaction_tasks(count, paral, compaction_type, all_base); + }; + + bool is_busy = std::none_of(stores.begin(), stores.end(), busy_pred); + if (is_busy) { + LOG_EVERY_N(WARNING, 100) + << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id(); + return Status::OK(); + } + } _update_cumulative_compaction_policy(); // alter table tableName set ("compaction_policy"="time_series") // if atler table's compaction policy, we need to modify tablet compaction policy shared ptr diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 99e92828a0bee6..f647869e82500c 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -195,7 +195,7 @@ class StorageEngine { void check_cumulative_compaction_config(); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force); + bool force, bool eager = true); Status submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments);