diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index a5caf89e2074af..62f72f0bb76a69 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -179,6 +179,12 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port << ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str(); + auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); + // Record each tablet in manager + for (int64_t tablet_id : request.tablet_ids) { + manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port); + } + std::string host = request.host; auto dns_cache = ExecEnv::GetInstance()->dns_cache(); if (dns_cache == nullptr) { @@ -188,6 +194,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons if (!status.ok()) { LOG(WARNING) << "failed to get ip from host " << request.host << ": " << status.to_string(); + // Remove failed tablets from tracking + manager.remove_balanced_tablets(request.tablet_ids); return; } } @@ -199,6 +207,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons if (!brpc_stub) { st = Status::RpcError("Address {} is wrong", brpc_addr); LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr; + // Remove failed tablets from tracking + manager.remove_balanced_tablets(request.tablet_ids); return; } brpc::Controller cntl; @@ -213,10 +223,20 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons if (!cntl.Failed()) { g_file_cache_warm_up_cache_async_submitted_segment_num << brpc_response.file_cache_block_metas().size(); - _engine.file_cache_block_downloader().submit_download_task( - std::move(*brpc_response.mutable_file_cache_block_metas())); + auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas(); + if (!file_cache_block_metas.empty()) { + _engine.file_cache_block_downloader().submit_download_task( + std::move(file_cache_block_metas)); + LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets=" + << oss.str(); + } else { + LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr; + manager.remove_balanced_tablets(request.tablet_ids); + } } else { st = Status::RpcError("{} isn't connected", brpc_addr); + // Remove failed tablets from tracking + manager.remove_balanced_tablets(request.tablet_ids); LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr << ", error=" << cntl.ErrorText(); } diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 240ffe56c3cf09..94bb951b95d71a 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -19,6 +19,9 @@ #include +#include +#include + #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "cloud/cloud_tablet_mgr.h" @@ -27,12 +30,24 @@ #include "io/cache/block_file_cache.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" +#include "util/async_io.h" #include "util/debug_points.h" namespace doris { #include "common/compile_check_avoid_begin.h" #include "common/compile_check_begin.h" +bvar::Adder 
g_file_cache_get_by_peer_num("file_cache_get_by_peer_num"); +bvar::Adder g_file_cache_get_by_peer_blocks_num("file_cache_get_by_peer_blocks_num"); +bvar::Adder g_file_cache_get_by_peer_success_num("file_cache_get_by_peer_success_num"); +bvar::Adder g_file_cache_get_by_peer_failed_num("file_cache_get_by_peer_failed_num"); +bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency( + "file_cache_get_by_peer_server_latency"); +bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency( + "file_cache_get_by_peer_read_cache_file_latency"); + CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env) : PInternalService(exec_env), _engine(engine) {} @@ -154,6 +169,113 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( VLOG_DEBUG << "warm up get meta request=" << request->DebugString() << ", response=" << response->DebugString(); } + +void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller + [[maybe_unused]], + const PFetchPeerDataRequest* request, + PFetchPeerDataResponse* response, + google::protobuf::Closure* done) { + // TODO(dx): use async thread pool to handle the request, not AsyncIO + brpc::ClosureGuard closure_guard(done); + g_file_cache_get_by_peer_num << 1; + if (!config::enable_file_cache) { + LOG_WARNING("try to access file cache data, but file cache not enabled"); + return; + } + int64_t begin_ts = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + const auto type = request->type(); + const auto& path = request->path(); + response->mutable_status()->set_status_code(TStatusCode::OK); + if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) { + // Read specific range [file_offset, file_offset+file_size) across cached blocks + auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path); + for (auto& cb : datas) { + *(response->add_datas()) = std::move(cb); + } + } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) { + // Multiple specific blocks + auto hash = io::BlockFileCache::hash(path); + auto* cache = io::FileCacheFactory::instance()->get_by_path(hash); + if (cache == nullptr) { + g_file_cache_get_by_peer_failed_num << 1; + response->mutable_status()->add_error_msgs("can't get file cache instance"); + response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR); + return; + } + io::CacheContext ctx {}; + // ensure a valid stats pointer is provided to cache layer + io::ReadStatistics local_stats; + ctx.stats = &local_stats; + for (const auto& cb_req : request->cache_req()) { + size_t offset = static_cast(std::max(0, cb_req.block_offset())); + size_t size = static_cast(std::max(0, cb_req.block_size())); + auto holder = cache->get_or_set(hash, offset, size, ctx); + for (auto& fb : holder.file_blocks) { + auto state = fb->state(); + if (state != io::FileBlock::State::DOWNLOADED) { + g_file_cache_get_by_peer_failed_num << 1; + LOG(WARNING) << "read cache block failed, state=" << state; + response->mutable_status()->add_error_msgs("read cache file error"); + response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR); + return; + } + g_file_cache_get_by_peer_blocks_num << 1; + doris::CacheBlockPB* out = response->add_datas(); + out->set_block_offset(static_cast(fb->offset())); + out->set_block_size(static_cast(fb->range().size())); + std::string data; + data.resize(fb->range().size()); + // Offload the file block read to a dedicated OS thread to avoid bthread IO + Status read_st = 
Status::OK(); + // due to file_reader.cpp:33] Check failed: bthread_self() == 0 + int64_t begin_read_file_ts = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + auto task = [&] { + // Current thread not exist ThreadContext, usually after the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a Task. + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker()); + Slice slice(data.data(), data.size()); + read_st = fb->read(slice, /*read_offset=*/0); + }; + AsyncIO::run_task(task, io::FileSystemType::LOCAL); + int64_t end_read_file_ts = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + g_file_cache_get_by_peer_read_cache_file_latency + << (end_read_file_ts - begin_read_file_ts); + if (read_st.ok()) { + out->set_data(std::move(data)); + } else { + g_file_cache_get_by_peer_failed_num << 1; + LOG(WARNING) << "read cache block failed: " << read_st; + response->mutable_status()->add_error_msgs("read cache file error"); + response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR); + return; + } + } + } + } + DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", { + int st_us = dp->param("sleep", 1000); + LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us); + // sleep us + bthread_usleep(st_us); + }); + + int64_t end_ts = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts); + g_file_cache_get_by_peer_success_num << 1; + + VLOG_DEBUG << "fetch cache request=" << request->DebugString() + << ", response=" << response->DebugString(); +} + #include "common/compile_check_end.h" bvar::Adder g_file_cache_event_driven_warm_up_submitted_segment_num( diff --git a/be/src/cloud/cloud_internal_service.h b/be/src/cloud/cloud_internal_service.h index 59d8739cbf46d6..db4916313fe596 100644 --- a/be/src/cloud/cloud_internal_service.h +++ b/be/src/cloud/cloud_internal_service.h @@ -48,6 +48,11 @@ class CloudInternalServiceImpl final : public PInternalService { const PRecycleCacheRequest* request, PRecycleCacheResponse* response, google::protobuf::Closure* done) override; + // Get file cached data about the path in file cache + void fetch_peer_data(google::protobuf::RpcController* controller, + const PFetchPeerDataRequest* request, PFetchPeerDataResponse* response, + google::protobuf::Closure* done) override; + private: CloudStorageEngine& _engine; }; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 28f4b60e1da71b..6eb88aee761f65 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -484,7 +484,7 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map* tablet->build_tablet_report_info(&tablet_info); using namespace std::chrono; int64_t now = duration_cast(system_clock::now().time_since_epoch()).count(); - if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) { + if (now - g_tablet_report_inactive_duration_ms < tablet->last_access_time_ms) { // the tablet is still being accessed and used in recently, so not report it return; } diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index 648318c039098d..f2b7588f1aea62 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -82,6 +82,7 @@ bvar::Adder 
g_file_cache_recycle_cache_requested_index_num( bvar::Status g_file_cache_warm_up_rowset_last_call_unix_ts( "file_cache_warm_up_rowset_last_call_unix_ts", 0); bvar::Adder file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num"); +bvar::Adder g_balance_tablet_be_mapping_size("balance_tablet_be_mapping_size"); bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency( "file_cache_warm_up_rowset_wait_for_compaction_latency"); @@ -104,6 +105,11 @@ CloudWarmUpManager::~CloudWarmUpManager() { if (_download_thread.joinable()) { _download_thread.join(); } + + for (auto& shard : _balanced_tablets_shards) { + std::lock_guard lock(shard.mtx); + shard.tablets.clear(); + } } std::unordered_map snapshot_rs_metas(BaseTablet* tablet) { @@ -783,5 +789,83 @@ void CloudWarmUpManager::_recycle_cache(int64_t tablet_id, } } +// Balance warm up cache management methods implementation +void CloudWarmUpManager::record_balanced_tablet(int64_t tablet_id, const std::string& host, + int32_t brpc_port) { + auto& shard = get_shard(tablet_id); + std::lock_guard lock(shard.mtx); + JobMeta meta; + meta.be_ip = host; + meta.brpc_port = brpc_port; + shard.tablets.emplace(tablet_id, std::move(meta)); + g_balance_tablet_be_mapping_size << 1; + VLOG_DEBUG << "Recorded balanced warm up cache tablet: tablet_id=" << tablet_id + << ", host=" << host << ":" << brpc_port; +} + +std::optional> CloudWarmUpManager::get_balanced_tablet_info( + int64_t tablet_id) { + auto& shard = get_shard(tablet_id); + std::lock_guard lock(shard.mtx); + auto it = shard.tablets.find(tablet_id); + if (it == shard.tablets.end()) { + return std::nullopt; + } + return std::make_pair(it->second.be_ip, it->second.brpc_port); +} + +void CloudWarmUpManager::remove_balanced_tablet(int64_t tablet_id) { + auto& shard = get_shard(tablet_id); + std::lock_guard lock(shard.mtx); + auto it = shard.tablets.find(tablet_id); + if (it != shard.tablets.end()) { + shard.tablets.erase(it); + g_balance_tablet_be_mapping_size << -1; + VLOG_DEBUG << "Removed balanced warm up cache tablet by timer, tablet_id=" << tablet_id; + } +} + +void CloudWarmUpManager::remove_balanced_tablets(const std::vector& tablet_ids) { + // Group tablet_ids by shard to minimize lock contention + std::array, SHARD_COUNT> shard_groups; + for (int64_t tablet_id : tablet_ids) { + shard_groups[get_shard_index(tablet_id)].push_back(tablet_id); + } + + // Process each shard + for (size_t i = 0; i < SHARD_COUNT; ++i) { + if (shard_groups[i].empty()) continue; + + auto& shard = _balanced_tablets_shards[i]; + std::lock_guard lock(shard.mtx); + for (int64_t tablet_id : shard_groups[i]) { + auto it = shard.tablets.find(tablet_id); + if (it != shard.tablets.end()) { + shard.tablets.erase(it); + g_balance_tablet_be_mapping_size << -1; + VLOG_DEBUG << "Removed balanced warm up cache tablet: tablet_id=" << tablet_id; + } + } + } +} + +std::unordered_map> +CloudWarmUpManager::get_all_balanced_tablets() const { + std::unordered_map> result; + + // Lock all shards to get consistent snapshot + std::array, SHARD_COUNT> locks; + for (size_t i = 0; i < SHARD_COUNT; ++i) { + locks[i] = std::unique_lock(_balanced_tablets_shards[i].mtx); + } + + for (const auto& shard : _balanced_tablets_shards) { + for (const auto& [tablet_id, entry] : shard.tablets) { + result.emplace(tablet_id, std::make_pair(entry.be_ip, entry.brpc_port)); + } + } + return result; +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/cloud/cloud_warm_up_manager.h 
b/be/src/cloud/cloud_warm_up_manager.h index dfa4f6e2be688a..8725dc069398e0 100644 --- a/be/src/cloud/cloud_warm_up_manager.h +++ b/be/src/cloud/cloud_warm_up_manager.h @@ -48,6 +48,10 @@ struct JobMeta { std::vector tablet_ids; }; +// manager for +// table warm up +// cluster warm up +// balance peer addr cache class CloudWarmUpManager { public: explicit CloudWarmUpManager(CloudStorageEngine& engine); @@ -85,6 +89,14 @@ class CloudWarmUpManager { void recycle_cache(int64_t tablet_id, const std::vector& rowsets); + // Balance warm up cache management methods + void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port); + std::optional> get_balanced_tablet_info(int64_t tablet_id); + void remove_balanced_tablet(int64_t tablet_id); + void remove_balanced_tablets(const std::vector& tablet_ids); + bool is_balanced_tablet_expired(const std::chrono::system_clock::time_point& ctime) const; + std::unordered_map> get_all_balanced_tablets() const; + private: void handle_jobs(); @@ -120,6 +132,22 @@ class CloudWarmUpManager { std::unordered_map _tablet_replica_cache; std::unique_ptr _thread_pool; std::unique_ptr _thread_pool_token; + + // Sharded lock for better performance + static constexpr size_t SHARD_COUNT = 10240; + struct Shard { + mutable std::mutex mtx; + std::unordered_map tablets; + }; + std::array _balanced_tablets_shards; + + // Helper methods for shard operations + size_t get_shard_index(int64_t tablet_id) const { + return std::hash {}(tablet_id) % SHARD_COUNT; + } + Shard& get_shard(int64_t tablet_id) { + return _balanced_tablets_shards[get_shard_index(tablet_id)]; + } }; } // namespace doris diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 8d30113e0cf519..b7a504c4bac25a 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -133,5 +133,11 @@ DEFINE_mBool(enable_standby_passive_compaction, "true"); DEFINE_mDouble(standby_compaction_version_ratio, "0.8"); +DEFINE_mBool(enable_cache_read_from_peer, "false"); + +// Cache the expiration time of the peer address. +// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration. +// If the value is -1, use the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration as the expiration time. 
+DEFINE_mInt64(cache_read_from_peer_expired_seconds, "-1"); #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 38092b34ff4244..7bba91ab21ada4 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -175,5 +175,9 @@ DECLARE_mBool(enable_standby_passive_compaction); DECLARE_mDouble(standby_compaction_version_ratio); +DECLARE_mBool(enable_cache_read_from_peer); + +DECLARE_mInt64(cache_read_from_peer_expired_seconds); + #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/http/action/file_cache_action.cpp b/be/src/http/action/file_cache_action.cpp index 1b69be1c573509..8a1c5a5fb79c1b 100644 --- a/be/src/http/action/file_cache_action.cpp +++ b/be/src/http/action/file_cache_action.cpp @@ -75,6 +75,11 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri json[RELEASED_ELEMENTS.data()] = released; *json_metrics = json.ToString(); } else if (operation == CLEAR) { + DBUG_EXECUTE_IF("FileCacheAction._handle_header.ignore_clear", { + LOG_WARNING("debug point FileCacheAction._handle_header.ignore_clear"); + st = Status::OK(); + return st; + }); const std::string& sync = req->param(SYNC.data()); const std::string& segment_path = req->param(VALUE.data()); if (segment_path.empty()) { diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index f5a2a287f83ba4..57d67b96e06a02 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -20,7 +20,9 @@ #include "io/cache/block_file_cache_downloader.h" +#include #include +#include #include #include #include @@ -30,6 +32,7 @@ #include #include "cloud/cloud_tablet_mgr.h" +#include "cloud/cloud_warm_up_manager.h" #include "common/config.h" #include "common/logging.h" #include "cpp/sync_point.h" @@ -169,6 +172,14 @@ std::unordered_map snapshot_rs_metas(BaseTable return id_to_rowset_meta_map; } +static void clean_up_expired_mappings(void* arg) { + // Reclaim ownership with unique_ptr for automatic memory management + std::unique_ptr tablet_id(static_cast(arg)); + auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); + manager.remove_balanced_tablet(*tablet_id); + VLOG_DEBUG << "Removed expired balanced warm up cache tablet: tablet_id=" << *tablet_id; +} + void FileCacheBlockDownloader::download_file_cache_block( const DownloadTask::FileCacheBlockMetaVec& metas) { std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) { @@ -215,8 +226,27 @@ void FileCacheBlockDownloader::download_file_cache_block( << "]"; } } + // Use std::make_unique to avoid raw pointer allocation + auto tablet_id_ptr = std::make_unique(tablet_id); + unsigned long expired_ms = g_tablet_report_inactive_duration_ms; + if (doris::config::cache_read_from_peer_expired_seconds > 0 && + doris::config::cache_read_from_peer_expired_seconds <= + g_tablet_report_inactive_duration_ms / 1000) { + expired_ms = doris::config::cache_read_from_peer_expired_seconds * 1000; + } + bthread_timer_t timer_id; + // ATTN: The timer callback will reclaim ownership of the tablet_id_ptr, so we need to release it after the timer is added. 
+ if (const int rc = + bthread_timer_add(&timer_id, butil::milliseconds_from_now(expired_ms), + clean_up_expired_mappings, tablet_id_ptr.get()); + rc == 0) { + tablet_id_ptr.release(); + } else { + LOG(WARNING) << "Fail to add timer for clean up expired mappings for tablet_id=" + << tablet_id << " rc=" << rc; + } LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id - << "status=" << st.to_string(); + << " status=" << st.to_string() << " expired_ms=" << expired_ms; }; std::string path; @@ -313,9 +343,16 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met void FileCacheBlockDownloader::download_blocks(DownloadTask& task) { switch (task.task_message.index()) { - case 0: - download_file_cache_block(std::get<0>(task.task_message)); + case 0: { + bool should_balance_task = true; + DBUG_EXECUTE_IF("FileCacheBlockDownloader.download_blocks.balance_task", + { should_balance_task = false; }); + if (should_balance_task) { + download_file_cache_block(std::get<0>(task.task_message)); + } + break; + } case 1: download_segment_file(std::get<1>(task.task_message)); break; diff --git a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp index 598baa8e857e38..f99a9112d09dde 100644 --- a/be/src/io/cache/block_file_cache_factory.cpp +++ b/be/src/io/cache/block_file_cache_factory.cpp @@ -41,6 +41,7 @@ #include "io/fs/local_file_system.h" #include "runtime/exec_env.h" #include "service/backend_options.h" +#include "util/slice.h" #include "vec/core/block.h" namespace doris { @@ -119,6 +120,40 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, return Status::OK(); } +std::vector FileCacheFactory::get_cache_data_by_path(const std::string& path) { + auto cache_hash = BlockFileCache::hash(path); + return get_cache_data_by_path(cache_hash); +} + +std::vector FileCacheFactory::get_cache_data_by_path( + const UInt128Wrapper& hash) { + std::vector ret; + BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(hash); + if (cache == nullptr) { + return ret; + } + auto blocks = cache->get_blocks_by_key(hash); + for (auto& [offset, fb] : blocks) { + doris::CacheBlockPB cb; + cb.set_block_offset(static_cast(offset)); + cb.set_block_size(static_cast(fb->range().size())); + // try to read data into bytes + std::string data; + data.resize(fb->range().size()); + Slice slice(data.data(), data.size()); + // read from beginning of this block + Status st = fb->read(slice, /*read_offset=*/0); + if (st.ok()) { + cb.set_data(data); + } else { + // On read failure, skip setting data but still report meta + VLOG_DEBUG << "read cache block failed: " << st; + } + ret.emplace_back(std::move(cb)); + } + return ret; +} + std::vector FileCacheFactory::get_cache_file_by_path(const UInt128Wrapper& hash) { io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash); auto blocks = cache->get_blocks_by_key(hash); diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h index 8b9f5ae3ccbf07..837feac7f68543 100644 --- a/be/src/io/cache/block_file_cache_factory.h +++ b/be/src/io/cache/block_file_cache_factory.h @@ -27,6 +27,7 @@ #include #include "common/status.h" +#include "gen_cpp/internal_service.pb.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_cache_common.h" namespace doris { @@ -65,6 +66,11 @@ class FileCacheFactory { std::vector get_cache_file_by_path(const UInt128Wrapper& hash); int64_t get_cache_file_size_by_path(const 
UInt128Wrapper& hash); + // Return cached blocks data for a given key hash + std::vector get_cache_data_by_path(const UInt128Wrapper& hash); + // Convenience overload: compute hash from path and return cached blocks data + std::vector get_cache_data_by_path(const std::string& path); + BlockFileCache* get_by_path(const UInt128Wrapper& hash); BlockFileCache* get_by_path(const std::string& cache_base_path); std::vector get_query_context_holders( diff --git a/be/src/io/cache/block_file_cache_profile.cpp b/be/src/io/cache/block_file_cache_profile.cpp index fe6414b78780d3..6a9676fbeee626 100644 --- a/be/src/io/cache/block_file_cache_profile.cpp +++ b/be/src/io/cache/block_file_cache_profile.cpp @@ -30,6 +30,7 @@ std::shared_ptr FileCacheMetrics::report() { std::lock_guard lock(_mtx); output_stats->num_io_bytes_read_from_cache += _statistics->num_io_bytes_read_from_cache; output_stats->num_io_bytes_read_from_remote += _statistics->num_io_bytes_read_from_remote; + output_stats->num_io_bytes_read_from_peer += _statistics->num_io_bytes_read_from_peer; return output_stats; } @@ -43,6 +44,7 @@ void FileCacheMetrics::update(FileCacheStatistics* input_stats) { } _statistics->num_io_bytes_read_from_cache += input_stats->bytes_read_from_local; _statistics->num_io_bytes_read_from_remote += input_stats->bytes_read_from_remote; + _statistics->num_io_bytes_read_from_peer += input_stats->bytes_read_from_peer; } void FileCacheMetrics::register_entity() { @@ -56,8 +58,11 @@ void FileCacheMetrics::update_metrics_callback() { stats->num_io_bytes_read_from_cache); DorisMetrics::instance()->num_io_bytes_read_from_remote->set_value( stats->num_io_bytes_read_from_remote); + DorisMetrics::instance()->num_io_bytes_read_from_peer->set_value( + stats->num_io_bytes_read_from_peer); DorisMetrics::instance()->num_io_bytes_read_total->set_value( - stats->num_io_bytes_read_from_cache + stats->num_io_bytes_read_from_remote); + stats->num_io_bytes_read_from_cache + stats->num_io_bytes_read_from_remote + + stats->num_io_bytes_read_from_peer); } FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) { @@ -67,8 +72,11 @@ FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) { ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumLocalIOTotal", TUnit::UNIT, cache_profile, 1); num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT, cache_profile, 1); + num_peer_io_total = + ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumPeerIOTotal", TUnit::UNIT, cache_profile, 1); local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1); remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1); + peer_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "PeerIOUseTimer", cache_profile, 1); remote_wait_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "WaitOtherDownloaderTimer", cache_profile, 1); write_cache_io_timer = @@ -81,6 +89,8 @@ FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) { TUnit::BYTES, cache_profile, 1); bytes_scanned_from_remote = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "BytesScannedFromRemote", TUnit::BYTES, cache_profile, 1); + bytes_scanned_from_peer = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "BytesScannedFromPeer", + TUnit::BYTES, cache_profile, 1); read_cache_file_directly_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "ReadCacheFileDirectlyTimer", cache_profile, 1); cache_get_or_set_timer = @@ -93,14 +103,20 @@ FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) { 
profile, "InvertedIndexNumLocalIOTotal", TUnit::UNIT, cache_profile, 1); inverted_index_num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL( profile, "InvertedIndexNumRemoteIOTotal", TUnit::UNIT, cache_profile, 1); + inverted_index_num_peer_io_total = ADD_CHILD_COUNTER_WITH_LEVEL( + profile, "InvertedIndexNumPeerIOTotal", TUnit::UNIT, cache_profile, 1); inverted_index_bytes_scanned_from_cache = ADD_CHILD_COUNTER_WITH_LEVEL( profile, "InvertedIndexBytesScannedFromCache", TUnit::BYTES, cache_profile, 1); inverted_index_bytes_scanned_from_remote = ADD_CHILD_COUNTER_WITH_LEVEL( profile, "InvertedIndexBytesScannedFromRemote", TUnit::BYTES, cache_profile, 1); + inverted_index_bytes_scanned_from_peer = ADD_CHILD_COUNTER_WITH_LEVEL( + profile, "InvertedIndexBytesScannedFromPeer", TUnit::BYTES, cache_profile, 1); inverted_index_local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexLocalIOUseTimer", cache_profile, 1); inverted_index_remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexRemoteIOUseTimer", cache_profile, 1); + inverted_index_peer_io_timer = + ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexPeerIOUseTimer", cache_profile, 1); inverted_index_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexIOTimer", cache_profile, 1); } @@ -108,14 +124,17 @@ FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) { void FileCacheProfileReporter::update(const FileCacheStatistics* statistics) const { COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total); COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total); + COUNTER_UPDATE(num_peer_io_total, statistics->num_peer_io_total); COUNTER_UPDATE(local_io_timer, statistics->local_io_timer); COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer); + COUNTER_UPDATE(peer_io_timer, statistics->peer_io_timer); COUNTER_UPDATE(remote_wait_timer, statistics->remote_wait_timer); COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer); COUNTER_UPDATE(bytes_write_into_cache, statistics->bytes_write_into_cache); COUNTER_UPDATE(num_skip_cache_io_total, statistics->num_skip_cache_io_total); COUNTER_UPDATE(bytes_scanned_from_cache, statistics->bytes_read_from_local); COUNTER_UPDATE(bytes_scanned_from_remote, statistics->bytes_read_from_remote); + COUNTER_UPDATE(bytes_scanned_from_peer, statistics->bytes_read_from_peer); COUNTER_UPDATE(read_cache_file_directly_timer, statistics->read_cache_file_directly_timer); COUNTER_UPDATE(cache_get_or_set_timer, statistics->cache_get_or_set_timer); COUNTER_UPDATE(lock_wait_timer, statistics->lock_wait_timer); @@ -126,12 +145,16 @@ void FileCacheProfileReporter::update(const FileCacheStatistics* statistics) con statistics->inverted_index_num_local_io_total); COUNTER_UPDATE(inverted_index_num_remote_io_total, statistics->inverted_index_num_remote_io_total); + COUNTER_UPDATE(inverted_index_num_peer_io_total, statistics->inverted_index_num_peer_io_total); COUNTER_UPDATE(inverted_index_bytes_scanned_from_cache, statistics->inverted_index_bytes_read_from_local); COUNTER_UPDATE(inverted_index_bytes_scanned_from_remote, statistics->inverted_index_bytes_read_from_remote); + COUNTER_UPDATE(inverted_index_bytes_scanned_from_peer, + statistics->inverted_index_bytes_read_from_peer); COUNTER_UPDATE(inverted_index_local_io_timer, statistics->inverted_index_local_io_timer); COUNTER_UPDATE(inverted_index_remote_io_timer, statistics->inverted_index_remote_io_timer); + COUNTER_UPDATE(inverted_index_peer_io_timer, 
statistics->inverted_index_peer_io_timer); COUNTER_UPDATE(inverted_index_io_timer, statistics->inverted_index_io_timer); } diff --git a/be/src/io/cache/block_file_cache_profile.h b/be/src/io/cache/block_file_cache_profile.h index 903f45a8663f9c..2feb3f2692380c 100644 --- a/be/src/io/cache/block_file_cache_profile.h +++ b/be/src/io/cache/block_file_cache_profile.h @@ -37,6 +37,7 @@ namespace io { struct AtomicStatistics { std::atomic num_io_bytes_read_from_cache = 0; std::atomic num_io_bytes_read_from_remote = 0; + std::atomic num_io_bytes_read_from_peer = 0; }; class FileCacheMetrics { public: @@ -66,10 +67,13 @@ class FileCacheMetrics { struct FileCacheProfileReporter { RuntimeProfile::Counter* num_local_io_total = nullptr; RuntimeProfile::Counter* num_remote_io_total = nullptr; + RuntimeProfile::Counter* num_peer_io_total = nullptr; RuntimeProfile::Counter* local_io_timer = nullptr; RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr; RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr; + RuntimeProfile::Counter* bytes_scanned_from_peer = nullptr; RuntimeProfile::Counter* remote_io_timer = nullptr; + RuntimeProfile::Counter* peer_io_timer = nullptr; RuntimeProfile::Counter* remote_wait_timer = nullptr; RuntimeProfile::Counter* write_cache_io_timer = nullptr; RuntimeProfile::Counter* bytes_write_into_cache = nullptr; @@ -82,10 +86,13 @@ struct FileCacheProfileReporter { RuntimeProfile::Counter* inverted_index_num_local_io_total = nullptr; RuntimeProfile::Counter* inverted_index_num_remote_io_total = nullptr; + RuntimeProfile::Counter* inverted_index_num_peer_io_total = nullptr; RuntimeProfile::Counter* inverted_index_bytes_scanned_from_cache = nullptr; RuntimeProfile::Counter* inverted_index_bytes_scanned_from_remote = nullptr; + RuntimeProfile::Counter* inverted_index_bytes_scanned_from_peer = nullptr; RuntimeProfile::Counter* inverted_index_local_io_timer = nullptr; RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr; + RuntimeProfile::Counter* inverted_index_peer_io_timer = nullptr; RuntimeProfile::Counter* inverted_index_io_timer = nullptr; FileCacheProfileReporter(RuntimeProfile* profile); diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index d8e960d913e0e7..59ed51a6f6b6f2 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -17,15 +17,24 @@ #include "io/cache/cached_remote_file_reader.h" +#include #include #include +#include #include -#include #include +#include +#include +#include +#include #include +#include +#include +#include #include +#include "cloud/cloud_warm_up_manager.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "cpp/sync_point.h" @@ -33,16 +42,26 @@ #include "io/cache/block_file_cache_factory.h" #include "io/cache/block_file_cache_profile.h" #include "io/cache/file_block.h" +#include "io/cache/peer_file_cache_reader.h" #include "io/fs/file_reader.h" #include "io/fs/local_file_system.h" #include "io/io_common.h" +#include "olap/storage_policy.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" +#include "service/backend_options.h" #include "util/bit_util.h" +#include "util/brpc_client_cache.h" // BrpcClientCache +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/runtime_profile.h" namespace doris::io { -bvar::Adder 
remote_read_counter("cached_remote_reader_remote_read"); +bvar::Adder s3_read_counter("cached_remote_reader_s3_read"); +bvar::Adder peer_read_counter("cached_remote_reader_peer_read"); bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num"); bvar::Adder g_skip_cache_sum("cached_remote_reader_skip_cache_sum"); bvar::Adder g_skip_local_cache_io_sum_bytes( @@ -63,6 +82,8 @@ bvar::Window> g_read_cache_indirect_bytes_1min_window( bvar::Window> g_read_cache_indirect_total_bytes_1min_window( "cached_remote_reader_indirect_total_bytes_1min_window", &g_read_cache_indirect_total_bytes, 60); +bvar::Adder g_failed_get_peer_addr_counter( + "cached_remote_reader_failed_get_peer_addr_counter"); CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts) @@ -131,6 +152,126 @@ std::pair CachedRemoteFileReader::s_align_size(size_t offset, si return std::make_pair(align_left, align_size); } +namespace { +std::optional extract_tablet_id(const std::string& file_path) { + return StorageResource::parse_tablet_id_from_path(file_path); +} + +// Get peer connection info from tablet_id +std::pair get_peer_connection_info(const std::string& file_path) { + std::string host = ""; + int port = 0; + + // Try to get tablet_id from actual path and lookup tablet info + if (auto tablet_id = extract_tablet_id(file_path)) { + auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); + if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) { + host = tablet_info->first; + port = tablet_info->second; + } else { + LOG_WARNING("get peer connection info not found") + .tag("tablet_id", *tablet_id) + .tag("file_path", file_path); + } + } else { + LOG_WARNING("parse tablet id from path failed") + .tag("tablet_id", "null") + .tag("file_path", file_path); + } + + DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", { + host = dp->param("host", "127.0.0.1"); + port = dp->param("port", 9060); + LOG_WARNING("debug point PeerFileCacheReader::_fetch_from_peer_cache_blocks") + .tag("host", host) + .tag("port", port); + }); + + return {host, port}; +} + +// Execute peer read with fallback to S3 +Status execute_peer_read(const std::vector& empty_blocks, size_t empty_start, + size_t& size, std::unique_ptr& buffer, + const std::string& file_path, bool is_doris_table, ReadStatistics& stats, + const IOContext* io_ctx) { + auto [host, port] = get_peer_connection_info(file_path); + VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port + << ", file_path=" << file_path; + + if (host.empty() || port == 0) { + g_failed_get_peer_addr_counter << 1; + LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is empty" + << ", host=" << host << ", port=" << port + << ", file_path=" << file_path; + return Status::InternalError("host or port is empty"); + } + SCOPED_RAW_TIMER(&stats.peer_read_timer); + peer_read_counter << 1; + PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port); + auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size, + io_ctx); + if (!st.ok()) { + LOG_WARNING("PeerFileCacheReader read from peer failed") + .tag("host", host) + .tag("port", port) + .tag("error", st.msg()); + } + stats.from_peer_cache = true; + return st; +} + +// Execute S3 read +Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr& buffer, + ReadStatistics& stats, const IOContext* io_ctx, + FileReaderSPtr 
remote_file_reader) { + s3_read_counter << 1; + SCOPED_RAW_TIMER(&stats.remote_read_timer); + stats.from_peer_cache = false; + return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx); +} + +} // anonymous namespace + +Status CachedRemoteFileReader::_execute_remote_read(const std::vector& empty_blocks, + size_t empty_start, size_t& size, + std::unique_ptr& buffer, + ReadStatistics& stats, + const IOContext* io_ctx) { + DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", { + // Determine read type from debug point or default to S3 + std::string read_type = "s3"; + read_type = dp->param("type", "s3"); + LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type") + .tag("path", path().native()) + .tag("off", empty_start) + .tag("size", size) + .tag("type", read_type); + // Execute appropriate read strategy + if (read_type == "s3") { + return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); + } else { + return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), + _is_doris_table, stats, io_ctx); + } + }); + + if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) { + return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); + } else { + // first try peer read, if peer failed, fallback to S3 + // peer timeout is 5 seconds + // TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first + auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), + _is_doris_table, stats, io_ctx); + if (!st.ok()) { + // Fallback to S3 + return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); + } + return st; + } +} + Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) { size_t already_read = 0; @@ -148,6 +289,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* *bytes_read = 0; return Status::OK(); } + ReadStatistics stats; stats.bytes_read += bytes_req; MonotonicStopWatch read_at_sw; @@ -274,12 +416,11 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* empty_end = empty_blocks.back()->range().right; size_t size = empty_end - empty_start + 1; std::unique_ptr buffer(new char[size]); - { - remote_read_counter << 1; - SCOPED_RAW_TIMER(&stats.remote_read_timer); - RETURN_IF_ERROR(_remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), - &size, io_ctx)); - } + + // Determine read type and execute remote read + RETURN_IF_ERROR( + _execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx)); + for (auto& block : empty_blocks) { if (block->state() == FileBlock::State::SKIP_CACHE) { continue; @@ -371,7 +512,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* << st.msg() << ", block state=" << block_state; size_t bytes_read {0}; stats.hit_cache = false; - remote_read_counter << 1; + stats.from_peer_cache = false; + s3_read_counter << 1; SCOPED_RAW_TIMER(&stats.remote_read_timer); RETURN_IF_ERROR(_remote_file_reader->read_at( current_offset, Slice(result.data + (current_offset - offset), read_size), @@ -400,10 +542,16 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, statis->num_local_io_total++; statis->bytes_read_from_local += read_stats.bytes_read; } else { - statis->num_remote_io_total++; - statis->bytes_read_from_remote += 
read_stats.bytes_read; + if (read_stats.from_peer_cache) { + statis->num_peer_io_total++; + statis->bytes_read_from_peer += read_stats.bytes_read; + statis->peer_io_timer += read_stats.peer_read_timer; + } else { + statis->num_remote_io_total++; + statis->bytes_read_from_remote += read_stats.bytes_read; + statis->remote_io_timer += read_stats.remote_read_timer; + } } - statis->remote_io_timer += read_stats.remote_read_timer; statis->remote_wait_timer += read_stats.remote_wait_timer; statis->local_io_timer += read_stats.local_read_timer; statis->num_skip_cache_io_total += read_stats.skip_cache; @@ -421,11 +569,17 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, statis->inverted_index_num_local_io_total++; statis->inverted_index_bytes_read_from_local += read_stats.bytes_read; } else { - statis->inverted_index_num_remote_io_total++; - statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; + if (read_stats.from_peer_cache) { + statis->inverted_index_num_peer_io_total++; + statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read; + statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; + } else { + statis->inverted_index_num_remote_io_total++; + statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; + statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; + } } statis->inverted_index_local_io_timer += read_stats.local_read_timer; - statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; } g_skip_cache_sum << read_stats.skip_cache; diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 94e8a5807ba273..939471b62ea41d 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "common/status.h" #include "io/cache/block_file_cache.h" @@ -60,15 +61,21 @@ class CachedRemoteFileReader final : public FileReader { private: void _insert_file_reader(FileBlockSPtr file_block); + + // Execute remote read (S3 or peer). 
+ Status _execute_remote_read(const std::vector& empty_blocks, size_t empty_start, + size_t& size, std::unique_ptr& buffer, + ReadStatistics& stats, const IOContext* io_ctx); + + void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state, + bool is_inverted_index) const; + bool _is_doris_table; FileReaderSPtr _remote_file_reader; UInt128Wrapper _cache_hash; BlockFileCache* _cache; std::shared_mutex _mtx; std::map _cache_file_readers; - - void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state, - bool is_inverted_index) const; }; } // namespace doris::io diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 4b54d317f12e7c..d99f426adb8c1e 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -63,10 +63,12 @@ struct UInt128Wrapper { struct ReadStatistics { bool hit_cache = true; + bool from_peer_cache = false; bool skip_cache = false; int64_t bytes_read = 0; int64_t bytes_write_into_file_cache = 0; int64_t remote_read_timer = 0; + int64_t peer_read_timer = 0; int64_t remote_wait_timer = 0; // wait for other downloader int64_t local_read_timer = 0; int64_t local_write_timer = 0; diff --git a/be/src/io/cache/peer_file_cache_reader.cpp b/be/src/io/cache/peer_file_cache_reader.cpp new file mode 100644 index 00000000000000..c034cdce110324 --- /dev/null +++ b/be/src/io/cache/peer_file_cache_reader.cpp @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#include "io/cache/peer_file_cache_reader.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "runtime/exec_env.h" +#include "runtime/thread_context.h" +#include "util/brpc_client_cache.h" +#include "util/bvar_helper.h" +#include "util/debug_points.h" +#include "util/defer_op.h" +#include "util/doris_metrics.h" +#include "util/network_util.h" +#include "util/runtime_profile.h" + +namespace doris::io { +// read from peer + +bvar::Adder peer_cache_reader_failed_counter("peer_cache_reader", "failed_counter"); +bvar::Adder peer_cache_reader_succ_counter("peer_cache_reader", "succ_counter"); +bvar::LatencyRecorder peer_bytes_per_read("peer_cache_reader", "bytes_per_read"); // also QPS +bvar::Adder peer_cache_reader_total("peer_cache_reader", "total_num"); +bvar::Adder peer_cache_being_read("peer_cache_reader", "file_being_read"); +bvar::Adder peer_cache_reader_read_counter("peer_cache_reader", "read_at"); +bvar::LatencyRecorder peer_cache_reader_latency("peer_cache_reader", "peer_latency"); +bvar::PerSecond> peer_get_request_qps("peer_cache_reader", "peer_get_request", + &peer_cache_reader_read_counter); +bvar::Adder peer_bytes_read_total("peer_cache_reader", "bytes_read"); +bvar::PerSecond> peer_read_througthput("peer_cache_reader", + "peer_read_throughput", + &peer_bytes_read_total); + +PeerFileCacheReader::PeerFileCacheReader(const io::Path& file_path, bool is_doris_table, + std::string host, int port) + : _path(file_path), _is_doris_table(is_doris_table), _host(host), _port(port) { + peer_cache_reader_total << 1; + peer_cache_being_read << 1; +} + +PeerFileCacheReader::~PeerFileCacheReader() { + peer_cache_being_read << -1; +} + +Status PeerFileCacheReader::fetch_blocks(const std::vector& blocks, size_t off, + Slice s, size_t* bytes_read, const IOContext* ctx) { + VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off + << " bytes_read=" << *bytes_read; + *bytes_read = 0; + if (blocks.empty()) { + return Status::OK(); + } + if (!_is_doris_table) { + return Status::NotSupported("peer cache fetch only supports doris table segments"); + } + + PFetchPeerDataRequest req; + req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK); + req.set_path(_path.filename().native()); + for (const auto& blk : blocks) { + auto* cb = req.add_cache_req(); + cb->set_block_offset(static_cast(blk->range().left)); + cb->set_block_size(static_cast(blk->range().size())); + } + + std::string realhost = _host; + int port = _port; + + auto dns_cache = ExecEnv::GetInstance()->dns_cache(); + if (dns_cache == nullptr) { + LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; + } else if (!is_valid_ip(realhost)) { + Status status = dns_cache->get(_host, &realhost); + if (!status.ok()) { + peer_cache_reader_failed_counter << 1; + LOG(WARNING) << "failed to get ip from host " << _host << ": " << status.to_string(); + return Status::InternalError("failed to get ip from host {}", _host); + } + } + std::string brpc_addr = get_host_port(realhost, port); + Status st = Status::OK(); + std::shared_ptr brpc_stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache( + brpc_addr); + if (!brpc_stub) { + peer_cache_reader_failed_counter << 1; + LOG(WARNING) << "failed to get brpc stub " << brpc_addr; + st = Status::RpcError("Address {} is wrong", brpc_addr); + return st; + } + LIMIT_REMOTE_SCAN_IO(bytes_read); + int64_t begin_ts = std::chrono::duration_cast( + 
std::chrono::system_clock::now().time_since_epoch()) + .count(); + Defer defer_latency {[&]() { + int64_t end_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + peer_cache_reader_latency << (end_ts - begin_ts); + }}; + + brpc::Controller cntl; + cntl.set_timeout_ms(5000); + PFetchPeerDataResponse resp; + peer_cache_reader_read_counter << 1; + brpc_stub->fetch_peer_data(&cntl, &req, &resp, nullptr); + if (cntl.Failed()) { + return Status::RpcError(cntl.ErrorText()); + } + if (resp.has_status()) { + Status st2 = Status::create(resp.status()); + if (!st2.ok()) return st2; + } + + size_t filled = 0; + for (const auto& data : resp.datas()) { + if (data.data().empty()) { + peer_cache_reader_failed_counter << 1; + LOG(WARNING) << "peer cache read empty data" << data.block_offset(); + return Status::InternalError("peer cache read empty data"); + } + int64_t block_off = data.block_offset(); + size_t rel = block_off > static_cast(off) + ? static_cast(block_off - static_cast(off)) + : 0; + size_t can_copy = std::min(s.size - rel, static_cast(data.data().size())); + VLOG_DEBUG << "peer cache read data=" << data.block_offset() + << " size=" << data.data().size() << " off=" << rel << " can_copy=" << can_copy; + std::memcpy(s.data + rel, data.data().data(), can_copy); + filled += can_copy; + } + VLOG_DEBUG << "peer cache read filled=" << filled; + peer_bytes_read_total << filled; + *bytes_read = filled; + peer_bytes_per_read << filled; + if (filled != s.size) { + peer_cache_reader_failed_counter << 1; + return Status::InternalError("peer cache read incomplete: need={}, got={}", s.size, filled); + } + peer_cache_reader_succ_counter << 1; + return Status::OK(); +} + +} // namespace doris::io diff --git a/be/src/io/cache/peer_file_cache_reader.h b/be/src/io/cache/peer_file_cache_reader.h new file mode 100644 index 00000000000000..f028bc2cd919b6 --- /dev/null +++ b/be/src/io/cache/peer_file_cache_reader.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "common/status.h" +#include "io/cache/file_block.h" +#include "io/fs/file_reader.h" +#include "io/fs/file_system.h" +#include "io/fs/path.h" +#include "io/fs/s3_file_system.h" +#include "util/slice.h" + +namespace doris { +class RuntimeProfile; + +namespace io { +struct IOContext; + +class PeerFileCacheReader final { +public: + /** + * Construct a peer file cache reader bound to a specific file and peer endpoint. + * + * Params: + * - file_path: Path of the target file whose cache blocks will be fetched from a peer. + * - is_doris_table: Whether the target file is a Doris table segment; only true is supported. + * - host: Peer hostname or IP address to fetch from. 
+ * - port: Peer BRPC service port. + */ + PeerFileCacheReader(const io::Path& file_path, bool is_doris_table, std::string host, int port); + ~PeerFileCacheReader(); + /** + * Fetch data blocks from a peer and write them into the provided buffer. + * + * Behavior: + * - Supports only Doris table segment files (is_doris_table=true); otherwise returns NotSupported. + * - Builds a BRPC request to invoke peer fetch_peer_data using the given blocks. + * - Copies returned block data into the contiguous buffer Slice s, using 'off' as the base offset. + * - Succeeds only if exactly s.size bytes are written; otherwise returns an Incomplete error. + * + * Params: + * - blocks: List of file blocks to fetch (global file offsets, inclusive ranges). + * - off: Base file offset corresponding to the start of Slice s. + * - s: Destination buffer; must be large enough to hold all requested block bytes. + * - n: Output number of bytes successfully written. + * - ctx: IO context (kept for interface symmetry). + * + * Returns: + * - OK: Successfully wrote exactly s.size bytes into the buffer. + * - NotSupported: The file is not a Doris table segment. + */ + Status fetch_blocks(const std::vector& blocks, size_t off, Slice s, + size_t* bytes_read, const IOContext* ctx); + +private: + io::Path _path; + bool _is_doris_table {false}; + std::string _host = "127.0.0.1"; + int _port = 9060; +}; + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index 79efa500c0677f..e6d8527e831906 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -57,6 +57,8 @@ struct FileReaderOptions { int64_t file_size = -1; // Use modification time to determine whether the file is changed int64_t mtime = 0; + // Used to query the location of the file cache + int64_t tablet_id = -1; static const FileReaderOptions DEFAULT; }; diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index eede868468f663..746f4dfb4faeff 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -57,6 +57,7 @@ bvar::PerSecond> s3_read_througthput("s3_file_reader", "s3 // record successfull request, and s3_get_request_qps will record all request. 
bvar::PerSecond> s3_get_request_qps("s3_file_reader", "s3_get_request", &s3_file_reader_read_counter); +bvar::LatencyRecorder s3_file_reader_latency("s3_file_reader", "s3_latency"); Result S3FileReader::create(std::shared_ptr client, std::string bucket, std::string key, int64_t file_size, @@ -114,6 +115,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea size_t bytes_req = result.size; char* to = result.data; bytes_req = std::min(bytes_req, _file_size - offset); + VLOG_DEBUG << "enter s3 read_at_impl, off=" << offset << " n=" << bytes_req + << " req=" << result.size << " file size=" << _file_size; if (UNLIKELY(bytes_req == 0)) { *bytes_read = 0; return Status::OK(); @@ -129,15 +132,23 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s, 8s for each backoff + int64_t begin_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); LIMIT_REMOTE_SCAN_IO(bytes_read); - DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", { auto sleep_time = dp->param("sleep", 3); - LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time) + LOG_INFO("S3FileReader::read_at_impl.io_slow inject microseconds {} s", sleep_time) .tag("bucket", _bucket) .tag("key", _key); - std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); + std::this_thread::sleep_for(std::chrono::microseconds(sleep_time)); }); + Defer defer_latency {[&]() { + int64_t end_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + s3_file_reader_latency << (end_ts - begin_ts); + }}; int total_sleep_time = 0; while (retry_count <= max_retries) { diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 4ce57b4954ebce..311fa1b01ded08 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -45,10 +45,13 @@ struct FileReaderStats { struct FileCacheStatistics { int64_t num_local_io_total = 0; int64_t num_remote_io_total = 0; + int64_t num_peer_io_total = 0; int64_t local_io_timer = 0; int64_t bytes_read_from_local = 0; int64_t bytes_read_from_remote = 0; + int64_t bytes_read_from_peer = 0; int64_t remote_io_timer = 0; + int64_t peer_io_timer = 0; int64_t remote_wait_timer = 0; int64_t write_cache_io_timer = 0; int64_t bytes_write_into_cache = 0; @@ -61,10 +64,13 @@ struct FileCacheStatistics { int64_t inverted_index_num_local_io_total = 0; int64_t inverted_index_num_remote_io_total = 0; + int64_t inverted_index_num_peer_io_total = 0; int64_t inverted_index_bytes_read_from_local = 0; int64_t inverted_index_bytes_read_from_remote = 0; + int64_t inverted_index_bytes_read_from_peer = 0; int64_t inverted_index_local_io_timer = 0; int64_t inverted_index_remote_io_timer = 0; + int64_t inverted_index_peer_io_timer = 0; int64_t inverted_index_io_timer = 0; }; diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 3032e0b7cbc056..3765ac24220314 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -198,6 +198,7 @@ Status BetaRowset::load_segment(int64_t seg_id, OlapReaderStatistics* stats, .is_doris_table = true, .cache_base_path = "", .file_size = _rowset_meta->segment_file_size(static_cast(seg_id)), + .tablet_id = _rowset_meta->tablet_id(), }; auto s = segment_v2::Segment::open( diff --git a/be/src/olap/storage_policy.cpp 
b/be/src/olap/storage_policy.cpp index ac03f17c7a981f..bf66bd064ae3c6 100644 --- a/be/src/olap/storage_policy.cpp +++ b/be/src/olap/storage_policy.cpp @@ -21,7 +21,9 @@ #include #include +#include #include +#include #include #include "gen_cpp/cloud.pb.h" @@ -198,6 +200,90 @@ std::string StorageResource::remote_segment_path(const RowsetMeta& rowset, int64 } } +// TODO(dx): this is a workaround; pass the upper layer's tablet ID down to the io layer instead of re-parsing it from the path here. +// Parses the tablet_id out of a remote segment path; used by the tablet manager when only the path is available. +// Static helper to parse tablet_id from a remote segment path +std::optional StorageResource::parse_tablet_id_from_path(const std::string& path) { + // Expected path formats: + // supports both .dat and .idx file extensions + // for the supported formats see UT storage_resource_test:StorageResourceTest.ParseTabletIdFromPath + + if (path.empty()) { + return std::nullopt; + } + + // Find the position of "data/" in the path + std::string_view path_view = path; + std::string_view data_prefix = DATA_PREFIX; + size_t data_pos = path_view.find(data_prefix); + if (data_pos == std::string_view::npos) { + return std::nullopt; + } + + // Extract the part after "data/" + path_view = path_view.substr(data_pos + data_prefix.length() + 1); + + // Check if path ends with .dat or .idx + if (!path_view.ends_with(".dat") && !path_view.ends_with(".idx")) { + return std::nullopt; + } + + // Count slashes in the remaining path + size_t slash_count = 0; + for (char c : path_view) { + if (c == '/') { + slash_count++; + } + } + + // Split path by '/' + std::vector parts; + size_t start = 0; + size_t pos = 0; + while ((pos = path_view.find('/', start)) != std::string_view::npos) { + if (pos > start) { + parts.push_back(path_view.substr(start, pos - start)); + } + start = pos + 1; + } + if (start < path_view.length()) { + parts.push_back(path_view.substr(start)); + } + + if (parts.empty()) { + return std::nullopt; + } + + // Determine path version based on slash count and extract tablet_id + // Version 0: {tablet_id}/{rowset_id}_{seg_id}.dat (1 slash) + // Version 1: {shard}/{tablet_id}/{rowset_id}/{seg_id}.dat (3 slashes) + + if (slash_count == 1) { + // Version 0 format: parts[0] should be tablet_id + if (parts.size() >= 1) { + try { + int64_t tablet_id = std::stoll(std::string(parts[0])); + return tablet_id; + } catch (const std::exception&) { + // Not a valid number; fall through and return nullopt below + } + } + } else if (slash_count == 3) { + // Version 1 format: parts[1] should be tablet_id (parts[0] is shard) + if (parts.size() >= 2) { + try { + int64_t tablet_id = std::stoll(std::string(parts[1])); + return tablet_id; + } catch (const std::exception&) { + // Not a valid number; fall through and return nullopt below + } + } + } + + return std::nullopt; +} + std::string StorageResource::remote_idx_v1_path(const RowsetMeta& rowset, int64_t seg_id, int64_t index_id, std::string_view index_path_suffix) const { diff --git a/be/src/olap/storage_policy.h b/be/src/olap/storage_policy.h index 3d0700633effe7..b3d8058eb6dd3c 100644 --- a/be/src/olap/storage_policy.h +++ b/be/src/olap/storage_policy.h @@ -85,6 +85,9 @@ struct StorageResource { std::string cooldown_tablet_meta_path(int64_t tablet_id, int64_t replica_id, int64_t cooldown_term) const; + + // Static function to parse tablet_id from remote segment path + static std::optional parse_tablet_id_from_path(const std::string& path); }; // return nullptr if not found
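For reference, a minimal usage sketch of the new helper (illustrative only; the assert-based harness and the paths are made up here, and the expected values simply restate cases already covered by StorageResourceTest.ParseTabletIdFromPath):

#include <cassert>
#include "olap/storage_policy.h"

// Illustrative only: how the two supported path layouts map to tablet ids.
void parse_tablet_id_examples() {
    using doris::StorageResource;
    // Version 0 layout: data/{tablet_id}/{rowset_id}_{seg_id}.dat
    assert(StorageResource::parse_tablet_id_from_path("data/10005/rowset_0.dat") == 10005);
    // Version 1 layout: data/{shard}/{tablet_id}/{rowset_id}/{seg_id}.dat
    assert(StorageResource::parse_tablet_id_from_path("data/611/10005/rowset/0.dat") == 10005);
    // Wrong extension, non-numeric id, or extra path components all yield std::nullopt.
    assert(!StorageResource::parse_tablet_id_from_path("data/10005/rowset_0.txt").has_value());
}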
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index ab69085e542497..b92c1380f43fa7 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -232,6 +232,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM) DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote, MetricUnit::OPERATIONS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_peer, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(udf_close_bthread_count, MetricUnit::OPERATIONS); @@ -403,6 +404,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_cache); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_remote); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_peer); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, udf_close_bthread_count); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index adabf1b5b3ee6b..4baadb1a173130 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -247,6 +247,7 @@ class DorisMetrics { IntCounter* num_io_bytes_read_total = nullptr; IntCounter* num_io_bytes_read_from_cache = nullptr; IntCounter* num_io_bytes_read_from_remote = nullptr; + IntCounter* num_io_bytes_read_from_peer = nullptr; IntCounter* udf_close_bthread_count = nullptr; diff --git a/be/test/olap/storage_resource_test.cpp b/be/test/olap/storage_resource_test.cpp index 2db0f59644bffb..62123705e4d84d 100644 --- a/be/test/olap/storage_resource_test.cpp +++ b/be/test/olap/storage_resource_test.cpp @@ -81,4 +81,113 @@ TEST(StorageResourceTest, RemotePath) { ASSERT_DEATH(StorageResource(res.value(), storage_vault_pb.path_format()), "unknown"); } +TEST(StorageResourceTest, ParseTabletIdFromPath) { + // Test Version 0 format: data/{tablet_id}/{rowset_id}_{seg_id}.dat + // see function StorageResource::remote_segment_path + // fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id); + EXPECT_EQ( + StorageResource::parse_tablet_id_from_path( + "prefix_xxx/data/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be_5.dat"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("//data/12345/rowset_001_0.dat"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999999/rowset_abc_10.dat"), 999999); + + // Test Version 0 format with .idx files (v1 format) + // see function StorageResource::remote_idx_v1_path + // fmt::format("{}/{}/{}_{}_{}{}.idx", DATA_PREFIX, rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id, index_id, suffix); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "//data/10005/0200000000001cc2224124562e7_6_6666_suffix.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "bucket_xxx/data/12345/rowsetid_1_666_suffix.idx"), + 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999999/rowsetid_10_8888_suffix.idx"), + 999999); + + // Test Version 0 format with .idx files (v2 format) + // see function StorageResource::remote_idx_v2_path + // fmt::format("{}/{}/{}_{}.idx", DATA_PREFIX, rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + 
EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3://prefix_bucket/data/10005/0200000000001cc2224124562e7_5.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/12345/rowset001_0.idx"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999999/rowsetabc_10.idx"), 999999); + + // Test Version 1 format: data/{shard}/{tablet_id}/{rowset_id}/{seg_id}.dat + // see function StorageResource::remote_segment_path + // fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "prefix_xxxx/data/611/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be/" + "5.dat"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/0/12345/rowset_001/0.dat"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("s3:///data/999/999999/rowset_abc/10.dat"), + 999999); + + // Test Version 1 format with .idx files (v1 format) + // see function StorageResource::remote_idx_v1_path + // fmt::format("{}/{}/{}/{}/{}_{}{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id, index_id, suffix); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3:///data/611/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be/" + "5_6666_suffix.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "prefix_bucket/data/0/12345/rowsetid/1_666_suffix.idx"), + 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "data/999/999999/rowsetid/10_8888_suffix.idx"), + 999999); + + // Test Version 1 format with .idx files (v2 format) + // see function StorageResource::remote_idx_v2_path + // fmt::format("{}/{}/{}/{}/{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3://prefix_bucket/data/611/10005/" + "0200000000001cc2224124562e7dfd4834d031b13c0210be/5.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/0/12345/rowset001/0.idx"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999/999999/rowsetabc/10.idx"), + 999999); + + // Test edge cases + // fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("prefix_bucket/data/0/rowset001_0.dat"), + 0); + // fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/0/0/rowset001/0.dat"), 0); + + // Test invalid cases + EXPECT_EQ(StorageResource::parse_tablet_id_from_path(""), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("invalid_path"), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/"), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/abc/rowset_001_0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3://prefix_bucket/data/0/abc/rowset_001/0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005/rowset_001_0.txt"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005/rowset_001_0"), std::nullopt); + + // Test paths with different slash counts (should return nullopt) + 
EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005/rowset_001/extra/0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/10005/rowset_001/extra/0.idx"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "prefix_bucket/data/10005/rowset_001/extra/0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005.dat"), std::nullopt); + + // Test paths without data prefix + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("10005/rowset_001_0.dat"), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("0/12345/rowset_001/0.dat"), std::nullopt); + + // Test paths with leading slash after data prefix + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data//10005/rowset_001_0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data//0/12345/rowset_001/0.dat"), + std::nullopt); +} + } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 3cf51e31613054..8b0e351f3063f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -267,7 +267,8 @@ private HeartbeatResponse pingOnce() { if (Config.isCloudMode()) { String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID); copiedMasterInfo.setCloudUniqueId(cloudUniqueId); - copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds); + long reportInterval = Config.rehash_tablet_after_be_dead_seconds * 1000L; + copiedMasterInfo.setTabletReportInactiveDurationMs(reportInterval); TCloudClusterInfo clusterInfo = new TCloudClusterInfo(); clusterInfo.setIsStandby(backend.isInStandbyCluster()); copiedMasterInfo.setCloudClusterInfo(clusterInfo); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1ddfbcf2502442..f800e924e2ae0f 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -1081,6 +1081,40 @@ message PGetTabletRowsetsResponse { optional DeleteBitmapPB delete_bitmap = 3; } +message CacheBlockReqest { + // PEER_FILE_CACHE_BLOCK + optional int64 block_offset = 1; + optional int64 block_size = 2; +} + +message PFetchPeerDataRequest { + enum Type { + PEER_FILE_RANGE = 1; + PEER_FILE_CACHE_BLOCK = 2; + } + optional Type type = 1; + // obj path, let peer calc hash, and download file cache + // PEER_FILE_RANGE and PEER_FILE_CACHE_BLOCK use + optional string path = 2; + + // PEER_FILE_RANGE + optional int64 file_offset = 3; + optional int64 file_size = 4; + // PEER_FILE_CACHE_BLOCK + repeated CacheBlockReqest cache_req = 5; +} + +message CacheBlockPB { + optional int64 block_offset = 1; + optional int64 block_size = 2; + optional bytes data = 3; +} + +message PFetchPeerDataResponse { + optional PStatus status = 1; + repeated CacheBlockPB datas = 2; +} + service PBackendService { // If #fragments of a query is < 3, use exec_plan_fragment directly. 
// If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start @@ -1137,5 +1171,6 @@ service PBackendService { rpc commit_refresh_dictionary(PCommitRefreshDictionaryRequest) returns (PCommitRefreshDictionaryResponse); rpc abort_refresh_dictionary(PAbortRefreshDictionaryRequest) returns (PAbortRefreshDictionaryResponse); rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns (PGetTabletRowsetsResponse); + rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); }; diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy index 2e24ae1f7bc167..824ac4098e16b3 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy @@ -65,7 +65,8 @@ class ProfileAction implements SuiteAction { } def httpCli = new HttpCliAction(context) - httpCli.endpoint(context.config.feHttpAddress) + def addr = context.getFeHttpAddress() + httpCli.endpoint("${addr.hostString}:${addr.port}") httpCli.uri("/rest/v1/query_profile") httpCli.op("get") httpCli.printResponse(false) @@ -89,7 +90,7 @@ class ProfileAction implements SuiteAction { def profileId = profileItem["Profile ID"].toString() def profileCli = new HttpCliAction(context) - profileCli.endpoint(context.config.feHttpAddress) + profileCli.endpoint("${addr.hostString}:${addr.port}") profileCli.uri("/rest/v1/query_profile/${profileId}") profileCli.op("get") profileCli.printResponse(false) diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy index 7a18f22bb31cf5..56eace67c8eede 100644 --- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy @@ -29,19 +29,32 @@ suite('test_balance_warm_up', 'docker') { 'sys_log_verbose_modules=org', 'heartbeat_interval_second=1', 'rehash_tablet_after_be_dead_seconds=3600', - 'enable_cloud_warm_up_for_rebalance=true' + 'cloud_warm_up_for_rebalance_type=async_warmup' ] options.beConfigs += [ 'report_tablet_interval_seconds=1', 'schedule_sync_tablets_interval_s=18000', 'disable_auto_compaction=true', - 'sys_log_verbose_modules=*' + 'sys_log_verbose_modules=*', + 'cache_read_from_peer_expired_seconds=100', + 'enable_cache_read_from_peer=true' ] options.setFeNum(1) options.setBeNum(1) options.cloudMode = true options.enableDebugPoints() + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + def testCase = { table -> def ms = cluster.getAllMetaservices().get(0) def msHttpPort = ms.host + ":" + ms.httpPort @@ -74,6 +87,22 @@ suite('test_balance_warm_up', 'docker') { insert into $table values (44, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); """ + // more tablets accessed. 
for test metrics `balance_tablet_be_mapping_size` + sql """CREATE TABLE more_tablets_warm_up_test_tbl ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 10 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into more_tablets_warm_up_test_tbl values (1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4'), (5, 'value5'), (6, 'value6'), (7, 'value7'), (8, 'value8'), (9, 'value9'), (10, 'value10'), (11, 'value11'), (12, 'value12'), (13, 'value13'), (14, 'value14'), (15, 'value15'), (16, 'value16'), (17, 'value17'), (18, 'value18'), (19, 'value19'), (20, 'value20'); + """ + // before add be def beforeGetFromFe = getTabletAndBeHostFromFe(table) def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) @@ -128,6 +157,8 @@ suite('test_balance_warm_up', 'docker') { } } + sql """select count(*) from more_tablets_warm_up_test_tbl""" + // from be1 -> be2, warm up this tablet // after add be def afterGetFromFe = getTabletAndBeHostFromFe(table) @@ -179,6 +210,11 @@ suite('test_balance_warm_up', 'docker') { "Expected cache file pattern ${hashFile} not found in BE ${newAddBe.Host}'s file_cache directory. " + "Available subdirs: ${subDirs}") } + + sleep(105 * 1000) + // test expired be tablet cache info be removed + // after cache_read_from_peer_expired_seconds = 100s + assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "balance_tablet_be_mapping_size")) } docker(options) { diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy new file mode 100644 index 00000000000000..e2209931a93156 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_use_peer_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=async_warmup', + 'cloud_pre_heating_time_limit_sec=30', + // disable Auto Analysis Job Executor + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*', + 'cache_read_from_peer_expired_seconds=100', + 'enable_cache_read_from_peer=true' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader.download_blocks.balance_task") + setFeConfig('enable_cloud_multi_replica', false) + + sleep(5 * 1000) + sql """ + insert into $table values (50, '4'), (60, '6') + """ + // version 4, new rs after warm up task + def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("cache dir version 4 {}", beforeCacheDirVersion4) + def afterMerged23CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 4 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged23CacheDir) + + // after cloud_pre_heating_time_limit_sec = 30s + sleep(40 * 1000) + // after 30s task timeout, check tablet in new be + + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = 
sql_return_maparray('show backends').get(1) + // balance tablet + awaitUntil(500) { + def afterWarmUpTaskTimeoutResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after warm up result {}", afterWarmUpTaskTimeoutResult) + afterWarmUpTaskTimeoutResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + // from be1 -> be2, warm up this tablet + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("after cache dir version 2 {}", afterCacheDirVersion2) + // version 3 + def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("after cache dir version 3 {}", afterCacheDirVersion3) + sleep(5 * 1000) + // version 4 + def afterCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("after cache dir version 4 {}", afterCacheDirVersion4) + + def afterMergedCacheDir = [afterCacheDirVersion2, afterCacheDirVersion3, afterCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + + logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir) + def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + assert newAddBeCacheDir.size() != 0 + assert afterMerged23CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host]) + + def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + logger.info("Checking file_cache directory: {}", dataPath.absolutePath) + logger.info("Directory exists: {}", dataPath.exists()) + + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + logger.info("Found subdir: {}", subDir.name) + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + + // check new be not have version 2,3,4 cache file + newAddBeCacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should not found in BE ${newAddBe.Host}'s file_cache directory. 
" + + "Available subdirs: ${subDirs}") + } + + // The query triggers reading the file cache from the peer + profile("test_balance_warm_up_use_peer_cache_profile") { + sql """ set enable_profile = true;""" + sql """ set profile_level = 2;""" + run { + sql """/* test_balance_warm_up_use_peer_cache_profile */ select * from $table""" + sleep(1000) + } + + check { profileString, exception -> + log.info(profileString) + // Use a regular expression to match the numeric value inside parentheses after "NumPeerIOTotal:" + def matcher = (profileString =~ /- NumPeerIOTotal:\s+(\d+)/) + def total = 0 + while (matcher.find()) { + total += matcher.group(1).toInteger() + logger.info("NumPeerIOTotal: {}", matcher.group(1)) + } + assertTrue(total > 0) + } + } + + subDirs.clear() + collectDirs(dataPath) + logger.info("after query, BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + // peer read cache, so it should have version 2,3,4 cache file + newAddBeCacheDir.each { hashFile -> + assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should found in BE ${newAddBe.Host}'s file_cache directory. " + + "Available subdirs: ${subDirs}") + } + assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read")) + assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read")) + } + + docker(options) { + testCase("test_balance_warm_up_use_peer_cache_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy new file mode 100644 index 00000000000000..a232a14dea1927 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_with_compaction_use_peer_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=async_warmup', + 'cloud_pre_heating_time_limit_sec=30', + // disable Auto Analysis Job Executor + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*', + 'cumulative_compaction_min_deltas=5', + 'cache_read_from_peer_expired_seconds=100', + 'enable_cache_read_from_peer=true' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """set enable_file_cache=true""" + sql """CREATE TABLE $table ( + `id` BIGINT, + `deleted` TINYINT, + `type` String, + `author` String, + `timestamp` DateTimeV2, + `comment` String, + `dead` TINYINT, + `parent` BIGINT, + `poll` BIGINT, + `children` Array, + `url` String, + `score` INT, + `title` String, + `parts` Array, + `descendants` INT, + INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment' + ) + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES ("replication_num" = "1"); + """ + sql """ + insert into $table values (344083, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0); + """ + sql """ + insert into $table values (44, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); + """ + sql """ + insert into $table values (344089, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0); + """ + sql """ + insert into $table values (449, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (469, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); + """ + sql """ + insert into $table values (344084, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], 
'', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0); + """ + sql """ + insert into $table values (849, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (869, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); + """ + // trigger compaction to generate some cache files + trigger_and_wait_compaction(table, "cumulative") + sleep(5 * 1000) + + def beforeCacheDirVersion7 = getTabletFileCacheDirFromBe(msHttpPort, table, 7) + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader.download_blocks.balance_task") + setFeConfig('enable_cloud_multi_replica', false) + + + def afterMergedVersion7CacheDir = [beforeCacheDirVersion7] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 7 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMergedVersion7CacheDir) + + // after cloud_pre_heating_time_limit_sec = 30s + sleep(40 * 1000) + // after 30s task timeout, check tablet in new be + + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = sql_return_maparray('show backends').get(1) + // balance tablet + awaitUntil(500) { + def afterWarmUpTaskTimeoutResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after warm up result {}", afterWarmUpTaskTimeoutResult) + afterWarmUpTaskTimeoutResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + // from be1 -> be2, warm up this tablet + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 7 + def afterCacheDirVersion7 = getTabletFileCacheDirFromBe(msHttpPort, table, 7) + logger.info("after cache dir version 7 {}", afterCacheDirVersion7) + + def afterMergedCacheDir = [afterCacheDirVersion7] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + + logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir) + // calc file cache hash on new added BE, but these cache files should not exist on new BE yet + def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + assert newAddBeCacheDir.size() != 0 + assert afterMergedVersion7CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host]) + + def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + logger.info("Checking file_cache directory: {}", dataPath.absolutePath) + logger.info("Directory exists: {}", dataPath.exists()) + + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + logger.info("Found subdir: {}", subDir.name) + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} 
file_cache subdirs: {}", newAddBe.Host, subDirs) + + // check new be not have version 7 cache file + newAddBeCacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should not found in BE ${newAddBe.Host}'s file_cache directory. " + + "Available subdirs: ${subDirs}") + } + + // The query triggers reading the file cache from the peer + profile("test_balance_warm_up_with_compaction_use_peer_cache_profile") { + sql """ set enable_profile = true;""" + sql """ set profile_level = 2;""" + run { + sql """/* test_balance_warm_up_with_compaction_use_peer_cache_profile */ SELECT count() FROM $table WHERE comment MATCH_ALL 'Welcome'""" + sleep(1000) + } + + check { profileString, exception -> + log.info(profileString) + // Use a regular expression to match the numeric value inside parentheses after "NumPeerIOTotal:" + def matcher = (profileString =~ /- InvertedIndexNumPeerIOTotal:\s+(\d+)/) + def total = 0 + while (matcher.find()) { + total += matcher.group(1).toInteger() + logger.info("InvertedIndexNumPeerIOTotal: {}", matcher.group(1)) + } + assertTrue(total > 0) + } + } + subDirs.clear() + collectDirs(dataPath) + logger.info("after query, BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + // peer read cache, so it should have version 7 cache file + newAddBeCacheDir.each { hashFile -> + assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should found in BE ${newAddBe.Host}'s file_cache directory. " + + "Available subdirs: ${subDirs}") + } + assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read")) + assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read")) + } + + docker(options) { + testCase("test_balance_warm_up_with_compaction_use_peer_cache_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy b/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy new file mode 100644 index 00000000000000..7ccf118b65f9d6 --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper +import org.awaitility.Awaitility; +import static java.util.concurrent.TimeUnit.SECONDS; +import org.apache.doris.regression.util.NodeType + +suite('test_read_from_peer', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'workload_group_check_interval_ms=1' + ] + options.beConfigs += [ + 'file_cache_each_block_size=131072', + // 'sys_log_verbose_modules=*', + 'enable_cache_read_from_peer=true' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def tableName = "test_read_from_table" + + def clusterBe = { clusterName -> + def bes = sql_return_maparray "show backends" + def clusterBes = bes.findAll { be -> be.Tag.contains(clusterName) } + logger.info("cluster {}, bes {}", clusterName, clusterBes) + clusterBes[0] + } + + def testCase = { String clusterName, String runType -> + def startTime = System.currentTimeMillis() + GetDebugPoint().enableDebugPointForAllBEs("CachedRemoteFileReader.read_at_impl.change_type", [type: runType]) + + try { + sql """ + use @$clusterName + """ + + def be = clusterBe(clusterName) + def haveCacheBe = clusterBe("compute_cluster") + + switch (runType) { + case "peer": + GetDebugPoint().enableDebugPoint(be.Host, be.HttpPort as int, NodeType.BE, "PeerFileCacheReader::_fetch_from_peer_cache_blocks", + [host: haveCacheBe.Host, port: haveCacheBe.BrpcPort]) + break + case "s3": + break + default: + throw new IllegalArgumentException("Invalid type: $runType. Expected: peer, s3") + } + + // Execute the query and measure time + def queryStartTime = System.currentTimeMillis() + def ret = sql """ + select count(*) from $tableName + """ + logger.info("select ret={}", ret) + def queryTime = System.currentTimeMillis() - queryStartTime + logger.info("Test completed - Type: {}, Cluster: {}, Query execution time: {}ms", runType, clusterName, queryTime) + } catch (Exception e) { + def totalTime = System.currentTimeMillis() - startTime + logger.info("Test failed after {}ms - Type: {}, Cluster: {}, Error: {}", totalTime, runType, clusterName, e.message) + throw e + } + } + + docker(options) { + // add a new cluster that only reads from S3 + cluster.addBackend(1, "readS3cluster") + + // add a new cluster that only reads from the peer's file cache + cluster.addBackend(1, "readPeercluster") + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + ss_sold_date_sk bigint, + ss_sold_time_sk bigint, + ss_item_sk bigint, + ss_customer_sk bigint, + ss_cdemo_sk bigint, + ss_hdemo_sk bigint, + ss_addr_sk bigint, + ss_store_sk bigint, + ss_promo_sk bigint, + ss_ticket_number bigint, + ss_quantity integer, + ss_wholesale_cost decimal(7,2), + ss_list_price decimal(7,2), + ss_sales_price decimal(7,2), + ss_ext_discount_amt decimal(7,2), + ss_ext_sales_price decimal(7,2), + ss_ext_wholesale_cost decimal(7,2), + ss_ext_list_price decimal(7,2), + ss_ext_tax decimal(7,2), + ss_coupon_amt decimal(7,2), + ss_net_paid decimal(7,2), + ss_net_paid_inc_tax decimal(7,2), + ss_net_profit decimal(7,2) + ) + DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk) + DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ) + """ + + sql """ + use @compute_cluster + """ + + // in compute_cluster be-1, cache all data in file cache + def txnId = -1; + // version 2 + 
streamLoad { + table "${tableName}" + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + + file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz""" + // file """store_sales.dat.gz""" + + time 10000 // limit inflight 10s + setFeAddr cluster.getAllFrontends().get(0).host, cluster.getAllFrontends().get(0).httpPort + + check { res, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${res}".toString()) + def json = parseJson(res) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + + def ret = sql """ + select count(*) from $tableName + """ + logger.info("ret after load, ret {}", ret) + + testCase("compute_cluster", "s3") + testCase("readS3cluster", "s3") + testCase("readPeercluster", "peer") + } +}
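A possible follow-up for test_read_from_peer.groovy (not part of this patch): besides timing the queries, the suite could assert the read path through BE brpc metrics, the same way the new balance suites do. The helper below simply mirrors their getBrpcMetrics closure, and the counter name cached_remote_reader_peer_read is the one those suites already check; both should be re-verified against the BE build before use:

// Hypothetical sketch: confirm the 'peer' run actually served blocks from the peer cache.
def getBrpcMetric = { ip, port, name ->
    def metrics = new URL("http://${ip}:${port}/brpc_metrics").text
    def matcher = metrics =~ ~"${name}\\s+(\\d+)"
    if (matcher.find()) {
        return matcher[0][1] as long
    }
    return 0L
}
// e.g. after testCase("readPeercluster", "peer"):
// def peerBe = clusterBe("readPeercluster")
// assert getBrpcMetric(peerBe.Host, peerBe.BrpcPort, "cached_remote_reader_peer_read") > 0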