From f84b953ae181c7718f87bf372b13315c43e3ca36 Mon Sep 17 00:00:00 2001 From: Lei Zhang Date: Thu, 22 May 2025 21:49:29 +0800 Subject: [PATCH 1/2] [feat](storage vault) Check storage vault connectivity for be when starting --- be/src/cloud/cloud_storage_engine.cpp | 52 ++++++++++++++++++++------- be/src/cloud/cloud_storage_engine.h | 5 ++- be/src/cloud/config.cpp | 3 ++ be/src/cloud/config.h | 2 ++ be/src/io/fs/s3_file_system.cpp | 2 ++ 5 files changed, 50 insertions(+), 14 deletions(-) diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 8ae66ba69d5fb6..23f42fc846781b 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -119,12 +119,25 @@ static Status vault_process_error(std::string_view id, } struct VaultCreateFSVisitor { - VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format) - : id(id), path_format(path_format) {} + VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format, + bool check_fs) + : id(id), path_format(path_format), check_fs(check_fs) {} Status operator()(const S3Conf& s3_conf) const { - LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id; + LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id + << " check_fs: " << check_fs; auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id)); + if (check_fs && !s3_conf.client_conf.role_arn.empty()) { + bool res = false; + // just check connectivity, not care object if exist + auto st = fs->exists("not_exist_object", &res); + if (!st.ok()) { + LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st + << "s3_conf: " << s3_conf.to_string() + << "set enable_check_storage_vault=false to skip this check"; + } + } + put_storage_resource(id, {std::move(fs), path_format}, 0); LOG_INFO("successfully create s3 vault, vault id {}", id); return Status::OK(); @@ -142,6 +155,7 @@ struct VaultCreateFSVisitor { const std::string& id; const cloud::StorageVaultPB_PathFormat& path_format; + bool check_fs; }; struct RefreshFSVaultVisitor { @@ -176,7 +190,7 @@ struct RefreshFSVaultVisitor { }; Status CloudStorageEngine::open() { - sync_storage_vault(); + sync_storage_vault(_enable_check_storage_vault()); // TODO(plat1ko): DeleteBitmapTxnManager @@ -322,13 +336,23 @@ Status CloudStorageEngine::start_bg_threads(std::shared_ptr wg_sp return Status::OK(); } -void CloudStorageEngine::sync_storage_vault() { +void CloudStorageEngine::sync_storage_vault(bool check_storage_vault) { cloud::StorageVaultInfos vault_infos; bool enable_storage_vault = false; - auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault); - if (!st.ok()) { - LOG(WARNING) << "failed to get storage vault info. err=" << st; - return; + auto st = Status::OK(); + while (true) { + st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault); + if (st.ok()) { + break; + } + + if (!check_storage_vault) { + LOG(WARNING) << "failed to get storage vault info. err=" << st; + return; + } + + LOG(WARNING) << "failed to get storage vault info. err=" << st << " sleep 5s and retry"; + std::this_thread::sleep_for(std::chrono::seconds(5)); } if (vault_infos.empty()) { @@ -338,10 +362,12 @@ void CloudStorageEngine::sync_storage_vault() { for (auto& [id, vault_info, path_format] : vault_infos) { auto fs = get_filesystem(id); - auto status = (fs == nullptr) - ? std::visit(VaultCreateFSVisitor {id, path_format}, vault_info) - : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format}, - vault_info); + auto status = + (fs == nullptr) + ? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault}, + vault_info) + : std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format}, + vault_info); if (!status.ok()) [[unlikely]] { LOG(WARNING) << vault_process_error(id, vault_info, std::move(st)); } diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index a3e977778eece1..c645ed55978ec6 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -24,6 +24,7 @@ //#include "cloud/cloud_full_compaction.h" #include "cloud/cloud_cumulative_compaction_policy.h" #include "cloud/cloud_tablet.h" +#include "cloud/config.h" #include "cloud/schema_cloud_dictionary_cache.h" #include "cloud_txn_delete_bitmap_cache.h" #include "io/cache/block_file_cache_factory.h" @@ -136,7 +137,7 @@ class CloudStorageEngine final : public BaseStorageEngine { std::shared_ptr cumu_compaction_policy( std::string_view compaction_policy); - void sync_storage_vault(); + void sync_storage_vault(bool check = false); io::FileCacheBlockDownloader& file_cache_block_downloader() const { return *_file_cache_block_downloader; @@ -171,6 +172,8 @@ class CloudStorageEngine final : public BaseStorageEngine { void _lease_compaction_thread_callback(); void _check_tablet_delete_bitmap_score_callback(); + bool _enable_check_storage_vault() { return config::enable_check_storage_vault; }; + std::atomic_bool _stopped {false}; std::unique_ptr _meta_mgr; diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index b0f80835598f1d..471de6464ae805 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -88,5 +88,8 @@ DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25"); DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000"); DEFINE_mInt32(meta_service_conflict_error_retry_times, "10"); + +DEFINE_Bool(enable_check_storage_vault, "true"); + #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index f7b85231cbdb86..a1da13765d69b6 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -125,5 +125,7 @@ DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms); DECLARE_mInt32(meta_service_conflict_error_retry_times); +DECLARE_Bool(enable_check_storage_vault); + #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 61960d9f4e6b3f..1ec5b0a83774cd 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -270,6 +270,8 @@ Status S3FileSystem::exists_impl(const Path& path, bool* res) const { CHECK_S3_CLIENT(client); auto key = DORIS_TRY(get_key(path)); + VLOG_DEBUG << "key:" << key << " path:" << path; + auto resp = client->head_object({.bucket = _bucket, .key = key}); if (resp.resp.status.code == ErrorCode::OK) { From 73e53c202390d94584ce4a0c3f85691d43702268 Mon Sep 17 00:00:00 2001 From: Lei Zhang Date: Mon, 23 Jun 2025 10:05:05 +0800 Subject: [PATCH 2/2] Fix some comments --- be/src/cloud/cloud_storage_engine.cpp | 10 ++++++---- be/src/cloud/cloud_storage_engine.h | 2 -- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 23f42fc846781b..bd7a6208879889 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -134,7 +134,7 @@ struct VaultCreateFSVisitor { if (!st.ok()) { LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st << "s3_conf: " << s3_conf.to_string() - << "set enable_check_storage_vault=false to skip this check"; + << "add enable_check_storage_vault=false to be.conf to skip the check"; } } @@ -190,7 +190,7 @@ struct RefreshFSVaultVisitor { }; Status CloudStorageEngine::open() { - sync_storage_vault(_enable_check_storage_vault()); + sync_storage_vault(config::enable_check_storage_vault); // TODO(plat1ko): DeleteBitmapTxnManager @@ -351,8 +351,10 @@ void CloudStorageEngine::sync_storage_vault(bool check_storage_vault) { return; } - LOG(WARNING) << "failed to get storage vault info. err=" << st << " sleep 5s and retry"; - std::this_thread::sleep_for(std::chrono::seconds(5)); + LOG(WARNING) << "failed to get storage vault info from ms, err=" << st + << " sleep 200ms retry or add enable_check_storage_vault=false to be.conf" + << " to skip the check."; + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } if (vault_infos.empty()) { diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index c645ed55978ec6..f04d836c763c7d 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -172,8 +172,6 @@ class CloudStorageEngine final : public BaseStorageEngine { void _lease_compaction_thread_callback(); void _check_tablet_delete_bitmap_score_callback(); - bool _enable_check_storage_vault() { return config::enable_check_storage_vault; }; - std::atomic_bool _stopped {false}; std::unique_ptr _meta_mgr;