Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ DEFINE_Validator(file_cache_min_file_segment_size, [](const int64_t config) -> b
DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_wait_sec_after_fail, "0"); // // zero for no waiting and retrying
DEFINE_mInt32(file_cache_max_evict_num_per_round, "5000");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,7 @@ DECLARE_Bool(clear_file_cache);
DECLARE_Bool(enable_file_cache_query_limit);
// only for debug, will be removed after finding out the root cause
DECLARE_mInt32(file_cache_wait_sec_after_fail); // zero for no waiting and retrying
DECLARE_mInt32(file_cache_max_evict_num_per_round);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/block/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ class IFileCache {
mutable std::mutex _mutex;

virtual bool try_reserve(const Key& key, const CacheContext& context, size_t offset,
size_t size, std::lock_guard<std::mutex>& cache_lock) = 0;
size_t size, std::lock_guard<std::mutex>& cache_lock,
bool skip_round_check) = 0;

virtual void remove(FileBlockSPtr file_segment, std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& segment_lock) = 0;
Expand Down
22 changes: 16 additions & 6 deletions be/src/io/cache/block/block_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ FileBlocks LRUFileCache::split_range_into_cells(const Key& key, const CacheConte
while (current_pos < end_pos_non_included) {
current_size = std::min(remaining_size, _max_file_segment_size);
remaining_size -= current_size;
state = try_reserve(key, context, current_pos, current_size, cache_lock)
state = try_reserve(key, context, current_pos, current_size, cache_lock, false)
? state
: FileBlock::State::SKIP_CACHE;
if (UNLIKELY(state == FileBlock::State::SKIP_CACHE)) {
Expand Down Expand Up @@ -536,16 +536,19 @@ const LRUFileCache::LRUQueue& LRUFileCache::get_queue(CacheType type) const {
// a. evict from query queue
// b. evict from other queue
bool LRUFileCache::try_reserve(const Key& key, const CacheContext& context, size_t offset,
size_t size, std::lock_guard<std::mutex>& cache_lock) {
size_t size, std::lock_guard<std::mutex>& cache_lock,
bool skip_round_check) {
auto query_context =
_enable_file_cache_query_limit && (context.query_id.hi != 0 || context.query_id.lo != 0)
? get_query_context(context.query_id, cache_lock)
: nullptr;
if (!query_context) {
return try_reserve_for_lru(key, nullptr, context, offset, size, cache_lock);
return try_reserve_for_lru(key, nullptr, context, offset, size, skip_round_check,
cache_lock);
} else if (query_context->get_cache_size(cache_lock) + size <=
query_context->get_max_cache_size()) {
return try_reserve_for_lru(key, query_context, context, offset, size, cache_lock);
return try_reserve_for_lru(key, query_context, context, offset, size, skip_round_check,
cache_lock);
}
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
Expand Down Expand Up @@ -708,6 +711,7 @@ bool LRUFileCache::try_reserve_from_other_queue(CacheType cur_cache_type, size_t

bool LRUFileCache::try_reserve_for_lru(const Key& key, QueryFileCacheContextPtr query_context,
const CacheContext& context, size_t offset, size_t size,
bool skip_round_check,
std::lock_guard<std::mutex>& cache_lock) {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
Expand All @@ -729,8 +733,10 @@ bool LRUFileCache::try_reserve_for_lru(const Key& key, QueryFileCacheContextPtr

std::vector<FileBlockCell*> to_evict;
std::vector<FileBlockCell*> trash;
size_t evict_num = 0;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow()) {
if (!is_overflow() ||
(!skip_round_check && evict_num > config::file_cache_max_evict_num_per_round)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
Expand Down Expand Up @@ -762,6 +768,7 @@ bool LRUFileCache::try_reserve_for_lru(const Key& key, QueryFileCacheContextPtr

removed_size += cell_size;
--queue_element_size;
++evict_num;
}
}

Expand All @@ -773,6 +780,9 @@ bool LRUFileCache::try_reserve_for_lru(const Key& key, QueryFileCacheContextPtr
}
};

if (evict_num > config::file_cache_max_evict_num_per_round) {
LOG(INFO) << "debug evict from file cache number: " << evict_num;
}
std::for_each(trash.begin(), trash.end(), remove_file_block_if);
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);

Expand Down Expand Up @@ -931,7 +941,7 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
continue;
}
context.cache_type = cache_type;
if (try_reserve(key, context, offset, size, cache_lock)) {
if (try_reserve(key, context, offset, size, cache_lock, true)) {
add_cell(key, context, offset, size, FileBlock::State::DOWNLOADED, cache_lock);
queue_entries.emplace_back(key, offset);
} else {
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/block/block_lru_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ class LRUFileCache final : public IFileCache {
std::lock_guard<std::mutex>& cache_lock);

bool try_reserve(const Key& key, const CacheContext& context, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock) override;
std::lock_guard<std::mutex>& cache_lock, bool skip_round_check) override;

bool try_reserve_for_lru(const Key& key, QueryFileCacheContextPtr query_context,
const CacheContext& context, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock);
bool skip_round_check, std::lock_guard<std::mutex>& cache_lock);

std::vector<CacheType> get_other_cache_type(CacheType cur_cache_type);

Expand Down