Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,25 @@ static Status vault_process_error(std::string_view id,
}

struct VaultCreateFSVisitor {
VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format)
: id(id), path_format(path_format) {}
VaultCreateFSVisitor(const std::string& id, const cloud::StorageVaultPB_PathFormat& path_format,
bool check_fs)
: id(id), path_format(path_format), check_fs(check_fs) {}
Status operator()(const S3Conf& s3_conf) const {
LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id;
LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id
<< " check_fs: " << check_fs;

auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, id));
if (check_fs && !s3_conf.client_conf.role_arn.empty()) {
bool res = false;
// just check connectivity, not care object if exist
auto st = fs->exists("not_exist_object", &res);
if (!st.ok()) {
LOG(FATAL) << "failed to check s3 fs, resource_id: " << id << " st: " << st
<< "s3_conf: " << s3_conf.to_string()
<< "add enable_check_storage_vault=false to be.conf to skip the check";
}
}

put_storage_resource(id, {std::move(fs), path_format}, 0);
LOG_INFO("successfully create s3 vault, vault id {}", id);
return Status::OK();
Expand All @@ -142,6 +155,7 @@ struct VaultCreateFSVisitor {

const std::string& id;
const cloud::StorageVaultPB_PathFormat& path_format;
bool check_fs;
};

struct RefreshFSVaultVisitor {
Expand Down Expand Up @@ -176,7 +190,7 @@ struct RefreshFSVaultVisitor {
};

Status CloudStorageEngine::open() {
sync_storage_vault();
sync_storage_vault(config::enable_check_storage_vault);

// TODO(plat1ko): DeleteBitmapTxnManager

Expand Down Expand Up @@ -322,13 +336,25 @@ Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sp
return Status::OK();
}

void CloudStorageEngine::sync_storage_vault() {
void CloudStorageEngine::sync_storage_vault(bool check_storage_vault) {
cloud::StorageVaultInfos vault_infos;
bool enable_storage_vault = false;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
if (!st.ok()) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
return;
auto st = Status::OK();
while (true) {
st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
if (st.ok()) {
break;
}

if (!check_storage_vault) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
return;
}

LOG(WARNING) << "failed to get storage vault info from ms, err=" << st
<< " sleep 200ms retry or add enable_check_storage_vault=false to be.conf"
<< " to skip the check.";
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}

if (vault_infos.empty()) {
Expand All @@ -338,10 +364,12 @@ void CloudStorageEngine::sync_storage_vault() {

for (auto& [id, vault_info, path_format] : vault_infos) {
auto fs = get_filesystem(id);
auto status = (fs == nullptr)
? std::visit(VaultCreateFSVisitor {id, path_format}, vault_info)
: std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
vault_info);
auto status =
(fs == nullptr)
? std::visit(VaultCreateFSVisitor {id, path_format, check_storage_vault},
vault_info)
: std::visit(RefreshFSVaultVisitor {id, std::move(fs), path_format},
vault_info);
if (!status.ok()) [[unlikely]] {
LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/schema_cloud_dictionary_cache.h"
#include "cloud_txn_delete_bitmap_cache.h"
#include "io/cache/block_file_cache_factory.h"
Expand Down Expand Up @@ -136,7 +137,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
std::string_view compaction_policy);

void sync_storage_vault();
void sync_storage_vault(bool check = false);

io::FileCacheBlockDownloader& file_cache_block_downloader() const {
return *_file_cache_block_downloader;
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,8 @@ DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");
DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");

DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");

DEFINE_Bool(enable_check_storage_vault, "true");

#include "common/compile_check_end.h"
} // namespace doris::config
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,7 @@ DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms);

DECLARE_mInt32(meta_service_conflict_error_retry_times);

DECLARE_Bool(enable_check_storage_vault);

#include "common/compile_check_end.h"
} // namespace doris::config
2 changes: 2 additions & 0 deletions be/src/io/fs/s3_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ Status S3FileSystem::exists_impl(const Path& path, bool* res) const {
CHECK_S3_CLIENT(client);
auto key = DORIS_TRY(get_key(path));

VLOG_DEBUG << "key:" << key << " path:" << path;

auto resp = client->head_object({.bucket = _bucket, .key = key});

if (resp.resp.status.code == ErrorCode::OK) {
Expand Down
Loading