Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,9 @@ DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
DEFINE_String(file_cache_path, "");
// thread will sleep 10ms per scan file num to limit IO
DEFINE_Int64(async_file_cache_init_file_num_interval, "1000");
DEFINE_Int64(async_file_cache_init_sleep_interval_ms, "20");
DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB
// 4KB <= file_cache_max_file_segment_size <= 256MB
DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,8 @@ DECLARE_Bool(enable_file_cache);
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85, "disposable_percent":10, "index_percent":5}]
DECLARE_String(file_cache_path);
DECLARE_Int64(async_file_cache_init_file_num_interval);
DECLARE_Int64(async_file_cache_init_sleep_interval_ms);
DECLARE_Int64(file_cache_min_file_segment_size);
DECLARE_Int64(file_cache_max_file_segment_size);
DECLARE_Bool(clear_file_cache);
Expand Down
57 changes: 44 additions & 13 deletions be/src/io/cache/block/block_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <system_error>
#include <utility>

#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block/block_file_cache.h"
#include "io/cache/block/block_file_cache_fwd.h"
Expand Down Expand Up @@ -119,10 +120,33 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
Status LRUFileCache::initialize() {
MonotonicStopWatch watch;
watch.start();
std::lock_guard cache_lock(_mutex);
if (!_is_initialized) {
if (fs::exists(_cache_base_path)) {
RETURN_IF_ERROR(load_cache_info_into_memory(cache_lock));
// the cache already exists, try to load cache info asyncly
_lazy_open_done = false;
_cache_background_load_thread = std::thread([this]() {
MonotonicStopWatch watch;
watch.start();
std::lock_guard<std::mutex> cache_lock(_mutex);
Status s = load_cache_info_into_memory(cache_lock);
if (s.ok()) {
_lazy_open_done = true;
} else {
LOG(WARNING) << fmt::format("Failed to load cache info from {}: {}",
_cache_base_path, s.to_string());
}
int64_t cost = watch.elapsed_time() / 1000 / 1000;
LOG(INFO) << fmt::format(
"FileCache lazy load done path={}, disposable queue size={} elements={}, "
"index queue size={} elements={}, query queue size={} elements={}, init "
"cost(ms)={}",
_cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
_disposable_queue.get_elements_num(cache_lock),
_index_queue.get_total_cache_size(cache_lock),
_index_queue.get_elements_num(cache_lock),
_normal_queue.get_total_cache_size(cache_lock),
_normal_queue.get_elements_num(cache_lock), cost);
});
} else {
std::error_code ec;
fs::create_directories(_cache_base_path, ec);
Expand All @@ -136,17 +160,8 @@ Status LRUFileCache::initialize() {
_is_initialized = true;
_cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this);
int64_t cost = watch.elapsed_time() / 1000 / 1000;
LOG(INFO) << fmt::format(
"After initialize file cache path={}, disposable queue size={} elements={}, index "
"queue size={} "
"elements={}, query queue "
"size={} elements={}, init cost(ms)={}",
_cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
_disposable_queue.get_elements_num(cache_lock),
_index_queue.get_total_cache_size(cache_lock),
_index_queue.get_elements_num(cache_lock),
_normal_queue.get_total_cache_size(cache_lock),
_normal_queue.get_elements_num(cache_lock), cost);
LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}",
_cache_base_path, cost);
return Status::OK();
}

Expand Down Expand Up @@ -376,6 +391,16 @@ void LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co

FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t size,
const CacheContext& context) {
if (!_lazy_open_done) {
// Cache is not ready yet
VLOG_NOTICE << fmt::format(
"Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.",
key.to_string(), offset, size);
FileBlocks file_blocks = {std::make_shared<FileBlock>(
offset, size, key, this, FileBlock::State::SKIP_CACHE, context.cache_type)};
return FileBlocksHolder(std::move(file_blocks));
}

FileBlock::Range range(offset, offset + size - 1);

std::lock_guard cache_lock(_mutex);
Expand Down Expand Up @@ -827,6 +852,7 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
std::vector<std::pair<Key, size_t>> queue_entries;
std::vector<std::string> need_to_check_if_empty_dir;
Status st = Status::OK();
size_t scan_file_num = 0;
auto scan_file_cache = [&](fs::directory_iterator& key_it) {
for (; key_it != fs::directory_iterator(); ++key_it) {
key = Key(
Expand Down Expand Up @@ -888,6 +914,11 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
}
need_to_check_if_empty_dir.push_back(key_it->path());
}
scan_file_num += 1;
if (scan_file_num % config::async_file_cache_init_file_num_interval == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(
config::async_file_cache_init_sleep_interval_ms));
}
}
}
};
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/block/block_lru_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class LRUFileCache final : public IFileCache {
LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);
~LRUFileCache() override {
_close = true;
if (_cache_background_load_thread.joinable()) {
_cache_background_thread.join();
}
if (_cache_background_thread.joinable()) {
_cache_background_thread.join();
}
Expand Down Expand Up @@ -201,6 +204,8 @@ class LRUFileCache final : public IFileCache {
private:
std::atomic_bool _close {false};
std::thread _cache_background_thread;
std::atomic_bool _lazy_open_done {true};
std::thread _cache_background_load_thread;
size_t _num_read_segments = 0;
size_t _num_hit_segments = 0;
size_t _num_removed_segments = 0;
Expand Down