From a8adb9a6682242a124d34202f0b40e7763c16920 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 17 Jun 2025 09:43:54 +0800 Subject: [PATCH 1/5] [optimization](filecache) speed up filecache warm up this pr does the following: 1. make file cache downloader worker pool thread num configurable 2. make warm up job split batch size configurable 3. split large file downloading task to smaller ones to maintain load balance between threads, thus improve concurrency 4. use meta info to deduce size of inverted idx file size to reduce S3 HEAD ops 5. some log print optimization in our test, this opt can improve more than 3x file cache warm up performance Signed-off-by: zhengyu --- be/src/cloud/cloud_backend_service.cpp | 3 +- be/src/cloud/cloud_tablet.cpp | 12 +- be/src/cloud/cloud_warm_up_manager.cpp | 136 +++++++++++------- be/src/cloud/cloud_warm_up_manager.h | 6 +- be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + .../io/cache/block_file_cache_downloader.cpp | 4 +- be/src/io/cache/block_file_cache_downloader.h | 2 +- .../java/org/apache/doris/common/Config.java | 3 + .../doris/cloud/CacheHotspotManager.java | 2 +- 10 files changed, 113 insertions(+), 61 deletions(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 63f76632d79695..1491cf83e0a0ad 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -107,7 +107,8 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, .tag("request_type", "SET_BATCH") .tag("job_id", request.job_id) .tag("batch_id", request.batch_id) - .tag("jobs size", request.job_metas.size()); + .tag("jobs size", request.job_metas.size()) + .tag("tablet num of first meta", request.job_metas[0].tablet_ids.size()); bool retry = false; st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry); if (!retry && st) { diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 1c046eeac8fe7c..673d721d84e8f1 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -286,7 +286,11 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, }, - .download_done {}, + .download_done {[](Status st) { + if (!st) { + LOG_WARNING("add rowset warm up error ").error(st); + } + }}, }); auto download_idx_file = [&](const io::Path& idx_path) { @@ -299,7 +303,11 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, }, - .download_done {}, + .download_done {[](Status st) { + if (!st) { + LOG_WARNING("add rowset warm up error ").error(st); + } + }}, }; _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); }; diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index d8bce097465dde..a0b3f544de276e 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -17,7 +17,8 @@ #include "cloud/cloud_warm_up_manager.h" -#include +#include +#include #include #include @@ -34,6 +35,8 @@ namespace doris { +bvar::Adder file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num"); + CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) { _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this); } @@ -59,6 +62,50 @@ std::unordered_map snapshot_rs_metas(BaseTable return id_to_rowset_meta_map; } +void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size, + io::FileSystemSPtr file_system, + int64_t expiration_time, + std::shared_ptr wait) { + if (file_size < 0) { + auto st = file_system->file_size(path, &file_size); + LOG(WARNING) << "get file size failed: " << path; + file_cache_warm_up_failed_task_num << 1; + return; + } + + const int64_t chunk_size = 10 * 1024 * 1024; // 10MB + int64_t offset = 0; + int64_t remaining_size = file_size; + + while (remaining_size > 0) { + int64_t current_chunk_size = std::min(chunk_size, remaining_size); + wait->add_count(); + + _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { + .path = path, + .file_size = file_size, + .offset = offset, + .download_size = current_chunk_size, + .file_system = file_system, + .ctx = + { + .expiration_time = expiration_time, + .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, + }, + .download_done = + [wait](Status st) { + if (!st) { + LOG_WARNING("Warm up error ").error(st); + } + wait->signal(); + }, + }); + + offset += current_chunk_size; + remaining_size -= current_chunk_size; + } +} + void CloudWarmUpManager::handle_jobs() { #ifndef BE_TEST constexpr int WAIT_TIME_SECONDS = 600; @@ -77,6 +124,10 @@ void CloudWarmUpManager::handle_jobs() { LOG_WARNING("Warm up job is null"); continue; } + + std::shared_ptr wait = + std::make_shared(0); + for (int64_t tablet_id : cur_job->tablet_ids) { if (_cur_job_id == 0) { // The job is canceled break; @@ -92,8 +143,7 @@ void CloudWarmUpManager::handle_jobs() { LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st); continue; } - std::shared_ptr wait = - std::make_shared(0); + auto tablet_meta = tablet->tablet_meta(); auto rs_metas = snapshot_rs_metas(tablet.get()); for (auto& [_, rs] : rs_metas) { @@ -112,71 +162,51 @@ void CloudWarmUpManager::handle_jobs() { expiration_time = 0; } - wait->add_count(); - // clang-format off - _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { - .path = storage_resource.value()->remote_segment_path(*rs, seg_id), - .file_size = rs->segment_file_size(seg_id), - .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, - }, - .download_done = - [wait](Status st) { - if (!st) { - LOG_WARNING("Warm up error ").error(st); - } - wait->signal(); - }, - }); - - auto download_idx_file = [&](const io::Path& idx_path) { - io::DownloadFileMeta meta { - .path = idx_path, - .file_size = -1, - .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, - }, - .download_done = - [wait](Status st) { - if (!st) { - LOG_WARNING("Warm up error ").error(st); - } - wait->signal(); - }, - }; - // clang-format on - _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); - }; + // 1st. download segment files + submit_download_tasks( + storage_resource.value()->remote_segment_path(*rs, seg_id), + rs->segment_file_size(seg_id), storage_resource.value()->fs, + expiration_time, wait); + + // 2nd. download inverted index files + int64_t file_size = -1; auto schema_ptr = rs->tablet_schema(); auto idx_version = schema_ptr->get_inverted_index_storage_format(); + const auto& idx_file_info = rs->inverted_index_file_info(seg_id); if (idx_version == InvertedIndexStorageFormatPB::V1) { for (const auto& index : schema_ptr->inverted_indexes()) { - wait->add_count(); auto idx_path = storage_resource.value()->remote_idx_v1_path( *rs, seg_id, index->index_id(), index->get_index_suffix()); - download_idx_file(idx_path); + if (idx_file_info.index_info_size() > 0) { + for (const auto& idx_info : idx_file_info.index_info()) { + if (index->index_id() == idx_info.index_id() && + index->get_index_suffix() == idx_info.index_suffix()) { + file_size = idx_info.index_file_size(); + break; + } + } + } + submit_download_tasks(idx_path, file_size, storage_resource.value()->fs, + expiration_time, wait); } } else { if (schema_ptr->has_inverted_index()) { - wait->add_count(); auto idx_path = storage_resource.value()->remote_idx_v2_path(*rs, seg_id); - download_idx_file(idx_path); + file_size = idx_file_info.has_index_size() ? idx_file_info.index_size() + : -1; + submit_download_tasks(idx_path, file_size, storage_resource.value()->fs, + expiration_time, wait); } } } } - timespec time; - time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; - if (!wait->timed_wait(time)) { - LOG_WARNING("Warm up tablet {} take a long time", tablet_meta->tablet_id()); - } + } + + timespec time; + time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; + if (!wait->timed_wait(time)) { + LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size()); } { std::unique_lock lock(_mtx); diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h index 219dedc58065a6..356d7284f6f3ee 100644 --- a/be/src/cloud/cloud_warm_up_manager.h +++ b/be/src/cloud/cloud_warm_up_manager.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include #include @@ -69,7 +71,9 @@ class CloudWarmUpManager { private: void handle_jobs(); - + void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system, + int64_t expiration_time, + std::shared_ptr wait); std::mutex _mtx; std::condition_variable _cond; int64_t _cur_job_id {0}; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index dd3be9f152016a..2badc8ca97f4b5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1114,6 +1114,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000"); DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000"); DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000"); +DEFINE_Int32(file_cache_downloader_thread_num_min, "32"); +DEFINE_Int32(file_cache_downloader_thread_num_max, "32"); + DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800"); DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600"); DEFINE_mBool(enable_write_index_searcher_cache, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 150e4e602fb76e..23e4a94c9eaca2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1152,6 +1152,9 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache); DECLARE_mInt64(file_cache_background_monitor_interval_ms); DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms); DECLARE_mInt64(file_cache_background_ttl_gc_batch); + +DECLARE_Int32(file_cache_downloader_thread_num_min); +DECLARE_Int32(file_cache_downloader_thread_num_max); // inverted index searcher cache // cache entry stay time after lookup DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s); diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index b9944e39989d2b..05c18e0b945ce3 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -43,8 +43,8 @@ namespace doris::io { FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) { _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this); auto st = ThreadPoolBuilder("FileCacheBlockDownloader") - .set_min_threads(4) - .set_max_threads(16) + .set_min_threads(config::file_cache_downloader_thread_num_min) + .set_max_threads(config::file_cache_downloader_thread_num_max) .build(&_workers); CHECK(st.ok()) << "failed to create FileCacheBlockDownloader"; } diff --git a/be/src/io/cache/block_file_cache_downloader.h b/be/src/io/cache/block_file_cache_downloader.h index 30827b69580553..c9a4689167363f 100644 --- a/be/src/io/cache/block_file_cache_downloader.h +++ b/be/src/io/cache/block_file_cache_downloader.h @@ -92,7 +92,7 @@ class FileCacheBlockDownloader { // tablet id -> inflight block num of tablet std::unordered_map _inflight_tablets; - static inline constexpr size_t _max_size {10240}; + static inline constexpr size_t _max_size {102400}; }; } // namespace doris::io diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c59e8adc3eb61c..39d25ea6cf9645 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3338,6 +3338,9 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true, masterOnly = true) public static int cloud_warm_up_job_scheduler_interval_millisecond = 1000; // 1 seconds + @ConfField(mutable = true, masterOnly = true) + public static long cloud_warm_up_job_max_bytes_per_batch = 21474836480L; // 20GB + @ConfField(mutable = true, masterOnly = true) public static boolean enable_fetch_cluster_cache_hotspot = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 30cc76b2a6b304..00f9cb0960251a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -365,7 +365,7 @@ Long getFileCacheCapacity(String clusterName) throws RuntimeException { } private Map>> splitBatch(Map> beToWarmUpTablets) { - final Long maxSizePerBatch = 10737418240L; // 10G + final Long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch; Map>> beToTabletIdBatches = new HashMap<>(); for (Map.Entry> entry : beToWarmUpTablets.entrySet()) { List> batches = new ArrayList<>(); From d001b8b6d7e62138b90dad6e63b8d1919eea0fc0 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 17 Jun 2025 10:19:17 +0800 Subject: [PATCH 2/5] format Signed-off-by: zhengyu --- .../main/java/org/apache/doris/cloud/CacheHotspotManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 00f9cb0960251a..48771d36240693 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -365,7 +365,7 @@ Long getFileCacheCapacity(String clusterName) throws RuntimeException { } private Map>> splitBatch(Map> beToWarmUpTablets) { - final Long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch; + final Long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch; Map>> beToTabletIdBatches = new HashMap<>(); for (Map.Entry> entry : beToWarmUpTablets.entrySet()) { List> batches = new ArrayList<>(); From a24235785d47057b3f77cc036b661b1def2e8812 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 17 Jun 2025 17:24:00 +0800 Subject: [PATCH 3/5] fix take a long time warning Signed-off-by: zhengyu --- be/src/cloud/cloud_warm_up_manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index a0b3f544de276e..2d450d2555c750 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -205,7 +205,7 @@ void CloudWarmUpManager::handle_jobs() { timespec time; time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; - if (!wait->timed_wait(time)) { + if (wait->timed_wait(time)) { LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size()); } { From 9202cb4c10c8da2ef7708c4f54e038e8d4434765 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 19 Jun 2025 16:28:18 +0800 Subject: [PATCH 4/5] response to the reviewer Signed-off-by: zhengyu --- be/src/cloud/cloud_warm_up_manager.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index 2d450d2555c750..510c677f06f34c 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -68,9 +68,11 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size, std::shared_ptr wait) { if (file_size < 0) { auto st = file_system->file_size(path, &file_size); - LOG(WARNING) << "get file size failed: " << path; - file_cache_warm_up_failed_task_num << 1; - return; + if (!st.ok()) [[unlikely]] { + LOG(WARNING) << "get file size failed: " << path; + file_cache_warm_up_failed_task_num << 1; + return; + } } const int64_t chunk_size = 10 * 1024 * 1024; // 10MB From 838c1ef6baba59be341143837e57841642755423 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Fri, 20 Jun 2025 15:08:00 +0800 Subject: [PATCH 5/5] minor fix Signed-off-by: zhengyu --- be/src/cloud/cloud_backend_service.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 1491cf83e0a0ad..de5c77208c5e89 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -108,7 +108,8 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, .tag("job_id", request.job_id) .tag("batch_id", request.batch_id) .tag("jobs size", request.job_metas.size()) - .tag("tablet num of first meta", request.job_metas[0].tablet_ids.size()); + .tag("tablet num of first meta", + request.job_metas.empty() ? 0 : request.job_metas[0].tablet_ids.size()); bool retry = false; st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry); if (!retry && st) {