Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,12 @@ DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
// Switch for evicting file cache blocks "in advance" from a background thread. When it is
// false, the background monitor clears the need-evict flag instead of re-evaluating it.
DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
// In-advance eviction is turned on once disk space usage, inode usage, or cache size relative
// to capacity reaches this percentage. It must be greater than the exit percentage; if it is
// not, both values are reset to their defaults (78 / 75) when the check runs.
DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "78");
// In-advance eviction is turned off again only after disk space usage, inode usage, and cache
// size percentage have all dropped below this value.
DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "75");
// How long, in milliseconds, the background evict-in-advance thread waits between rounds.
DEFINE_mInt32(file_cache_evict_in_advance_interval_ms, "1000");
// Size, in bytes, requested from each eviction call made by one background round.
DEFINE_mInt64(file_cache_evict_in_advance_batch_bytes, "31457280"); // 30MB

DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
// If true, evict the ttl cache using LRU when full.
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,11 @@ DECLARE_Bool(clear_file_cache);
DECLARE_Bool(enable_file_cache_query_limit);
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
// Switch for evicting file cache blocks "in advance" from a background thread.
DECLARE_mBool(enable_evict_file_cache_in_advance);
// Usage percentage (disk space, inodes, or cache size vs. capacity) at or above which
// in-advance eviction is turned on. Must be greater than the exit percentage.
DECLARE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent);
// Usage percentage that all of the above must fall below before in-advance eviction is
// turned off again.
DECLARE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent);
// Wait time, in milliseconds, between rounds of the background evict-in-advance thread.
DECLARE_mInt32(file_cache_evict_in_advance_interval_ms);
// Size, in bytes, requested from each eviction call made by one background round.
DECLARE_mInt64(file_cache_evict_in_advance_batch_bytes);
DECLARE_mBool(enable_read_cache_file_directly);
DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
// If true, evict the ttl cache using LRU when full.
Expand Down
146 changes: 130 additions & 16 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
"file_cache_hit_ratio_1h", 0.0);
_disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
_need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);

_cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
Expand All @@ -212,6 +214,11 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
_cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
_storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
_evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");

_recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_recycle_keys_length");

_disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
cache_settings.disposable_queue_elements, 60 * 60);
Expand Down Expand Up @@ -339,6 +346,8 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
_cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
_cache_background_ttl_gc_thread = std::thread(&BlockFileCache::run_background_ttl_gc, this);
_cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
_cache_background_evict_in_advance_thread =
std::thread(&BlockFileCache::run_background_evict_in_advance, this);

return Status::OK();
}
Expand Down Expand Up @@ -1021,6 +1030,16 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext&
return true;
}

void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
UInt128Wrapper hash = UInt128Wrapper();
size_t offset = 0;
CacheContext context;
context.cache_type = FileCacheType::NORMAL;
try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, false);
context.cache_type = FileCacheType::TTL;
try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, false);
}

bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, bool remove_directly,
std::lock_guard<std::mutex>& cache_lock, bool sync) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
Expand Down Expand Up @@ -1178,7 +1197,7 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size

bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) {
int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool sync_removal) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
Expand Down Expand Up @@ -1211,7 +1230,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
}
*(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
}
remove_file_blocks(to_evict, cache_lock, true);
remove_file_blocks(to_evict, cache_lock, sync_removal);

return !is_overflow(removed_size, size, cur_cache_size);
}
Expand All @@ -1229,7 +1248,7 @@ bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size,

bool BlockFileCache::try_reserve_from_other_queue_by_size(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
std::lock_guard<std::mutex>& cache_lock, bool sync_removal) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
Expand All @@ -1249,17 +1268,18 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size(
cur_removed_size);
*(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
}
remove_file_blocks(to_evict, cache_lock, true);
remove_file_blocks(to_evict, cache_lock, sync_removal);
return !is_overflow(removed_size, size, cur_cache_size);
}

bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
int64_t cur_time,
std::lock_guard<std::mutex>& cache_lock) {
std::lock_guard<std::mutex>& cache_lock,
bool sync_removal) {
// currently, TTL cache is not considered as a candidate
auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
bool reserve_success = try_reserve_from_other_queue_by_time_interval(
cur_cache_type, other_cache_types, size, cur_time, cache_lock);
cur_cache_type, other_cache_types, size, cur_time, cache_lock, sync_removal);
if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
return reserve_success;
}
Expand All @@ -1272,14 +1292,15 @@ bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type,
if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
return false;
}
return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size,
cache_lock);
return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
sync_removal);
}

bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
QueryFileCacheContextPtr query_context,
const CacheContext& context, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
std::lock_guard<std::mutex>& cache_lock,
bool sync_removal) {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
Expand All @@ -1292,7 +1313,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
cur_removed_size);
remove_file_blocks(to_evict, cache_lock, true);
remove_file_blocks(to_evict, cache_lock, sync_removal);
*(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;

if (is_overflow(removed_size, size, cur_cache_size)) {
Expand Down Expand Up @@ -1345,7 +1366,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
// so there will be a window that the file is not in the cache but still in the storage
// but it's ok, because the rowset is stale already
bool ret = _recycle_keys.enqueue(key);
if (!ret) {
if (ret) [[likely]] {
*_recycle_keys_length_recorder << _recycle_keys.size_approx();
} else {
LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
int64_t duration_ns = 0;
Status st;
Expand Down Expand Up @@ -1551,6 +1574,10 @@ int disk_used_percentage(const std::string& path, std::pair<int, int>* percent)
int inode_percentage = int(inode_free * 1.0 / inode_total * 100);
percent->first = capacity_percentage;
percent->second = 100 - inode_percentage;

// Add sync point for testing
TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);

return 0;
}

Expand Down Expand Up @@ -1643,7 +1670,7 @@ void BlockFileCache::check_disk_resource_limit() {
LOG_WARNING("config error, set to default value")
.tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
.tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
config::file_cache_enter_disk_resource_limit_mode_percent = 90;
config::file_cache_enter_disk_resource_limit_mode_percent = 88;
config::file_cache_exit_disk_resource_limit_mode_percent = 80;
}
if (is_insufficient(space_percentage) || is_insufficient(inode_percentage)) {
Expand All @@ -1664,11 +1691,69 @@ void BlockFileCache::check_disk_resource_limit() {
}
}

// Re-evaluates whether the cache should be evicted "in advance" and updates
// _need_evict_cache_in_advance together with its metric.
//
// Three usage percentages are considered: disk space, inodes (both obtained from
// disk_used_percentage) and current cache size relative to capacity.
//   - The flag is raised when any of them reaches the "enter" threshold.
//   - The flag is lowered only when it is currently set and all of them are below the
//     "exit" threshold.
//   - Otherwise the previous state is kept.
// Nothing is done for non-DISK storage or when the disk usage lookup fails.
void BlockFileCache::check_need_evict_cache_in_advance() {
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    std::pair<int, int> percent;
    if (disk_used_percentage(_cache_base_path, &percent) != 0) {
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }

    auto [space_percentage, inode_percentage] = percent;
    const double cache_fill_ratio =
            static_cast<double>(_cur_cache_size) / static_cast<double>(_capacity);
    size_t size_percentage = static_cast<size_t>(cache_fill_ratio * 100);

    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);

    // ATTN: the thresholds can be changed at runtime, so fall back to the defaults whenever
    // "enter" is not strictly above "exit".
    // FIXME: reject with config validator
    const bool thresholds_valid = config::file_cache_enter_need_evict_cache_in_advance_percent >
                                  config::file_cache_exit_need_evict_cache_in_advance_percent;
    if (!thresholds_valid) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
    }

    // Evaluated after the validation above so that corrected thresholds are the ones used.
    auto reached_enter_threshold = [](const int& percentage) {
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
    };
    const bool space_insufficient = reached_enter_threshold(space_percentage);
    const bool inode_insufficient = reached_enter_threshold(inode_percentage);
    const bool size_insufficient = reached_enter_threshold(size_percentage);

    if (space_insufficient || inode_insufficient || size_insufficient) {
        _need_evict_cache_in_advance = true;
        _need_evict_cache_in_advance_metrics->set_value(1);
    } else if (_need_evict_cache_in_advance) {
        const bool all_below_exit =
                (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
                (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
                (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent);
        if (all_below_exit) {
            _need_evict_cache_in_advance = false;
            _need_evict_cache_in_advance_metrics->set_value(0);
        }
    }

    if (!_need_evict_cache_in_advance) {
        return;
    }
    LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
                 << " inode_percent=" << inode_percentage << " size_percent=" << size_percentage
                 << " is_space_insufficient=" << space_insufficient
                 << " is_inode_insufficient=" << inode_insufficient
                 << " is_size_insufficient=" << size_insufficient
                 << " need evict cache in advance";
}

void BlockFileCache::run_background_monitor() {
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
check_disk_resource_limit();
if (config::enable_evict_file_cache_in_advance) {
check_need_evict_cache_in_advance();
} else {
_need_evict_cache_in_advance = false;
}

{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds));
Expand Down Expand Up @@ -1753,11 +1838,8 @@ void BlockFileCache::run_background_gc() {
break;
}
}
while (_recycle_keys.try_dequeue(key)) {
if (batch_count >= batch_limit) {
break;
}

while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
int64_t duration_ns = 0;
Status st;
{
Expand All @@ -1771,10 +1853,42 @@ void BlockFileCache::run_background_gc() {
}
batch_count++;
}
*_recycle_keys_length_recorder << _recycle_keys.size_approx();
batch_count = 0;
}
}

void BlockFileCache::run_background_evict_in_advance() {
LOG(INFO) << "Starting background evict in advance thread";
int64_t batch = 0;
while (!_close) {
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(
close_lock,
std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
if (_close) {
LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
break;
}
}
batch = config::file_cache_evict_in_advance_batch_bytes;

// Skip if eviction not needed or too many pending recycles
if (!_need_evict_cache_in_advance || _recycle_keys.size_approx() >= (batch * 10)) {
continue;
}

int64_t duration_ns = 0;
{
SCOPED_CACHE_LOCK(_mutex, this);
SCOPED_RAW_TIMER(&duration_ns);
try_evict_in_advance(batch, cache_lock);
}
*_evict_in_advance_latency_us << (duration_ns / 1000);
}
}

void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
uint64_t new_expiration_time) {
SCOPED_CACHE_LOCK(_mutex, this);
Expand Down
Loading
Loading