From 3d4430e36385e23c9662e96c17deacdfd585daf7 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 8 Apr 2025 20:04:35 +0800 Subject: [PATCH 1/3] 1 --- be/src/cloud/cloud_base_compaction.cpp | 38 ++--- be/src/cloud/cloud_base_compaction.h | 1 + be/src/cloud/cloud_cumulative_compaction.cpp | 73 ++++------ be/src/cloud/cloud_cumulative_compaction.h | 1 + be/src/cloud/cloud_full_compaction.cpp | 37 ++--- be/src/cloud/cloud_full_compaction.h | 1 + be/src/cloud/cloud_storage_engine.cpp | 121 ++++++++++++++-- be/src/cloud/cloud_storage_engine.h | 11 ++ .../test_cloud_compaction_global_lock.groovy | 135 ++++++++++++++++++ 9 files changed, 331 insertions(+), 87 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_compaction_global_lock.groovy diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index ce105d89b3ffa2..8959029d341b94 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -64,6 +64,26 @@ Status CloudBaseCompaction::prepare_compact() { RETURN_IF_ERROR(pick_rowsets_to_compact()); + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_data_size += rs->data_disk_size(); + _input_rowsets_index_size += rs->index_disk_size(); + _input_rowsets_total_size += rs->total_disk_size(); + } + LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_rowsets_data_size", _input_rowsets_data_size) + .tag("input_rowsets_index_size", _input_rowsets_index_size) + .tag("input_rowsets_total_size", _input_rowsets_total_size); + return Status::OK(); +} + +Status CloudBaseCompaction::request_global_lock() { // prepare compaction job cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -113,25 +133,7 @@ Status CloudBaseCompaction::prepare_compact() { LOG(WARNING) << msg; return Status::InternalError(msg); } - return st; - } - - for (auto& rs : _input_rowsets) { - _input_row_num += rs->num_rows(); - _input_segments += rs->num_segments(); - _input_rowsets_data_size += rs->data_disk_size(); - _input_rowsets_index_size += rs->index_disk_size(); - _input_rowsets_total_size += rs->total_disk_size(); } - LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), - _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) - .tag("job_id", _uuid) - .tag("input_rowsets", _input_rowsets.size()) - .tag("input_rows", _input_row_num) - .tag("input_segments", _input_segments) - .tag("input_rowsets_data_size", _input_rowsets_data_size) - .tag("input_rowsets_index_size", _input_rowsets_index_size) - .tag("input_rowsets_total_size", _input_rowsets_total_size); return st; } diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index b9f52922b8e29f..63bb5c61def9d1 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -32,6 +32,7 @@ class CloudBaseCompaction : public CloudCompactionMixin { Status prepare_compact() override; Status execute_compact() override; + Status request_global_lock(); void do_lease(); diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 8376b905b1825b..b4376fe206e7c9 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -62,9 +62,6 @@ Status CloudCumulativeCompaction::prepare_compact() { } } - int tried = 0; -PREPARE_TRY_AGAIN: - bool need_sync_tablet = true; { std::shared_lock rlock(_tablet->get_header_lock()); @@ -83,7 +80,7 @@ Status CloudCumulativeCompaction::prepare_compact() { // pick rowsets to compact auto st = pick_rowsets_to_compact(); if (!st.ok()) { - if (tried == 0 && _last_delete_version.first != -1) { + if (_last_delete_version.first != -1) { // we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. // plus 1 to skip the delete version. // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter. @@ -96,6 +93,30 @@ Status CloudCumulativeCompaction::prepare_compact() { return st; } + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_data_size += rs->data_disk_size(); + _input_rowsets_index_size += rs->index_disk_size(); + _input_rowsets_total_size += rs->total_disk_size(); + } + LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_rowsets_data_size", _input_rowsets_data_size) + .tag("input_rowsets_index_size", _input_rowsets_index_size) + .tag("input_rowsets_total_size", _input_rowsets_total_size) + .tag("tablet_max_version", cloud_tablet()->max_version_unlocked()) + .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) + .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); + return st; +} + +Status CloudCumulativeCompaction::request_global_lock() { // prepare compaction job cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -121,7 +142,7 @@ Status CloudCumulativeCompaction::prepare_compact() { // Set input version range to let meta-service check version range conflict compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction); cloud::StartTabletJobResponse resp; - st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + Status st = _engine.meta_mgr().prepare_tablet_job(job, &resp); if (!st.ok()) { if (resp.status().code() == cloud::STALE_TABLET_CACHE) { // set last_sync_time to 0 to force sync tablet next time @@ -130,22 +151,10 @@ Status CloudCumulativeCompaction::prepare_compact() { // tablet not found cloud_tablet()->clear_cache(); } else if (resp.status().code() == cloud::JOB_TABLET_BUSY) { - if (config::enable_parallel_cumu_compaction && resp.version_in_compaction_size() > 0 && - ++tried <= 2) { - _max_conflict_version = *std::max_element(resp.version_in_compaction().begin(), - resp.version_in_compaction().end()); - LOG_INFO("retry pick input rowsets") - .tag("job_id", _uuid) - .tag("max_conflict_version", _max_conflict_version) - .tag("tried", tried) - .tag("msg", resp.status().msg()); - goto PREPARE_TRY_AGAIN; - } else { - LOG_WARNING("failed to prepare cumu compaction") - .tag("job_id", _uuid) - .tag("msg", resp.status().msg()); - return Status::Error("no suitable versions"); - } + LOG_WARNING("failed to prepare cumu compaction") + .tag("job_id", _uuid) + .tag("msg", resp.status().msg()); + return Status::Error("no suitable versions"); } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) { (static_cast(_tablet.get()))->set_alter_version(resp.alter_version()); std::stringstream ss; @@ -159,29 +168,7 @@ Status CloudCumulativeCompaction::prepare_compact() { LOG(WARNING) << msg; return Status::InternalError(msg); } - return st; - } - - for (auto& rs : _input_rowsets) { - _input_row_num += rs->num_rows(); - _input_segments += rs->num_segments(); - _input_rowsets_data_size += rs->data_disk_size(); - _input_rowsets_index_size += rs->index_disk_size(); - _input_rowsets_total_size += rs->total_disk_size(); } - LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), - _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) - .tag("job_id", _uuid) - .tag("input_rowsets", _input_rowsets.size()) - .tag("input_rows", _input_row_num) - .tag("input_segments", _input_segments) - .tag("input_rowsets_data_size", _input_rowsets_data_size) - .tag("input_rowsets_index_size", _input_rowsets_index_size) - .tag("input_rowsets_total_size", _input_rowsets_total_size) - .tag("tablet_max_version", cloud_tablet()->max_version_unlocked()) - .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) - .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) - .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); return st; } diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 1d445648a3dc1e..ba3b768a24ff10 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -34,6 +34,7 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { Status prepare_compact() override; Status execute_compact() override; + Status request_global_lock(); void do_lease(); diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 211ed5c2458330..0b6810a414c852 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -56,6 +56,25 @@ Status CloudFullCompaction::prepare_compact() { RETURN_IF_ERROR(pick_rowsets_to_compact()); + for (auto& rs : _input_rowsets) { + _input_row_num += rs->num_rows(); + _input_segments += rs->num_segments(); + _input_rowsets_data_size += rs->data_disk_size(); + _input_rowsets_index_size += rs->index_disk_size(); + _input_rowsets_total_size += rs->total_disk_size(); + } + LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), + _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) + .tag("job_id", _uuid) + .tag("input_rowsets", _input_rowsets.size()) + .tag("input_rows", _input_row_num) + .tag("input_segments", _input_segments) + .tag("input_rowsets_data_size", _input_rowsets_data_size) + .tag("input_rowsets_index_size", _input_rowsets_index_size) + .tag("input_rowsets_total_size", _input_rowsets_total_size); + return Status::OK(); +} +Status CloudFullCompaction::request_global_lock() { // prepare compaction job cloud::TabletJobInfoPB job; auto idx = job.mutable_idx(); @@ -88,25 +107,7 @@ Status CloudFullCompaction::prepare_compact() { // tablet not found cloud_tablet()->clear_cache(); } - return st; - } - - for (auto& rs : _input_rowsets) { - _input_row_num += rs->num_rows(); - _input_segments += rs->num_segments(); - _input_rowsets_data_size += rs->data_disk_size(); - _input_rowsets_index_size += rs->index_disk_size(); - _input_rowsets_total_size += rs->total_disk_size(); } - LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), - _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) - .tag("job_id", _uuid) - .tag("input_rowsets", _input_rowsets.size()) - .tag("input_rows", _input_row_num) - .tag("input_segments", _input_segments) - .tag("input_rowsets_data_size", _input_rowsets_data_size) - .tag("input_rowsets_index_size", _input_rowsets_index_size) - .tag("input_rowsets_total_size", _input_rowsets_total_size); return st; } diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index b44e1503800c5b..1cdf52472c0d60 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -33,6 +33,7 @@ class CloudFullCompaction : public CloudCompactionMixin { Status prepare_compact() override; Status execute_compact() override; + Status request_global_lock(); void do_lease(); diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index fc35d1b31847fc..2730e2c696da4a 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -17,6 +17,7 @@ #include "cloud/cloud_storage_engine.h" +#include #include #include #include @@ -26,6 +27,7 @@ #include #include +#include #include #include "cloud/cloud_base_compaction.h" @@ -49,18 +51,25 @@ #include "io/fs/hdfs_file_system.h" #include "io/fs/s3_file_system.h" #include "io/hdfs_util.h" +#include "io/io_common.h" #include "olap/cumulative_compaction_policy.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/memtable_flush_executor.h" #include "olap/storage_policy.h" #include "runtime/memory/cache_manager.h" #include "util/parse_util.h" +#include "vec/common/assert_cast.h" namespace doris { #include "common/compile_check_begin.h" using namespace std::literals; +bvar::Adder g_base_compaction_running_task_count("base_compaction_running_task_count"); +bvar::Adder g_full_compaction_running_task_count("full_compaction_running_task_count"); +bvar::Adder g_cumu_compaction_running_task_count( + "cumulative_compaction_running_task_count"); + int get_cumu_thread_num() { if (config::max_cumu_compaction_threads > 0) { return config::max_cumu_compaction_threads; @@ -586,6 +595,61 @@ std::vector CloudStorageEngine::_generate_cloud_compaction_task return tablets_compaction; } +Status CloudStorageEngine::_request_tablet_global_compaction_lock( + ReaderType compaction_type, const CloudTabletSPtr& tablet, + std::shared_ptr compaction) { + long now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) { + auto cumu_compaction = static_pointer_cast(compaction); + if (auto st = cumu_compaction->request_global_lock(); !st.ok()) { + LOG_WARNING("failed to request cumu compactoin global lock") + .tag("tablet id", tablet->tablet_id()) + .tag("msg", st.to_string()); + tablet->set_last_cumu_compaction_failure_time(now); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _executing_cumu_compactions[tablet->tablet_id()].push_back(cumu_compaction); + } + return Status::OK(); + } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) { + auto base_compaction = static_pointer_cast(compaction); + if (auto st = base_compaction->request_global_lock(); !st.ok()) { + LOG_WARNING("failed to request base compactoin global lock") + .tag("tablet id", tablet->tablet_id()) + .tag("msg", st.to_string()); + tablet->set_last_base_compaction_failure_time(now); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _executing_base_compactions[tablet->tablet_id()] = base_compaction; + } + return Status::OK(); + } else if (compaction_type == ReaderType::READER_FULL_COMPACTION) { + auto full_compaction = static_pointer_cast(compaction); + if (auto st = full_compaction->request_global_lock(); !st.ok()) { + LOG_WARNING("failed to request full compactoin global lock") + .tag("tablet id", tablet->tablet_id()) + .tag("msg", st.to_string()); + tablet->set_last_full_compaction_failure_time(now); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _executing_full_compactions[tablet->tablet_id()] = full_compaction; + } + return Status::OK(); + } else { + LOG(WARNING) << "unsupport compaction task for tablet: " << tablet->tablet_id() + << ", compaction name: " << compaction->compaction_name(); + return Status::NotFound("Unsupport compaction type {}", compaction->compaction_name()); + } +} + Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) { using namespace std::chrono; { @@ -601,7 +665,9 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t auto compaction = std::make_shared(*this, tablet); auto st = compaction->prepare_compact(); if (!st.ok()) { - long now = duration_cast(system_clock::now().time_since_epoch()).count(); + long now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); tablet->set_last_base_compaction_failure_time(now); std::lock_guard lock(_compaction_mtx); _submitted_base_compactions.erase(tablet->tablet_id()); @@ -612,8 +678,13 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t _submitted_base_compactions[tablet->tablet_id()] = compaction; } st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + g_base_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); - auto st = compaction->execute_compact(); + Defer defer {[&]() { g_base_compaction_running_task_count << -1; }}; + auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, + compaction); + if (!st.ok()) return; + st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast(system_clock::now().time_since_epoch()).count(); @@ -621,6 +692,7 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t } std::lock_guard lock(_compaction_mtx); _submitted_base_compactions.erase(tablet->tablet_id()); + _executing_base_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) { std::lock_guard lock(_compaction_mtx); @@ -649,7 +721,9 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS auto compaction = std::make_shared(*this, tablet); auto st = compaction->prepare_compact(); if (!st.ok()) { - long now = duration_cast(system_clock::now().time_since_epoch()).count(); + long now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); if (st.is() && st.msg() != "_last_delete_version.first not equal to -1") { // Backoff strategy if no suitable version @@ -681,8 +755,27 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS tablet->last_cumu_no_suitable_version_ms = 0; } }; + auto erase_executing_cumu_compaction = [=, this]() { + std::lock_guard lock(_compaction_mtx); + auto it = _executing_cumu_compactions.find(tablet->tablet_id()); + DCHECK(it != _executing_cumu_compactions.end()); + auto& compactions = it->second; + auto it1 = std::find(compactions.begin(), compactions.end(), compaction); + DCHECK(it1 != compactions.end()); + compactions.erase(it1); + if (compactions.empty()) { // No compactions on this tablet, erase key + _executing_cumu_compactions.erase(it); + // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to + // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform + // cumu compaction on tablet which has suitable versions for cumu compaction. + tablet->last_cumu_no_suitable_version_ms = 0; + } + }; st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line", + { sleep(5); }) signal::tablet_id = tablet->tablet_id(); + g_cumu_compaction_running_task_count << 1; bool is_large_task = true; Defer defer {[&]() { DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep", @@ -692,7 +785,11 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS if (!is_large_task) { _cumu_compaction_thread_pool_small_tasks_running--; } + g_cumu_compaction_running_task_count << -1; }}; + auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, + tablet, compaction); + if (!st.ok()) return; do { std::lock_guard lock(_cumu_compaction_delay_mtx); _cumu_compaction_thread_pool_used_threads++; @@ -715,6 +812,7 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS .count(); tablet->set_last_cumu_compaction_failure_time(now); erase_submitted_cumu_compaction(); + erase_executing_cumu_compaction(); // sleep 5s for this tablet tablet->last_cumu_no_suitable_version_ms = now; LOG_WARNING( @@ -734,13 +832,14 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS } } } while (false); - auto st = compaction->execute_compact(); + st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast(system_clock::now().time_since_epoch()).count(); tablet->set_last_cumu_compaction_failure_time(now); } erase_submitted_cumu_compaction(); + erase_executing_cumu_compaction(); }); if (!st.ok()) { erase_submitted_cumu_compaction(); @@ -777,8 +876,13 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t _submitted_full_compactions[tablet->tablet_id()] = compaction; } st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + g_full_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); - auto st = compaction->execute_compact(); + Defer defer {[&]() { g_full_compaction_running_task_count << -1; }}; + auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, + compaction); + if (!st.ok()) return; + st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` long now = duration_cast(system_clock::now().time_since_epoch()).count(); @@ -786,6 +890,7 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t } std::lock_guard lock(_compaction_mtx); _submitted_full_compactions.erase(tablet->tablet_id()); + _executing_full_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) { std::lock_guard lock(_compaction_mtx); @@ -825,17 +930,17 @@ void CloudStorageEngine::_lease_compaction_thread_callback() { std::vector> compation_stop_tokens; { std::lock_guard lock(_compaction_mtx); - for (auto& [_, base] : _submitted_base_compactions) { + for (auto& [_, base] : _executing_base_compactions) { if (base) { // `base` might be a nullptr placeholder base_compactions.push_back(base); } } - for (auto& [_, cumus] : _submitted_cumu_compactions) { + for (auto& [_, cumus] : _executing_cumu_compactions) { for (auto& cumu : cumus) { cumu_compactions.push_back(cumu); } } - for (auto& [_, full] : _submitted_full_compactions) { + for (auto& [_, full] : _executing_full_compactions) { if (full) { full_compactions.push_back(full); } diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 2b04036eaabd3f..83793da99449e3 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -27,6 +27,7 @@ #include "cloud/schema_cloud_dictionary_cache.h" #include "cloud_txn_delete_bitmap_cache.h" #include "io/cache/block_file_cache_factory.h" +#include "olap/compaction.h" #include "olap/storage_engine.h" #include "olap/storage_policy.h" #include "util/threadpool.h" @@ -164,6 +165,9 @@ class CloudStorageEngine final : public BaseStorageEngine { Status _submit_base_compaction_task(const CloudTabletSPtr& tablet); Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet); Status _submit_full_compaction_task(const CloudTabletSPtr& tablet); + Status _request_tablet_global_compaction_lock(ReaderType compaction_type, + const CloudTabletSPtr& tablet, + std::shared_ptr compaction); void _lease_compaction_thread_callback(); void _check_tablet_delete_bitmap_score_callback(); @@ -203,6 +207,13 @@ class CloudStorageEngine final : public BaseStorageEngine { // tablet_id -> active compaction stop tokens std::unordered_map> _active_compaction_stop_tokens; + // tablet_id -> executing cumu compactions, guarded by `_compaction_mtx` + std::unordered_map>> + _executing_cumu_compactions; + // tablet_id -> executing base compactions, guarded by `_compaction_mtx` + std::unordered_map> _executing_base_compactions; + // tablet_id -> executing full compactions, guarded by `_compaction_mtx` + std::unordered_map> _executing_full_compactions; using CumuPolices = std::unordered_map>; diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_compaction_global_lock.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_compaction_global_lock.groovy new file mode 100644 index 00000000000000..7070265a880eec --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_compaction_global_lock.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_cloud_compaction_global_lock', 'docker') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.beConfigs += [ "cumulative_compaction_min_deltas=2" ] + options.beConfigs += [ "cumulative_compaction_max_deltas=3" ] + options.beNum = 3 + docker(options) { + + def cumuInjectName = 'CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line' + def injectBe = null + def cumuNormalName = 'CloudStorageEngine._submit_cumulative_compaction_task.sleep' + def normalBe = null + def backends = sql_return_maparray('show backends') + + injectBe = backends[0] + assertNotNull(injectBe) + normalBe = backends[1] + assertNotNull(normalBe) + + def test_cumu_compaction_global_lock = { + def tableName = "test_cumu_compaction_global_lock" + sql """ DROP TABLE IF EXISTS ${tableName} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DUPLICATE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (0,0)""" + sql """ INSERT INTO ${tableName} VALUES (1,0)""" + sql """ INSERT INTO ${tableName} VALUES (2,0)""" + sql """ INSERT INTO ${tableName} VALUES (3,0)""" + sql """ INSERT INTO ${tableName} VALUES (4,0)""" + + def array = sql_return_maparray("SHOW TABLETS FROM test_cumu_compaction_global_lock") + def originTabletId = array[0].TabletId + def noramlOriginTabletId = array[0].TabletId + + sql """ select * from ${tableName} order by k""" + + Thread.sleep(5000) + + // inject be cu compaction + logger.info("run inject be cumu compaction:" + originTabletId) + def (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run inject be cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + Thread.sleep(1000) + + // normal be cu compaction + logger.info("run normal be cumu compaction:" + noramlOriginTabletId) + (code, out, err) = be_run_cumulative_compaction(normalBe.Host, normalBe.HttpPort, noramlOriginTabletId) + logger.info("Run normal be cumu compaction: code=" + code + ", out=" + out + ", err=" + err) + + Thread.sleep(1000) + + // check rowsets + logger.info("run inject be cumu show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run inject be cumu show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + Thread.sleep(10000) + + // check rowsets + logger.info("run normal be cumu show:" + originTabletId) + (code, out, err) = be_show_tablet_status(normalBe.Host, normalBe.HttpPort, noramlOriginTabletId) + logger.info("Run normal be cumu show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + // check rowsets + logger.info("run inject be cumu show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run inject be cumu show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[3-3]")) + assertTrue(out.contains("[4-4]")) + assertTrue(out.contains("[5-5]")) + assertTrue(out.contains("[6-6]")) + + } + + try { + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, cumuInjectName) + DebugPoint.enableDebugPoint(normalBe.Host, normalBe.HttpPort.toInteger(), NodeType.BE, cumuNormalName) + + test_cumu_compaction_global_lock() + + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, cumuInjectName) + DebugPoint.disableDebugPoint(normalBe.Host, normalBe.HttpPort.toInteger(), NodeType.BE, cumuNormalName) + } + } + } +} From ab4d208fc2033ca172060bf854dba4288e6a7ee4 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 22 Apr 2025 11:51:23 +0800 Subject: [PATCH 2/3] 2 --- be/src/cloud/cloud_storage_engine.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 2730e2c696da4a..1f2dca2735ca08 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -529,6 +529,19 @@ std::vector CloudStorageEngine::_generate_cloud_compaction_task [](int a, auto& b) { return a + b.second.size(); }); int num_base = cast_set(submitted_base_compactions.size() + submitted_full_compactions.size()); + std::string submitted_cumu_compaction_tablet_id; + for (const auto& cumu : submitted_cumu_compactions) { + submitted_cumu_compaction_tablet_id += std::to_string(cumu.first); + } + std::string submitted_base_compaction_tablet_id; + for (const auto& base : submitted_base_compactions) { + submitted_base_compaction_tablet_id += std::to_string(base.first); + } + LOG_WARNING("lyk_debug") + .tag("num_cumu", num_cumu) + .tag("cumu_tablet_id", submitted_cumu_compaction_tablet_id) + .tag("num_base", num_base) + .tag("base_tablet_id", submitted_base_compaction_tablet_id); int n = thread_per_disk - num_cumu - num_base; if (compaction_type == CompactionType::BASE_COMPACTION) { // We need to reserve at least one thread for cumulative compaction, From ad89e2cb7e6106b66cdfd98d55598fd78c5cf599 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 22 Apr 2025 21:19:26 +0800 Subject: [PATCH 3/3] 3 --- be/src/cloud/cloud_storage_engine.cpp | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 1f2dca2735ca08..a95debde4c6e1a 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -537,11 +537,6 @@ std::vector CloudStorageEngine::_generate_cloud_compaction_task for (const auto& base : submitted_base_compactions) { submitted_base_compaction_tablet_id += std::to_string(base.first); } - LOG_WARNING("lyk_debug") - .tag("num_cumu", num_cumu) - .tag("cumu_tablet_id", submitted_cumu_compaction_tablet_id) - .tag("num_base", num_base) - .tag("base_tablet_id", submitted_base_compaction_tablet_id); int n = thread_per_disk - num_cumu - num_base; if (compaction_type == CompactionType::BASE_COMPACTION) { // We need to reserve at least one thread for cumulative compaction, @@ -693,7 +688,10 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { g_base_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); - Defer defer {[&]() { g_base_compaction_running_task_count << -1; }}; + Defer defer {[&]() { + g_base_compaction_running_task_count << -1; + _submitted_base_compactions.erase(tablet->tablet_id()); + }}; auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, compaction); if (!st.ok()) return; @@ -704,7 +702,6 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t tablet->set_last_base_compaction_failure_time(now); } std::lock_guard lock(_compaction_mtx); - _submitted_base_compactions.erase(tablet->tablet_id()); _executing_base_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) { @@ -799,6 +796,7 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS _cumu_compaction_thread_pool_small_tasks_running--; } g_cumu_compaction_running_task_count << -1; + erase_submitted_cumu_compaction(); }}; auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, tablet, compaction); @@ -824,7 +822,6 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS long now = duration_cast(system_clock::now().time_since_epoch()) .count(); tablet->set_last_cumu_compaction_failure_time(now); - erase_submitted_cumu_compaction(); erase_executing_cumu_compaction(); // sleep 5s for this tablet tablet->last_cumu_no_suitable_version_ms = now; @@ -851,7 +848,6 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS long now = duration_cast(system_clock::now().time_since_epoch()).count(); tablet->set_last_cumu_compaction_failure_time(now); } - erase_submitted_cumu_compaction(); erase_executing_cumu_compaction(); }); if (!st.ok()) { @@ -891,7 +887,10 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { g_full_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); - Defer defer {[&]() { g_full_compaction_running_task_count << -1; }}; + Defer defer {[&]() { + g_full_compaction_running_task_count << -1; + _submitted_full_compactions.erase(tablet->tablet_id()); + }}; auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, compaction); if (!st.ok()) return; @@ -902,7 +901,6 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t tablet->set_last_full_compaction_failure_time(now); } std::lock_guard lock(_compaction_mtx); - _submitted_full_compactions.erase(tablet->tablet_id()); _executing_full_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) {