From 55a36946f5663fe59bb3014cd54948c77c8ff4cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=B0=8F=E5=88=9A?= Date: Tue, 6 Aug 2024 18:11:20 +0800 Subject: [PATCH 1/9] async load_cache_info_into_memory in LRUFileCache::initialize() --- .../io/cache/block/block_lru_file_cache.cpp | 95 ++++++++++--------- be/src/io/cache/block/block_lru_file_cache.h | 14 ++- 2 files changed, 65 insertions(+), 44 deletions(-) diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 3406e827980578..ccb202fc79de7e 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -117,12 +117,18 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path, } Status LRUFileCache::initialize() { - MonotonicStopWatch watch; - watch.start(); - std::lock_guard cache_lock(_mutex); + // MonotonicStopWatch watch; + // watch.start(); if (!_is_initialized) { if (fs::exists(_cache_base_path)) { - RETURN_IF_ERROR(load_cache_info_into_memory(cache_lock)); + // the cache already exists, try to load cache info asyncly + _lazy_open_done = false; + RETURN_IF_ERROR(try_convert_cache_version()); + _cache_background_load_thread = std::thread([this]() { + load_cache_info_into_memory(); + _lazy_open_done = true; + LOG_INFO("FileCache {} lazy load done.", _cache_base_path); + }); } else { std::error_code ec; fs::create_directories(_cache_base_path, ec); @@ -135,18 +141,18 @@ Status LRUFileCache::initialize() { } _is_initialized = true; _cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this); - int64_t cost = watch.elapsed_time() / 1000 / 1000; - LOG(INFO) << fmt::format( - "After initialize file cache path={}, disposable queue size={} elements={}, index " - "queue size={} " - "elements={}, query queue " - "size={} elements={}, init cost(ms)={}", - _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock), - _disposable_queue.get_elements_num(cache_lock), - _index_queue.get_total_cache_size(cache_lock), - _index_queue.get_elements_num(cache_lock), - _normal_queue.get_total_cache_size(cache_lock), - _normal_queue.get_elements_num(cache_lock), cost); + // int64_t cost = watch.elapsed_time() / 1000 / 1000; + // LOG(INFO) << fmt::format( + // "After initialize file cache path={}, disposable queue size={} elements={}, index " + // "queue size={} " + // "elements={}, query queue " + // "size={} elements={}, init cost(ms)={}", + // _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock), + // _disposable_queue.get_elements_num(cache_lock), + // _index_queue.get_total_cache_size(cache_lock), + // _index_queue.get_elements_num(cache_lock), + // _normal_queue.get_total_cache_size(cache_lock), + // _normal_queue.get_elements_num(cache_lock), cost); return Status::OK(); } @@ -197,7 +203,19 @@ FileBlocks LRUFileCache::get_impl(const Key& key, const CacheContext& context, /// find list [segment1, ..., segmentN] of segments which intersect with given range. auto it = _files.find(key); if (it == _files.end()) { - return {}; + if (_lazy_open_done) { + return {}; + } + // FileCacheKey key; + // key.hash = hash; + // key.meta.type = context.cache_type; + // key.meta.expiration_time = context.expiration_time; + // _storage->load_blocks_directly_unlocked(this, key, cache_lock); + + it = _files.find(hash); + if (it == _files.end()) [[unlikely]] { + return {}; + } } const auto& file_blocks = it->second; @@ -786,7 +804,7 @@ void LRUFileCache::remove(FileBlockSPtr file_block, std::lock_guard& } } -Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& cache_lock) { +Status LRUFileCache::try_convert_cache_version() const { /// version 1.0: cache_base_path / key / offset /// version 2.0: cache_base_path / key_prefix / key / offset if (USE_CACHE_VERSION2 && read_file_cache_version() != "2.0") { @@ -799,34 +817,27 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca std::string key_prefix = fs::path(_cache_base_path) / cache_key.substr(0, KEY_PREFIX_LENGTH); if (!fs::exists(key_prefix)) { - std::error_code ec; - fs::create_directories(key_prefix, ec); - if (ec) { - LOG(WARNING) << "Failed to create new version cached directory: " - << ec.message(); - continue; - } - } - std::error_code ec; - std::filesystem::rename(key_it->path(), key_prefix / cache_key, ec); - if (ec) { - LOG(WARNING) - << "Failed to move old version cached directory: " << ec.message(); + RETURN_IF_ERROR(fs::create_directories(key_prefix)); } + RETURN_IF_ERROR( + std::filesystem::rename(key_it->path(), key_prefix / cache_key)); } } } if (!write_file_cache_version().ok()) { - LOG(WARNING) << "Failed to write version hints for file cache"; + Lreturn Status::InternalError("Failed to write version hints for file cache"); } } + return Status::OK(); +} +void LRUFileCache::load_cache_info_into_memory() { + std::lock_guard cache_lock(_mutex); Key key; uint64_t offset = 0; size_t size = 0; std::vector> queue_entries; std::vector need_to_check_if_empty_dir; - Status st = Status::OK(); auto scan_file_cache = [&](fs::directory_iterator& key_it) { for (; key_it != fs::directory_iterator(); ++key_it) { key = Key( @@ -850,8 +861,8 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca std::error_code ec; std::filesystem::remove(offset_it->path(), ec); if (ec) { - st = Status::IOError(ec.message()); - break; + LOG(WARNING) << "filesystem error, failed to remove file, file=" + << offset_it->path() << " error=" << ec.message(); } continue; } else { @@ -863,8 +874,8 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca } if (!parsed) { - st = Status::IOError("Unexpected file: {}", offset_it->path().native()); - break; + LOG(WARNING) << "parse offset err, path=" << offset_it->path().native(); + continue; } size = offset_it->file_size(); @@ -872,7 +883,8 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca std::error_code ec; fs::remove(offset_it->path(), ec); if (ec) { - LOG(WARNING) << ec.message(); + LOG(WARNING) << "filesystem error, failed to remove file, file=" + << offset_it->path() << " error=" << ec.message(); } continue; } @@ -884,7 +896,8 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca std::error_code ec; std::filesystem::remove(offset_it->path(), ec); if (ec) { - st = Status::IOError(ec.message()); + LOG(WARNING) << "filesystem error, failed to remove file, file=" + << offset_it->path() << " error=" << ec.message(); } need_to_check_if_empty_dir.push_back(key_it->path()); } @@ -912,9 +925,6 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca fs::directory_iterator key_it {_cache_base_path}; scan_file_cache(key_it); } - if (!st) { - return st; - } std::for_each(need_to_check_if_empty_dir.cbegin(), need_to_check_if_empty_dir.cend(), [](auto& dir) { @@ -938,7 +948,6 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca queue.move_to_end(*cell->queue_iterator, cache_lock); } } - return st; } Status LRUFileCache::write_file_cache_version() const { diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index c7644ecbd8aea5..2874e87145cb7c 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ b/be/src/io/cache/block/block_lru_file_cache.h @@ -53,6 +53,9 @@ class LRUFileCache final : public IFileCache { LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings); ~LRUFileCache() override { _close = true; + if (_cache_background_load_thread.joinable()) { + _cache_background_thread.join(); + } if (_cache_background_thread.joinable()) { _cache_background_thread.join(); } @@ -163,7 +166,14 @@ class LRUFileCache final : public IFileCache { size_t get_available_cache_size(CacheType cache_type) const; - Status load_cache_info_into_memory(std::lock_guard& cache_lock); + /** + * @brief Refact the cache directory as version 2.0 while the version of cache is 1.0 + * + * @return Status + */ + Status try_convert_cache_version() const; + + void load_cache_info_into_memory(); Status write_file_cache_version() const; @@ -201,6 +211,8 @@ class LRUFileCache final : public IFileCache { private: std::atomic_bool _close {false}; std::thread _cache_background_thread; + std::atomic_bool _lazy_open_done {true}; + std::thread _cache_background_load_thread; size_t _num_read_segments = 0; size_t _num_hit_segments = 0; size_t _num_removed_segments = 0; From ab293ad4e8e85331e78a426963e2528a70e1353b Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Wed, 7 Aug 2024 15:56:56 +0800 Subject: [PATCH 2/9] get_or_set will return the FileBlock which state is SKIP_CACHE --- .../io/cache/block/block_lru_file_cache.cpp | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index ccb202fc79de7e..10929bc921debd 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -203,19 +203,7 @@ FileBlocks LRUFileCache::get_impl(const Key& key, const CacheContext& context, /// find list [segment1, ..., segmentN] of segments which intersect with given range. auto it = _files.find(key); if (it == _files.end()) { - if (_lazy_open_done) { - return {}; - } - // FileCacheKey key; - // key.hash = hash; - // key.meta.type = context.cache_type; - // key.meta.expiration_time = context.expiration_time; - // _storage->load_blocks_directly_unlocked(this, key, cache_lock); - - it = _files.find(hash); - if (it == _files.end()) [[unlikely]] { - return {}; - } + return {}; } const auto& file_blocks = it->second; @@ -394,6 +382,16 @@ void LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t size, const CacheContext& context) { + if (!_lazy_open_done) { + // Cache is not ready yet + LOG(WARNING) << std::format( + "Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.", + key.to_string(), offset, size); + FileBlocks file_blocks = {std::make_shared( + offset, size, key, this, FileBlock::State::SKIP_CACHE, context.cache_type)}; + return FileBlocksHolder(std::move(file_blocks)); + } + FileBlock::Range range(offset, offset + size - 1); std::lock_guard cache_lock(_mutex); From 062a32e7a6898284c4aa042134ba189f00390199 Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Wed, 7 Aug 2024 16:19:01 +0800 Subject: [PATCH 3/9] add log --- .../io/cache/block/block_lru_file_cache.cpp | 39 ++++++++++--------- be/src/io/cache/block/block_lru_file_cache.h | 2 +- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 10929bc921debd..2a5077dee8e89c 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -117,17 +117,31 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path, } Status LRUFileCache::initialize() { - // MonotonicStopWatch watch; - // watch.start(); + MonotonicStopWatch watch; + watch.start(); if (!_is_initialized) { if (fs::exists(_cache_base_path)) { // the cache already exists, try to load cache info asyncly _lazy_open_done = false; RETURN_IF_ERROR(try_convert_cache_version()); _cache_background_load_thread = std::thread([this]() { - load_cache_info_into_memory(); + MonotonicStopWatch watch; + watch.start(); + std::lock_guard cache_lock(_mutex); + load_cache_info_into_memory(cache_lock); _lazy_open_done = true; - LOG_INFO("FileCache {} lazy load done.", _cache_base_path); + LOG_INFO("", _cache_base_path); + int64_t cost = watch.elapsed_time() / 1000 / 1000; + LOG(INFO) << fmt::format( + "FileCache lazy load done path={}, disposable queue size={} elements={}, " + "index queue size={} elements={}, query queue size={} elements={}, init " + "cost(ms)={}", + _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock), + _disposable_queue.get_elements_num(cache_lock), + _index_queue.get_total_cache_size(cache_lock), + _index_queue.get_elements_num(cache_lock), + _normal_queue.get_total_cache_size(cache_lock), + _normal_queue.get_elements_num(cache_lock), cost); }); } else { std::error_code ec; @@ -141,18 +155,8 @@ Status LRUFileCache::initialize() { } _is_initialized = true; _cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this); - // int64_t cost = watch.elapsed_time() / 1000 / 1000; - // LOG(INFO) << fmt::format( - // "After initialize file cache path={}, disposable queue size={} elements={}, index " - // "queue size={} " - // "elements={}, query queue " - // "size={} elements={}, init cost(ms)={}", - // _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock), - // _disposable_queue.get_elements_num(cache_lock), - // _index_queue.get_total_cache_size(cache_lock), - // _index_queue.get_elements_num(cache_lock), - // _normal_queue.get_total_cache_size(cache_lock), - // _normal_queue.get_elements_num(cache_lock), cost); + int64_t cost = watch.elapsed_time() / 1000 / 1000; + LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}", cost); return Status::OK(); } @@ -829,8 +833,7 @@ Status LRUFileCache::try_convert_cache_version() const { return Status::OK(); } -void LRUFileCache::load_cache_info_into_memory() { - std::lock_guard cache_lock(_mutex); +void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cache_lock) { Key key; uint64_t offset = 0; size_t size = 0; diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index 2874e87145cb7c..88072da33a35bf 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ b/be/src/io/cache/block/block_lru_file_cache.h @@ -173,7 +173,7 @@ class LRUFileCache final : public IFileCache { */ Status try_convert_cache_version() const; - void load_cache_info_into_memory(); + void load_cache_info_into_memory(std::lock_guard& cache_lock); Status write_file_cache_version() const; From a1d0f437208ff5a28fef5d6b8c910ef4a89fd4d2 Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Wed, 7 Aug 2024 16:32:11 +0800 Subject: [PATCH 4/9] revert load_cache_info_into_memory --- .../io/cache/block/block_lru_file_cache.cpp | 53 ++++++++++++------- be/src/io/cache/block/block_lru_file_cache.h | 9 +--- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 2a5077dee8e89c..83c98e44c2047a 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -123,14 +123,17 @@ Status LRUFileCache::initialize() { if (fs::exists(_cache_base_path)) { // the cache already exists, try to load cache info asyncly _lazy_open_done = false; - RETURN_IF_ERROR(try_convert_cache_version()); _cache_background_load_thread = std::thread([this]() { MonotonicStopWatch watch; watch.start(); std::lock_guard cache_lock(_mutex); - load_cache_info_into_memory(cache_lock); - _lazy_open_done = true; - LOG_INFO("", _cache_base_path); + Status s = load_cache_info_into_memory(cache_lock); + if (s.ok()) { + _lazy_open_done = true; + } else { + LOG(WARNING) << fmt::format("Failed to load cache info from {}: {}", + _cache_base_path, s.to_string()); + } int64_t cost = watch.elapsed_time() / 1000 / 1000; LOG(INFO) << fmt::format( "FileCache lazy load done path={}, disposable queue size={} elements={}, " @@ -806,7 +809,7 @@ void LRUFileCache::remove(FileBlockSPtr file_block, std::lock_guard& } } -Status LRUFileCache::try_convert_cache_version() const { +Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& cache_lock) { /// version 1.0: cache_base_path / key / offset /// version 2.0: cache_base_path / key_prefix / key / offset if (USE_CACHE_VERSION2 && read_file_cache_version() != "2.0") { @@ -819,26 +822,34 @@ Status LRUFileCache::try_convert_cache_version() const { std::string key_prefix = fs::path(_cache_base_path) / cache_key.substr(0, KEY_PREFIX_LENGTH); if (!fs::exists(key_prefix)) { - RETURN_IF_ERROR(fs::create_directories(key_prefix)); + std::error_code ec; + fs::create_directories(key_prefix, ec); + if (ec) { + LOG(WARNING) << "Failed to create new version cached directory: " + << ec.message(); + continue; + } + } + std::error_code ec; + std::filesystem::rename(key_it->path(), key_prefix / cache_key, ec); + if (ec) { + LOG(WARNING) + << "Failed to move old version cached directory: " << ec.message(); } - RETURN_IF_ERROR( - std::filesystem::rename(key_it->path(), key_prefix / cache_key)); } } } if (!write_file_cache_version().ok()) { - Lreturn Status::InternalError("Failed to write version hints for file cache"); + LOG(WARNING) << "Failed to write version hints for file cache"; } } - return Status::OK(); -} -void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cache_lock) { Key key; uint64_t offset = 0; size_t size = 0; std::vector> queue_entries; std::vector need_to_check_if_empty_dir; + Status st = Status::OK(); auto scan_file_cache = [&](fs::directory_iterator& key_it) { for (; key_it != fs::directory_iterator(); ++key_it) { key = Key( @@ -862,8 +873,8 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cach std::error_code ec; std::filesystem::remove(offset_it->path(), ec); if (ec) { - LOG(WARNING) << "filesystem error, failed to remove file, file=" - << offset_it->path() << " error=" << ec.message(); + st = Status::IOError(ec.message()); + break; } continue; } else { @@ -875,8 +886,8 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cach } if (!parsed) { - LOG(WARNING) << "parse offset err, path=" << offset_it->path().native(); - continue; + st = Status::IOError("Unexpected file: {}", offset_it->path().native()); + break; } size = offset_it->file_size(); @@ -884,8 +895,7 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cach std::error_code ec; fs::remove(offset_it->path(), ec); if (ec) { - LOG(WARNING) << "filesystem error, failed to remove file, file=" - << offset_it->path() << " error=" << ec.message(); + LOG(WARNING) << ec.message(); } continue; } @@ -897,8 +907,7 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cach std::error_code ec; std::filesystem::remove(offset_it->path(), ec); if (ec) { - LOG(WARNING) << "filesystem error, failed to remove file, file=" - << offset_it->path() << " error=" << ec.message(); + st = Status::IOError(ec.message()); } need_to_check_if_empty_dir.push_back(key_it->path()); } @@ -926,6 +935,9 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cach fs::directory_iterator key_it {_cache_base_path}; scan_file_cache(key_it); } + if (!st) { + return st; + } std::for_each(need_to_check_if_empty_dir.cbegin(), need_to_check_if_empty_dir.cend(), [](auto& dir) { @@ -949,6 +961,7 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard& cach queue.move_to_end(*cell->queue_iterator, cache_lock); } } + return st; } Status LRUFileCache::write_file_cache_version() const { diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index 88072da33a35bf..bcf00d938a725b 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ b/be/src/io/cache/block/block_lru_file_cache.h @@ -166,14 +166,7 @@ class LRUFileCache final : public IFileCache { size_t get_available_cache_size(CacheType cache_type) const; - /** - * @brief Refact the cache directory as version 2.0 while the version of cache is 1.0 - * - * @return Status - */ - Status try_convert_cache_version() const; - - void load_cache_info_into_memory(std::lock_guard& cache_lock); + Status load_cache_info_into_memory(std::lock_guard& cache_lock); Status write_file_cache_version() const; From f2b118adbf6707afe9fe30b276fe6da72cdb3ad3 Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Thu, 8 Aug 2024 14:51:07 +0800 Subject: [PATCH 5/9] fix --- be/src/io/cache/block/block_lru_file_cache.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 83c98e44c2047a..04e1c19cd840d2 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -159,7 +159,8 @@ Status LRUFileCache::initialize() { _is_initialized = true; _cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this); int64_t cost = watch.elapsed_time() / 1000 / 1000; - LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}", cost); + LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}", + _cache_base_path, cost); return Status::OK(); } From 47224eaf5b124b3a47a2ceca8c4ad3717a1fc79d Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Fri, 9 Aug 2024 17:26:04 +0800 Subject: [PATCH 6/9] add scan_file_num_to_sleep and sleep_time_per_scan_file to limit IO --- be/src/common/config.cpp | 3 +++ be/src/common/config.h | 2 ++ be/src/io/cache/block/block_lru_file_cache.cpp | 6 ++++++ 3 files changed, 11 insertions(+) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 6420a624127573..cdd7cb0c12ab2e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -996,6 +996,9 @@ DEFINE_Bool(enable_file_cache, "false"); // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}] // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}] DEFINE_String(file_cache_path, ""); +// thread will sleep 10ms per scan file num to limit IO +DEFINE_Int64(scan_file_num_to_sleep, "1000"); +DEFINE_Int64(sleep_time_per_scan_file, "10"); DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB // 4KB <= file_cache_max_file_segment_size <= 256MB DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) -> bool { diff --git a/be/src/common/config.h b/be/src/common/config.h index d1f91ab693d252..fd71bee79fefe3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1047,6 +1047,8 @@ DECLARE_Bool(enable_file_cache); // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}] // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85, "disposable_percent":10, "index_percent":5}] DECLARE_String(file_cache_path); +DECLARE_Int64(scan_file_num_to_sleep); +DECLARE_Int64(sleep_time_per_scan_file); DECLARE_Int64(file_cache_min_file_segment_size); DECLARE_Int64(file_cache_max_file_segment_size); DECLARE_Bool(clear_file_cache); diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 04e1c19cd840d2..1ce28be38e7a6a 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -851,6 +851,7 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca std::vector> queue_entries; std::vector need_to_check_if_empty_dir; Status st = Status::OK(); + size_t scan_file_num = 0; auto scan_file_cache = [&](fs::directory_iterator& key_it) { for (; key_it != fs::directory_iterator(); ++key_it) { key = Key( @@ -912,6 +913,11 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca } need_to_check_if_empty_dir.push_back(key_it->path()); } + scan_file_num += 1; + if (scan_file_num % config::scan_file_num_to_sleep == 0) { + std::this_thread::sleep_for( + std::chrono::milliseconds(config::sleep_time_per_scan_file)); + } } } }; From be456e11f42bccaa2e5e533ea837c507efd48484 Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Fri, 9 Aug 2024 18:42:41 +0800 Subject: [PATCH 7/9] rename --- be/src/common/config.cpp | 4 ++-- be/src/common/config.h | 4 ++-- be/src/io/cache/block/block_lru_file_cache.cpp | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index cdd7cb0c12ab2e..a67a92c9e4e858 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -997,8 +997,8 @@ DEFINE_Bool(enable_file_cache, "false"); // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}] DEFINE_String(file_cache_path, ""); // thread will sleep 10ms per scan file num to limit IO -DEFINE_Int64(scan_file_num_to_sleep, "1000"); -DEFINE_Int64(sleep_time_per_scan_file, "10"); +DEFINE_Int64(async_file_cache_init_file_num_interval, "1000"); +DEFINE_Int64(async_file_cache_init_sleep_interval_ms, "20"); DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB // 4KB <= file_cache_max_file_segment_size <= 256MB DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) -> bool { diff --git a/be/src/common/config.h b/be/src/common/config.h index fd71bee79fefe3..efce10380c5844 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1047,8 +1047,8 @@ DECLARE_Bool(enable_file_cache); // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}] // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85, "disposable_percent":10, "index_percent":5}] DECLARE_String(file_cache_path); -DECLARE_Int64(scan_file_num_to_sleep); -DECLARE_Int64(sleep_time_per_scan_file); +DECLARE_Int64(async_file_cache_init_file_num_interval); +DECLARE_Int64(async_file_cache_init_sleep_interval_ms); DECLARE_Int64(file_cache_min_file_segment_size); DECLARE_Int64(file_cache_max_file_segment_size); DECLARE_Bool(clear_file_cache); diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 1ce28be38e7a6a..ad251c5baa509b 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -914,9 +914,9 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca need_to_check_if_empty_dir.push_back(key_it->path()); } scan_file_num += 1; - if (scan_file_num % config::scan_file_num_to_sleep == 0) { - std::this_thread::sleep_for( - std::chrono::milliseconds(config::sleep_time_per_scan_file)); + if (scan_file_num % config::async_file_cache_init_file_num_interval == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds( + config::async_file_cache_init_sleep_interval_ms)); } } } From b517b1007348090b63acac44bc49cba7b3df89d8 Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Wed, 14 Aug 2024 00:58:05 +0800 Subject: [PATCH 8/9] use vlog --- be/src/io/cache/block/block_lru_file_cache.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index ad251c5baa509b..5d4030c5924e2d 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -39,6 +39,7 @@ #include #include +#include "common/logging.h" #include "common/status.h" #include "io/cache/block/block_file_cache.h" #include "io/cache/block/block_file_cache_fwd.h" @@ -392,7 +393,7 @@ FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t const CacheContext& context) { if (!_lazy_open_done) { // Cache is not ready yet - LOG(WARNING) << std::format( + VLOG_NOTICE << std::format( "Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.", key.to_string(), offset, size); FileBlocks file_blocks = {std::make_shared( From 70a05149cf4a742aeef26d953686e8b22fd1f007 Mon Sep 17 00:00:00 2001 From: suxiaogang223 Date: Wed, 14 Aug 2024 16:24:33 +0800 Subject: [PATCH 9/9] use fmt::format instead of std::format --- be/src/io/cache/block/block_lru_file_cache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 5d4030c5924e2d..349c626fb7d8dd 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -393,7 +393,7 @@ FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t const CacheContext& context) { if (!_lazy_open_done) { // Cache is not ready yet - VLOG_NOTICE << std::format( + VLOG_NOTICE << fmt::format( "Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.", key.to_string(), offset, size); FileBlocks file_blocks = {std::make_shared(