Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 51 additions & 11 deletions be/src/io/cache/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,13 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
LOG_ERROR("").tag("file cache path", cache_base_path).tag("error", strerror(errno));
return Status::IOError("{} statfs error {}", cache_base_path, strerror(errno));
}
size_t disk_capacity = static_cast<size_t>(
static_cast<size_t>(stat.f_blocks) * static_cast<size_t>(stat.f_bsize) *
(static_cast<double>(config::file_cache_enter_disk_resource_limit_mode_percent) /
100));
size_t disk_capacity = static_cast<size_t>(static_cast<size_t>(stat.f_blocks) *
static_cast<size_t>(stat.f_bsize));
if (file_cache_settings.capacity == 0 || disk_capacity < file_cache_settings.capacity) {
LOG_INFO(
"The cache {} config size {} is larger than {}% disk size {} or zero, recalc "
"The cache {} config size {} is larger than disk size {} or zero, recalc "
"it.",
cache_base_path, file_cache_settings.capacity,
config::file_cache_enter_disk_resource_limit_mode_percent, disk_capacity);
cache_base_path, file_cache_settings.capacity, disk_capacity);
file_cache_settings = get_file_cache_settings(disk_capacity,
file_cache_settings.max_query_cache_size);
}
Expand Down Expand Up @@ -174,16 +171,59 @@ std::vector<std::string> FileCacheFactory::get_base_paths() {
return paths;
}

std::string validate_capacity(const std::string& path, int64_t new_capacity,
int64_t& valid_capacity) {
struct statfs stat;
if (statfs(path.c_str(), &stat) < 0) {
auto ret = fmt::format("reset capacity {} statfs error {}. ", path, strerror(errno));
LOG_ERROR(ret);
valid_capacity = 0; // caller will handle the error
return ret;
}
size_t disk_capacity = static_cast<size_t>(static_cast<size_t>(stat.f_blocks) *
static_cast<size_t>(stat.f_bsize));
if (new_capacity == 0 || disk_capacity < new_capacity) {
auto ret = fmt::format(
"The cache {} config size {} is larger than disk size {} or zero, recalc "
"it to disk size. ",
path, new_capacity, disk_capacity);
valid_capacity = disk_capacity;
LOG_WARNING(ret);
return ret;
}
valid_capacity = new_capacity;
return "";
}

std::string FileCacheFactory::reset_capacity(const std::string& path, int64_t new_capacity) {
std::stringstream ss;
size_t total_capacity = 0;
if (path.empty()) {
std::stringstream ss;
for (auto& [_, cache] : _path_to_cache) {
ss << cache->reset_capacity(new_capacity);
for (auto& [p, cache] : _path_to_cache) {
int64_t valid_capacity = 0;
ss << validate_capacity(p, new_capacity, valid_capacity);
if (valid_capacity <= 0) {
return ss.str();
}
ss << cache->reset_capacity(valid_capacity);
total_capacity += cache->capacity();
}
_capacity = total_capacity;
return ss.str();
} else {
if (auto iter = _path_to_cache.find(path); iter != _path_to_cache.end()) {
return iter->second->reset_capacity(new_capacity);
int64_t valid_capacity = 0;
ss << validate_capacity(path, new_capacity, valid_capacity);
if (valid_capacity <= 0) {
return ss.str();
}
ss << iter->second->reset_capacity(valid_capacity);

for (auto& [p, cache] : _path_to_cache) {
total_capacity += cache->capacity();
}
_capacity = total_capacity;
return ss.str();
}
}
return "Unknown the cache path " + path;
Expand Down
77 changes: 77 additions & 0 deletions be/test/io/cache/block_file_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7905,4 +7905,81 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) {
FileCacheFactory::instance()->_capacity = 0;
}

TEST_F(BlockFileCacheTest, test_reset_capacity) {
std::string cache_path2 = caches_dir / "cache2" / "";

if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
if (fs::exists(cache_path2)) {
fs::remove_all(cache_path2);
}

io::FileCacheSettings settings;
settings.query_queue_size = 30;
settings.query_queue_elements = 5;
settings.index_queue_size = 30;
settings.index_queue_elements = 5;
settings.disposable_queue_size = 30;
settings.disposable_queue_elements = 5;
settings.capacity = 90;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 30;
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok());
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_path2, settings).ok());
EXPECT_EQ(FileCacheFactory::instance()->get_cache_instance_size(), 2);
EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 180);

// valid path + valid capacity
auto s = FileCacheFactory::instance()->reset_capacity(cache_base_path, 80);
LOG(INFO) << s;
EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 170);

// empty path + valid capacity
s = FileCacheFactory::instance()->reset_capacity("", 70);
LOG(INFO) << s;
EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 140);

// invalid path + valid capacity
s = FileCacheFactory::instance()->reset_capacity("/not/exist/haha", 70);
LOG(INFO) << s;
EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 140);

// valid path + invalid capacity
s = FileCacheFactory::instance()->reset_capacity(cache_base_path, INT64_MAX);
LOG(INFO) << s;
EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX);
EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70);

// valid path + zero capacity
s = FileCacheFactory::instance()->reset_capacity(cache_base_path, 0);
LOG(INFO) << s;
EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX);
EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70);

// empty path + invalid capacity
s = FileCacheFactory::instance()->reset_capacity("", INT64_MAX);
LOG(INFO) << s;
EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX);
EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70);

// empty path + zero capacity
s = FileCacheFactory::instance()->reset_capacity("", 0);
LOG(INFO) << s;
EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX);
EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70);

FileCacheFactory::instance()->clear_file_caches(true);
std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
if (fs::exists(cache_path2)) {
fs::remove_all(cache_path2);
}
FileCacheFactory::instance()->_caches.clear();
FileCacheFactory::instance()->_path_to_cache.clear();
FileCacheFactory::instance()->_capacity = 0;
}

} // namespace doris::io
Loading