From fda89e2b729e40947165d7a2f46a719da11678a6 Mon Sep 17 00:00:00 2001
From: tsy
Date: Thu, 26 Sep 2024 17:56:41 +0800
Subject: [PATCH] [chore](file-cache) Enable file cache for cloud mode by force

---
 be/src/cloud/cloud_rowset_writer.cpp       |  3 +-
 be/src/io/cache/block_file_cache_factory.h |  5 +-
 be/src/runtime/exec_env_init.cpp           | 82 ++++++++++++----------
 3 files changed, 50 insertions(+), 40 deletions(-)

diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp
index 7753bf7b65b7bf..5f878f59d5c64d 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "cloud/cloud_rowset_writer.h"
 
+#include "common/status.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/fs/file_system.h"
 #include "olap/rowset/rowset_factory.h"
@@ -34,7 +35,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
     if (_context.is_local_rowset()) {
         // In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
         // we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
-        _context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
+        _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
     } else {
         _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
     }
diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h
index 6365fab31057ac..d7b710876ce8e3 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -21,7 +21,9 @@
 #pragma once
 
 #include 
+#include 
 #include 
+#include 
 #include 
 
 #include "common/status.h"
@@ -46,7 +48,8 @@ class FileCacheFactory {
 
     size_t try_release(const std::string& base_path);
 
-    const std::string& get_cache_path() {
+    std::string_view pick_one_cache_path() {
+        DCHECK(!_caches.empty());
         size_t cur_index = _next_index.fetch_add(1);
         return _caches[cur_index % _caches.size()]->get_base_path();
     }
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 758a2f3760c7a7..adb6b7fd101f27 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -270,7 +270,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     init_file_cache_factory(cache_paths);
     doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths, cache_paths);
-    _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _workload_group_manager = new WorkloadGroupMgr();
@@ -392,50 +391,57 @@ Status ExecEnv::init_pipeline_task_scheduler() {
 
 void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
     // Load file cache before starting up daemon threads to make sure StorageEngine is read.
-    if (doris::config::enable_file_cache) {
-        if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
-            config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
-            LOG_FATAL(
-                    "The config file_cache_each_block_size {} must less than or equal to config "
-                    "s3_write_buffer_size {} and config::s3_write_buffer_size % "
-                    "config::file_cache_each_block_size must be zero",
-                    config::file_cache_each_block_size, config::s3_write_buffer_size);
-            exit(-1);
-        }
-        std::unordered_set<std::string> cache_path_set;
-        Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
-        if (!rest) {
-            LOG(FATAL) << "parse config file cache path failed, path="
-                       << doris::config::file_cache_path;
+    if (!config::enable_file_cache) {
+        if (config::is_cloud_mode()) {
+            LOG(FATAL) << "Cloud mode requires to enable file cache, plz set "
+                          "config::enable_file_cache "
+                          "= true";
+            exit(-1);
+        }
-            exit(-1);
-        }
-        std::vector<std::thread> file_cache_init_threads;
+        return;
+    }
+    if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
+        config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
+        LOG_FATAL(
+                "The config file_cache_each_block_size {} must less than or equal to config "
+                "s3_write_buffer_size {} and config::s3_write_buffer_size % "
+                "config::file_cache_each_block_size must be zero",
+                config::file_cache_each_block_size, config::s3_write_buffer_size);
+        exit(-1);
+    }
+    std::unordered_set<std::string> cache_path_set;
+    Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
+    if (!rest) {
+        LOG(FATAL) << "parse config file cache path failed, path="
+                   << doris::config::file_cache_path;
+        exit(-1);
+    }
+    std::vector<std::thread> file_cache_init_threads;
 
-        std::list<doris::Status> cache_status;
-        for (auto& cache_path : cache_paths) {
-            if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
-                LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
-                continue;
-            }
+    std::list<doris::Status> cache_status;
+    for (auto& cache_path : cache_paths) {
+        if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
+            LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
+            continue;
+        }
 
-            file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
-                *status = doris::io::FileCacheFactory::instance()->create_file_cache(
-                        cache_path.path, cache_path.init_settings());
-            });
+        file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
+            *status = doris::io::FileCacheFactory::instance()->create_file_cache(
+                    cache_path.path, cache_path.init_settings());
+        });
 
-            cache_path_set.emplace(cache_path.path);
-        }
+        cache_path_set.emplace(cache_path.path);
+    }
 
-        for (std::thread& thread : file_cache_init_threads) {
-            if (thread.joinable()) {
-                thread.join();
-            }
+    for (std::thread& thread : file_cache_init_threads) {
+        if (thread.joinable()) {
+            thread.join();
         }
-        for (const auto& status : cache_status) {
-            if (!status.ok()) {
-                LOG(FATAL) << "failed to init file cache, err: " << status;
-                exit(-1);
-            }
+    }
+    for (const auto& status : cache_status) {
+        if (!status.ok()) {
+            LOG(FATAL) << "failed to init file cache, err: " << status;
+            exit(-1);
         }
     }
 }
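
Note on the block_file_cache_factory.h hunk: the new FileCacheFactory::pick_one_cache_path() replaces get_cache_path() so that intermediate local rowsets (the external merge-sort case in cloud_rowset_writer.cpp) are spread across all configured cache directories instead of always landing on the first one. Below is a minimal, self-contained C++ sketch of that round-robin selection; the names CachePathPicker and base_paths are illustrative stand-ins, not the Doris API.

    // Sketch of the round-robin path selection introduced by pick_one_cache_path().
    // CachePathPicker is a hypothetical class used only for illustration.
    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <string_view>
    #include <vector>

    class CachePathPicker {
    public:
        explicit CachePathPicker(std::vector<std::string> base_paths)
                : _base_paths(std::move(base_paths)) {}

        // Each call advances an atomic counter, so concurrent writers are
        // distributed evenly over every configured cache directory.
        std::string_view pick_one_cache_path() {
            assert(!_base_paths.empty()); // mirrors the DCHECK added in the patch
            size_t cur_index = _next_index.fetch_add(1);
            return _base_paths[cur_index % _base_paths.size()];
        }

    private:
        std::vector<std::string> _base_paths;
        std::atomic<size_t> _next_index {0};
    };

    int main() {
        CachePathPicker picker({"/cache0", "/cache1", "/cache2"});
        for (int i = 0; i < 5; ++i) {
            // Prints /cache0 /cache1 /cache2 /cache0 /cache1, one per line.
            std::cout << picker.pick_one_cache_path() << '\n';
        }
        return 0;
    }

Returning std::string_view keeps the hot path allocation-free; it is safe here because the factory (and thus the backing strings) outlives every caller.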
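Note on the exec_env_init.cpp hunk: with this patch init_file_cache_factory() returns early when the cache is disabled on a local deployment, aborts when the cache is disabled in cloud mode, and still requires file_cache_each_block_size to be no larger than s3_write_buffer_size and to divide it evenly, so each S3 upload buffer maps to a whole number of cache blocks. The sketch below mirrors that decision order with a hypothetical CacheConfig struct and validate_file_cache_config() helper; neither name exists in Doris.

    // Stand-alone sketch of the startup validation order after the patch.
    #include <cstdint>
    #include <iostream>
    #include <string>

    struct CacheConfig {
        bool is_cloud_mode = true;
        bool enable_file_cache = true;
        int64_t file_cache_each_block_size = 1 * 1024 * 1024;  // 1 MiB
        int64_t s3_write_buffer_size = 5 * 1024 * 1024;        // 5 MiB
    };

    // Returns an empty string on success, otherwise a description of the problem.
    std::string validate_file_cache_config(const CacheConfig& cfg) {
        if (!cfg.enable_file_cache) {
            // Cloud mode refuses to start without the file cache; local mode just skips it.
            return cfg.is_cloud_mode ? "cloud mode requires enable_file_cache=true" : "";
        }
        if (cfg.file_cache_each_block_size > cfg.s3_write_buffer_size ||
            cfg.s3_write_buffer_size % cfg.file_cache_each_block_size != 0) {
            return "file_cache_each_block_size must be <= s3_write_buffer_size "
                   "and divide it evenly";
        }
        return "";
    }

    int main() {
        CacheConfig cfg;
        cfg.enable_file_cache = false;  // simulate a misconfigured cloud BE
        std::string err = validate_file_cache_config(cfg);
        std::string msg = err.empty() ? std::string("config ok") : "fatal: " + err;
        std::cout << msg << '\n';  // prints: fatal: cloud mode requires enable_file_cache=true
        return 0;
    }

In the real backend the failing branches call LOG(FATAL)/exit(-1) instead of returning a message, but the ordering of the checks is the same as in the patched function.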