diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index f2f21162bf05b5..fd373fe49af730 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -456,7 +456,7 @@ void CloudTablet::recycle_cached_data(const std::vector& rowset if (config::enable_file_cache) { for (const auto& rs : rowsets) { - if (rs.use_count() >= 1) { + if (rs.use_count() > 1) { LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() << " references. File Cache won't be recycled when query is using it."; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index f60d0eeb5ba0dd..d597ccc42a115f 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -238,7 +238,9 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) { num_vacuumed += t->delete_expired_stale_rowsets(); } - LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed); + LOG_INFO("finish vacuum stale rowsets") + .tag("num_vacuumed", num_vacuumed) + .tag("num_tablets", tablets_to_vacuum.size()); } std::vector> CloudTabletMgr::get_weak_tablets() { diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 218dc2ae6b152e..8592a819bb0458 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1058,7 +1058,6 @@ DEFINE_mBool(enbale_dump_error_file, "true"); // limit the max size of error log on disk DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB DEFINE_mInt64(cache_lock_long_tail_threshold, "1000"); -DEFINE_Int64(file_cache_recycle_keys_size, "1000000"); DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false"); DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800"); diff --git a/be/src/common/config.h b/be/src/common/config.h index ddca52c607b495..309d486064f54b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1098,7 +1098,6 @@ DECLARE_mBool(enbale_dump_error_file); // limit the max size of error log on disk DECLARE_mInt64(file_cache_error_log_limit_bytes); DECLARE_mInt64(cache_lock_long_tail_threshold); -DECLARE_Int64(file_cache_recycle_keys_size); // Base compaction may retrieve and produce some less frequently accessed data, // potentially affecting the file cache hit rate. // This configuration determines whether to retain the output within the file cache. diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index b5f48d09648bc6..c3f135ce0d9776 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -211,8 +211,6 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, std::numeric_limits::max()); - _recycle_keys = std::make_shared>( - config::file_cache_recycle_keys_size); if (cache_settings.storage == "memory") { _storage = std::make_unique(); _cache_base_path = "memory"; @@ -328,7 +326,8 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard& cache_lo DCHECK(!_is_initialized); _is_initialized = true; RETURN_IF_ERROR(_storage->init(this)); - _cache_background_thread = std::thread(&BlockFileCache::run_background_operation, this); + _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this); + _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this); return Status::OK(); } @@ -562,7 +561,7 @@ std::string BlockFileCache::clear_file_cache_async() { void BlockFileCache::recycle_deleted_blocks() { using namespace std::chrono; - static int remove_batch = 100; + static int remove_batch = 500; TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch); TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks"); std::unique_lock cache_lock(_mutex); @@ -573,45 +572,34 @@ void BlockFileCache::recycle_deleted_blocks() { int i = 0; std::condition_variable cond; auto start_time = steady_clock::time_point(); - if (_async_clear_file_cache) { - LOG_INFO("Start clear file cache async").tag("path", _cache_base_path); - auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) { - std::lock_guard segment_lock(cell->file_block->_mutex); - remove(cell->file_block, cache_lock, segment_lock); - }; - static int remove_batch = 100; - TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch); - int i = 0; - std::condition_variable cond; - auto iter_queue = [&](LRUQueue& queue) { - bool end = false; - while (queue.get_capacity(cache_lock) != 0 && !end) { - std::vector cells; - for (const auto& [entry_key, entry_offset, _] : queue) { - if (i == remove_batch) { - i = 0; - break; - } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); - if (!cell) continue; - if (!cell->is_deleted) { - end = true; - break; - } else if (cell->releasable()) { - i++; - cells.push_back(cell); - } + + LOG_INFO("Start clear file cache async").tag("path", _cache_base_path); + auto iter_queue = [&](LRUQueue& queue) { + bool end = false; + while (queue.get_capacity(cache_lock) != 0 && !end) { + std::vector cells; + for (const auto& [entry_key, entry_offset, _] : queue) { + if (i == remove_batch) { + i = 0; + break; + } + auto* cell = get_cell(entry_key, entry_offset, cache_lock); + if (!cell) continue; + if (cell->releasable()) { + i++; + cells.push_back(cell); } - std::ranges::for_each(cells, remove_file_block); - // just for sleep - cond.wait_for(cache_lock, std::chrono::microseconds(100)); } - }; - iter_queue(get_queue(FileCacheType::DISPOSABLE)); - iter_queue(get_queue(FileCacheType::NORMAL)); - iter_queue(get_queue(FileCacheType::INDEX)); - } - if (_async_clear_file_cache || config::file_cache_ttl_valid_check_interval_second != 0) { + std::ranges::for_each(cells, remove_file_block); + // just for sleep + cond.wait_for(cache_lock, std::chrono::microseconds(100)); + } + }; + iter_queue(get_queue(FileCacheType::DISPOSABLE)); + iter_queue(get_queue(FileCacheType::NORMAL)); + iter_queue(get_queue(FileCacheType::INDEX)); + + if (config::file_cache_ttl_valid_check_interval_second != 0) { std::vector ttl_keys; ttl_keys.reserve(_key_to_time.size()); for (auto& [key, _] : _key_to_time) { @@ -630,14 +618,11 @@ void BlockFileCache::recycle_deleted_blocks() { cell.is_deleted = cell.is_deleted ? true - : (config::file_cache_ttl_valid_check_interval_second == 0 - ? false - : std::chrono::duration_cast( - std::chrono::steady_clock::now() - .time_since_epoch()) - .count() - - cell.atime > - config::file_cache_ttl_valid_check_interval_second); + : std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + cell.atime > + config::file_cache_ttl_valid_check_interval_second; if (!cell.is_deleted) { continue; } else if (cell.releasable()) { @@ -648,14 +633,11 @@ void BlockFileCache::recycle_deleted_blocks() { std::ranges::for_each(cells, remove_file_block); } } - if (_async_clear_file_cache) { - _async_clear_file_cache = false; - auto use_time = duration_cast(steady_clock::time_point() - start_time); - LOG_INFO("End clear file cache async") - .tag("path", _cache_base_path) - .tag("use_time", static_cast(use_time.count())); - } } + auto use_time = duration_cast(steady_clock::time_point() - start_time); + LOG_INFO("End clear file cache async") + .tag("path", _cache_base_path) + .tag("use_time", static_cast(use_time.count())); } FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash, @@ -925,18 +907,6 @@ void BlockFileCache::remove_file_blocks(std::vector& to_evict, std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); } -void BlockFileCache::remove_file_blocks_async(std::vector& to_evict, - std::lock_guard& cache_lock) { - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock, /*sync*/ false); - } - }; - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); -} - void BlockFileCache::remove_file_blocks_and_clean_time_maps( std::vector& to_evict, std::lock_guard& cache_lock) { auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) { @@ -1183,15 +1153,11 @@ void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) { bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock); if (!is_ttl_file) { auto iter = _files.find(file_key); - std::vector to_remove; if (iter != _files.end()) { for (auto& [_, cell] : iter->second) { - if (cell.releasable()) { - to_remove.push_back(&cell); - } + cell.is_deleted = true; } } - remove_file_blocks_async(to_remove, cache_lock); } } @@ -1378,7 +1344,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, template requires IsXLock && IsXLock -void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) { +void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock) { auto hash = file_block->get_hash_value(); auto offset = file_block->offset(); auto type = file_block->cache_type(); @@ -1398,25 +1364,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo key.offset = offset; key.meta.type = type; key.meta.expiration_time = expiration_time; - if (sync) { - Status st = _storage->remove(key); - if (!st.ok()) { - LOG_WARNING("").error(st); - } - } else { - // the file will be deleted in the bottom half - // so there will be a window that the file is not in the cache but still in the storage - // but it's ok, because the rowset is stale already - // in case something unexpected happen, set the _recycle_keys queue to zero to fallback - bool ret = _recycle_keys->push(key); - if (!ret) { - LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); - Status st = _storage->remove(key); - if (!st.ok()) { - LOG_WARNING("").error(st); - } - } + Status st = _storage->remove(key); + if (!st.ok()) { + LOG_WARNING("").error(st); } + } else if (cell->file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) { + cell->is_deleted = true; } _cur_cache_size -= file_block->range().size(); if (FileCacheType::TTL == type) { @@ -1430,16 +1383,6 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo *_num_removed_blocks << 1; } -void BlockFileCache::recycle_stale_rowset_async_bottom_half() { - FileCacheKey key; - while (_recycle_keys->pop(key)) { - Status st = _storage->remove(key); - if (!st.ok()) { - LOG_WARNING("").error(st); - } - } -} - size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const { SCOPED_CACHE_LOCK(_mutex); return get_used_cache_size_unlocked(cache_type, cache_lock); @@ -1729,7 +1672,35 @@ void BlockFileCache::check_disk_resource_limit() { } } -void BlockFileCache::run_background_operation() { +void BlockFileCache::run_background_gc() { + int64_t interval_time_seconds = 10; + while (!_close) { + TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds); + { + std::unique_lock close_lock(_close_mtx); + _close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds)); + if (_close) { + break; + } + } + + recycle_deleted_blocks(); + + { + int64_t cur_time = UnixSeconds(); + SCOPED_CACHE_LOCK(_mutex); + while (!_time_to_key.empty()) { + auto begin = _time_to_key.begin(); + if (cur_time < begin->first) { + break; + } + remove_if_ttl_file_unlock(begin->second, false, cache_lock); + } + } + } +} + +void BlockFileCache::run_background_monitor() { int64_t interval_time_seconds = 20; while (!_close) { TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds); @@ -1777,21 +1748,6 @@ void BlockFileCache::run_background_operation() { _num_read_blocks_1h->get_value()); } } - - recycle_stale_rowset_async_bottom_half(); - recycle_deleted_blocks(); - // gc - { - int64_t cur_time = UnixSeconds(); - SCOPED_CACHE_LOCK(_mutex); - while (!_time_to_key.empty()) { - auto begin = _time_to_key.begin(); - if (cur_time < begin->first) { - break; - } - remove_if_ttl_file_unlock(begin->second, false, cache_lock); - } - } } } @@ -2053,5 +2009,5 @@ std::map BlockFileCache::get_stats_unsafe() { template void BlockFileCache::remove(FileBlockSPtr file_block, std::lock_guard& cache_lock, - std::lock_guard& block_lock, bool sync); + std::lock_guard& block_lock); } // namespace doris::io diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index f23d5a3799e0cf..86a6cbec0b567b 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -95,8 +95,11 @@ class BlockFileCache { _close = true; } _close_cv.notify_all(); - if (_cache_background_thread.joinable()) { - _cache_background_thread.join(); + if (_cache_background_gc_thread.joinable()) { + _cache_background_gc_thread.join(); + } + if (_cache_background_monitor_thread.joinable()) { + _cache_background_monitor_thread.join(); } } @@ -365,7 +368,7 @@ class BlockFileCache { template requires IsXLock && IsXLock - void remove(FileBlockSPtr file_block, T& cache_lock, U& segment_lock, bool sync = true); + void remove(FileBlockSPtr file_block, T& cache_lock, U& segment_lock); FileBlocks get_impl(const UInt128Wrapper& hash, const CacheContext& context, const FileBlock::Range& range, std::lock_guard& cache_lock); @@ -428,7 +431,8 @@ class BlockFileCache { bool remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, bool remove_directly, std::lock_guard&); - void run_background_operation(); + void run_background_gc(); + void run_background_monitor(); void recycle_deleted_blocks(); @@ -445,8 +449,6 @@ class BlockFileCache { void remove_file_blocks(std::vector&, std::lock_guard&); - void remove_file_blocks_async(std::vector&, std::lock_guard&); - void remove_file_blocks_and_clean_time_maps(std::vector&, std::lock_guard&); @@ -454,8 +456,6 @@ class BlockFileCache { size_t& removed_size, std::vector& to_evict, std::lock_guard& cache_lock, size_t& cur_removed_size); - void recycle_stale_rowset_async_bottom_half(); - // info std::string _cache_base_path; size_t _capacity = 0; @@ -467,7 +467,8 @@ class BlockFileCache { bool _close {false}; std::mutex _close_mtx; std::condition_variable _close_cv; - std::thread _cache_background_thread; + std::thread _cache_background_gc_thread; + std::thread _cache_background_monitor_thread; std::atomic_bool _async_open_done {false}; bool _async_clear_file_cache {false}; // disk space or inode is less than the specified value @@ -494,9 +495,6 @@ class BlockFileCache { LRUQueue _disposable_queue; LRUQueue _ttl_queue; - // keys for async remove - std::shared_ptr> _recycle_keys; - // metrics std::shared_ptr> _cache_capacity_metrics; std::shared_ptr> _cur_cache_size_metrics;