diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 63f76632d79695..de5c77208c5e89 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -107,7 +107,9 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, .tag("request_type", "SET_BATCH") .tag("job_id", request.job_id) .tag("batch_id", request.batch_id) - .tag("jobs size", request.job_metas.size()); + .tag("jobs size", request.job_metas.size()) + .tag("tablet num of first meta", + request.job_metas.empty() ? 0 : request.job_metas[0].tablet_ids.size()); bool retry = false; st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry); if (!retry && st) { diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 1c046eeac8fe7c..673d721d84e8f1 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -286,7 +286,11 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, }, - .download_done {}, + .download_done {[](Status st) { + if (!st) { + LOG_WARNING("add rowset warm up error ").error(st); + } + }}, }); auto download_idx_file = [&](const io::Path& idx_path) { @@ -299,7 +303,11 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, }, - .download_done {}, + .download_done {[](Status st) { + if (!st) { + LOG_WARNING("add rowset warm up error ").error(st); + } + }}, }; _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); }; diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index d8bce097465dde..510c677f06f34c 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -17,7 +17,8 @@ #include "cloud/cloud_warm_up_manager.h" -#include +#include +#include #include #include @@ -34,6 +35,8 @@ namespace doris { +bvar::Adder file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num"); + CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) { _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this); } @@ -59,6 +62,52 @@ std::unordered_map snapshot_rs_metas(BaseTable return id_to_rowset_meta_map; } +void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size, + io::FileSystemSPtr file_system, + int64_t expiration_time, + std::shared_ptr wait) { + if (file_size < 0) { + auto st = file_system->file_size(path, &file_size); + if (!st.ok()) [[unlikely]] { + LOG(WARNING) << "get file size failed: " << path; + file_cache_warm_up_failed_task_num << 1; + return; + } + } + + const int64_t chunk_size = 10 * 1024 * 1024; // 10MB + int64_t offset = 0; + int64_t remaining_size = file_size; + + while (remaining_size > 0) { + int64_t current_chunk_size = std::min(chunk_size, remaining_size); + wait->add_count(); + + _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { + .path = path, + .file_size = file_size, + .offset = offset, + .download_size = current_chunk_size, + .file_system = file_system, + .ctx = + { + .expiration_time = expiration_time, + .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, + }, + .download_done = + [wait](Status st) { + if (!st) { + LOG_WARNING("Warm up error ").error(st); + } + wait->signal(); + }, + }); + + offset += current_chunk_size; + remaining_size -= current_chunk_size; + } +} + void CloudWarmUpManager::handle_jobs() { #ifndef BE_TEST constexpr int WAIT_TIME_SECONDS = 600; @@ -77,6 +126,10 @@ void CloudWarmUpManager::handle_jobs() { LOG_WARNING("Warm up job is null"); continue; } + + std::shared_ptr wait = + std::make_shared(0); + for (int64_t tablet_id : cur_job->tablet_ids) { if (_cur_job_id == 0) { // The job is canceled break; @@ -92,8 +145,7 @@ void CloudWarmUpManager::handle_jobs() { LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st); continue; } - std::shared_ptr wait = - std::make_shared(0); + auto tablet_meta = tablet->tablet_meta(); auto rs_metas = snapshot_rs_metas(tablet.get()); for (auto& [_, rs] : rs_metas) { @@ -112,71 +164,51 @@ void CloudWarmUpManager::handle_jobs() { expiration_time = 0; } - wait->add_count(); - // clang-format off - _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { - .path = storage_resource.value()->remote_segment_path(*rs, seg_id), - .file_size = rs->segment_file_size(seg_id), - .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, - }, - .download_done = - [wait](Status st) { - if (!st) { - LOG_WARNING("Warm up error ").error(st); - } - wait->signal(); - }, - }); - - auto download_idx_file = [&](const io::Path& idx_path) { - io::DownloadFileMeta meta { - .path = idx_path, - .file_size = -1, - .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, - }, - .download_done = - [wait](Status st) { - if (!st) { - LOG_WARNING("Warm up error ").error(st); - } - wait->signal(); - }, - }; - // clang-format on - _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); - }; + // 1st. download segment files + submit_download_tasks( + storage_resource.value()->remote_segment_path(*rs, seg_id), + rs->segment_file_size(seg_id), storage_resource.value()->fs, + expiration_time, wait); + + // 2nd. download inverted index files + int64_t file_size = -1; auto schema_ptr = rs->tablet_schema(); auto idx_version = schema_ptr->get_inverted_index_storage_format(); + const auto& idx_file_info = rs->inverted_index_file_info(seg_id); if (idx_version == InvertedIndexStorageFormatPB::V1) { for (const auto& index : schema_ptr->inverted_indexes()) { - wait->add_count(); auto idx_path = storage_resource.value()->remote_idx_v1_path( *rs, seg_id, index->index_id(), index->get_index_suffix()); - download_idx_file(idx_path); + if (idx_file_info.index_info_size() > 0) { + for (const auto& idx_info : idx_file_info.index_info()) { + if (index->index_id() == idx_info.index_id() && + index->get_index_suffix() == idx_info.index_suffix()) { + file_size = idx_info.index_file_size(); + break; + } + } + } + submit_download_tasks(idx_path, file_size, storage_resource.value()->fs, + expiration_time, wait); } } else { if (schema_ptr->has_inverted_index()) { - wait->add_count(); auto idx_path = storage_resource.value()->remote_idx_v2_path(*rs, seg_id); - download_idx_file(idx_path); + file_size = idx_file_info.has_index_size() ? idx_file_info.index_size() + : -1; + submit_download_tasks(idx_path, file_size, storage_resource.value()->fs, + expiration_time, wait); } } } } - timespec time; - time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; - if (!wait->timed_wait(time)) { - LOG_WARNING("Warm up tablet {} take a long time", tablet_meta->tablet_id()); - } + } + + timespec time; + time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS; + if (wait->timed_wait(time)) { + LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size()); } { std::unique_lock lock(_mtx); diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h index 219dedc58065a6..356d7284f6f3ee 100644 --- a/be/src/cloud/cloud_warm_up_manager.h +++ b/be/src/cloud/cloud_warm_up_manager.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include #include @@ -69,7 +71,9 @@ class CloudWarmUpManager { private: void handle_jobs(); - + void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system, + int64_t expiration_time, + std::shared_ptr wait); std::mutex _mtx; std::condition_variable _cond; int64_t _cur_job_id {0}; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index dd3be9f152016a..2badc8ca97f4b5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1114,6 +1114,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000"); DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000"); DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000"); +DEFINE_Int32(file_cache_downloader_thread_num_min, "32"); +DEFINE_Int32(file_cache_downloader_thread_num_max, "32"); + DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800"); DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600"); DEFINE_mBool(enable_write_index_searcher_cache, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 150e4e602fb76e..23e4a94c9eaca2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1152,6 +1152,9 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache); DECLARE_mInt64(file_cache_background_monitor_interval_ms); DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms); DECLARE_mInt64(file_cache_background_ttl_gc_batch); + +DECLARE_Int32(file_cache_downloader_thread_num_min); +DECLARE_Int32(file_cache_downloader_thread_num_max); // inverted index searcher cache // cache entry stay time after lookup DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s); diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index b9944e39989d2b..05c18e0b945ce3 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -43,8 +43,8 @@ namespace doris::io { FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) { _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this); auto st = ThreadPoolBuilder("FileCacheBlockDownloader") - .set_min_threads(4) - .set_max_threads(16) + .set_min_threads(config::file_cache_downloader_thread_num_min) + .set_max_threads(config::file_cache_downloader_thread_num_max) .build(&_workers); CHECK(st.ok()) << "failed to create FileCacheBlockDownloader"; } diff --git a/be/src/io/cache/block_file_cache_downloader.h b/be/src/io/cache/block_file_cache_downloader.h index 30827b69580553..c9a4689167363f 100644 --- a/be/src/io/cache/block_file_cache_downloader.h +++ b/be/src/io/cache/block_file_cache_downloader.h @@ -92,7 +92,7 @@ class FileCacheBlockDownloader { // tablet id -> inflight block num of tablet std::unordered_map _inflight_tablets; - static inline constexpr size_t _max_size {10240}; + static inline constexpr size_t _max_size {102400}; }; } // namespace doris::io diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c59e8adc3eb61c..39d25ea6cf9645 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3338,6 +3338,9 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true, masterOnly = true) public static int cloud_warm_up_job_scheduler_interval_millisecond = 1000; // 1 seconds + @ConfField(mutable = true, masterOnly = true) + public static long cloud_warm_up_job_max_bytes_per_batch = 21474836480L; // 20GB + @ConfField(mutable = true, masterOnly = true) public static boolean enable_fetch_cluster_cache_hotspot = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 30cc76b2a6b304..48771d36240693 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -365,7 +365,7 @@ Long getFileCacheCapacity(String clusterName) throws RuntimeException { } private Map>> splitBatch(Map> beToWarmUpTablets) { - final Long maxSizePerBatch = 10737418240L; // 10G + final Long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch; Map>> beToTabletIdBatches = new HashMap<>(); for (Map.Entry> entry : beToWarmUpTablets.entrySet()) { List> batches = new ArrayList<>();