From da78866818ff5c007d556f189f87db95121ff405 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Thu, 18 Jul 2024 11:44:16 +0800 Subject: [PATCH] [enhancement](compaction) Control the parallelism for urgent compaction tasks (#37782) For some urgent compaction tasks, their submission should take parallelism into account. Currently, we apply the control policy specifically to data loading. Other sources of urgent tasks are considered eager. --- be/src/agent/task_worker_pool.cpp | 3 ++- be/src/olap/olap_server.cpp | 41 ++++++++++++++++++++++++++----- be/src/olap/storage_engine.h | 2 +- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index c9d222114e0727..59971f01a4df24 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -1586,7 +1587,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) && published_count % 20 == 0) { auto st = _engine.submit_compaction_task( - tablet, CompactionType::CUMULATIVE_COMPACTION, true); + tablet, CompactionType::CUMULATIVE_COMPACTION, true, false); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id << ", published=" << published_count << " : " << st; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 038a5f2cd45dc1..173fc2278928f4 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -854,12 +854,12 @@ int StorageEngine::_get_executing_compaction_num( return num; } -bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type, - bool all_base) { - if (count >= thread_per_disk) { +bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk, 
CompactionType compaction_type, bool all_base) { + if (task_cnt_per_disk >= thread_per_disk) { // Return if no available slot return false; - } else if (count >= thread_per_disk - 1) { + } else if (task_cnt_per_disk >= thread_per_disk - 1) { // Only one slot left, check if it can be assigned to base compaction task. if (compaction_type == CompactionType::BASE_COMPACTION) { if (all_base) { @@ -912,7 +912,7 @@ std::vector StorageEngine::_generate_compaction_tasks( copied_cumu_map = _tablet_submitted_cumu_compaction; copied_base_map = _tablet_submitted_base_compaction; } - for (auto data_dir : data_dirs) { + for (auto* data_dir : data_dirs) { bool need_pick_tablet = true; // We need to reserve at least one Slot for cumulative compaction. // So when there is only one Slot, we have to judge whether there is a cumulative compaction @@ -1091,7 +1091,36 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, } Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force) { + bool force, bool eager) { + if (!eager) { + DCHECK(compaction_type == CompactionType::BASE_COMPACTION || + compaction_type == CompactionType::CUMULATIVE_COMPACTION); + std::map> copied_cumu_map; + std::map> copied_base_map; + { + std::unique_lock lock(_tablet_submitted_compaction_mutex); + copied_cumu_map = _tablet_submitted_cumu_compaction; + copied_base_map = _tablet_submitted_base_compaction; + } + auto stores = get_stores(); + + auto busy_pred = [&copied_cumu_map, &copied_base_map, compaction_type, + this](auto* data_dir) { + int count = _get_executing_compaction_num(copied_base_map[data_dir]) + + _get_executing_compaction_num(copied_cumu_map[data_dir]); + int paral = data_dir->is_ssd_disk() ? 
config::compaction_task_num_per_fast_disk + : config::compaction_task_num_per_disk; + bool all_base = copied_cumu_map[data_dir].empty(); + return need_generate_compaction_tasks(count, paral, compaction_type, all_base); + }; + + bool is_busy = std::none_of(stores.begin(), stores.end(), busy_pred); + if (is_busy) { + LOG_EVERY_N(WARNING, 100) + << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id(); + return Status::OK(); + } + } _update_cumulative_compaction_policy(); // alter table tableName set ("compaction_policy"="time_series") // if atler table's compaction policy, we need to modify tablet compaction policy shared ptr diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 99e92828a0bee6..f647869e82500c 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -195,7 +195,7 @@ class StorageEngine { void check_cumulative_compaction_config(); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force); + bool force, bool eager = true); Status submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments);