diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 94bb951b95d71a..2584ce8146b534 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -170,110 +170,160 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
                << ", response=" << response->DebugString();
 }
 
+namespace {
+// Helper functions for fetch_peer_data
+
+Status handle_peer_file_range_request(const std::string& path, PFetchPeerDataResponse* response) {
+    // Read specific range [file_offset, file_offset+file_size) across cached blocks
+    auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+    for (auto& cb : datas) {
+        *(response->add_datas()) = std::move(cb);
+    }
+    return Status::OK();
+}
+
+void set_error_response(PFetchPeerDataResponse* response, const std::string& error_msg) {
+    response->mutable_status()->add_error_msgs(error_msg);
+    response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+}
+
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, size_t file_size,
+                       doris::CacheBlockPB* output) {
+    std::string data;
+    // ATTN: clamp the read to the rightmost boundary of the block, because the current block meta
+    // information may be inaccurate. See CachedRemoteFileReader::read_at_impl for details.
+    // Ensure file_size >= file_block->offset() to avoid underflow.
+    if (file_size < file_block->offset()) {
+        LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset("
+                     << file_block->offset() << ")";
+        return Status::InternalError("file_size less than block offset");
+    }
+    size_t read_size = std::min(static_cast<size_t>(file_size - file_block->offset()),
+                                file_block->range().size());
+    data.resize(read_size);
+
+    auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                                      std::chrono::steady_clock::now().time_since_epoch())
+                                      .count();
+
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+    Slice slice(data.data(), data.size());
+    Status read_st = file_block->read(slice, /*read_offset=*/0);
+
+    auto end_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                                    std::chrono::steady_clock::now().time_since_epoch())
+                                    .count();
+    g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);
+
+    if (read_st.ok()) {
+        output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+        output->set_block_size(static_cast<int64_t>(read_size));
+        output->set_data(std::move(data));
+        return Status::OK();
+    } else {
+        g_file_cache_get_by_peer_failed_num << 1;
+        LOG(WARNING) << "read cache block failed: " << read_st;
+        return read_st;
+    }
+}
+
+Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request,
+                                            PFetchPeerDataResponse* response) {
+    const auto& path = request->path();
+    auto hash = io::BlockFileCache::hash(path);
+    auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+    if (cache == nullptr) {
+        g_file_cache_get_by_peer_failed_num << 1;
+        set_error_response(response, "can't get file cache instance");
+        return Status::InternalError("can't get file cache instance");
+    }
+
+    io::CacheContext ctx {};
+    io::ReadStatistics local_stats;
+    ctx.stats = &local_stats;
+
+    for (const auto& cb_req : request->cache_req()) {
+        size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
+        size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
+        auto holder = cache->get_or_set(hash, offset, size, ctx);
+
+        for (auto& fb : holder.file_blocks) {
+            if (fb->state() != io::FileBlock::State::DOWNLOADED) {
+                g_file_cache_get_by_peer_failed_num << 1;
+                LOG(WARNING) << "read cache block failed, state=" << fb->state();
+                set_error_response(response, "read cache file error");
+                return Status::InternalError("cache block not downloaded");
+            }
+
+            g_file_cache_get_by_peer_blocks_num << 1;
+            doris::CacheBlockPB* out = response->add_datas();
+            Status read_status = read_file_block(fb, request->file_size(), out);
+            if (!read_status.ok()) {
+                set_error_response(response, "read cache file error");
+                return read_status;
+            }
+        }
+    }
+
+    return Status::OK();
+}
+} // namespace
+
 void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
                                                        [[maybe_unused]],
                                                const PFetchPeerDataRequest* request,
                                                PFetchPeerDataResponse* response,
                                                google::protobuf::Closure* done) {
-    // TODO(dx): use async thread pool to handle the request, not AsyncIO
-    brpc::ClosureGuard closure_guard(done);
-    g_file_cache_get_by_peer_num << 1;
-    if (!config::enable_file_cache) {
-        LOG_WARNING("try to access file cache data, but file cache not enabled");
-        return;
-    }
-    int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-                               std::chrono::steady_clock::now().time_since_epoch())
-                               .count();
-    const auto type = request->type();
-    const auto& path = request->path();
-    response->mutable_status()->set_status_code(TStatusCode::OK);
-    if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
-        // Read specific range [file_offset, file_offset+file_size) across cached blocks
-        auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
-        for (auto& cb : datas) {
-            *(response->add_datas()) = std::move(cb);
-        }
-    } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
-        // Multiple specific blocks
-        auto hash = io::BlockFileCache::hash(path);
-        auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
-        if (cache == nullptr) {
-            g_file_cache_get_by_peer_failed_num << 1;
-            response->mutable_status()->add_error_msgs("can't get file cache instance");
-            response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        g_file_cache_get_by_peer_num << 1;
+
+        if (!config::enable_file_cache) {
+            LOG_WARNING("try to access file cache data, but file cache not enabled");
             return;
         }
-        io::CacheContext ctx {};
-        // ensure a valid stats pointer is provided to cache layer
-        io::ReadStatistics local_stats;
-        ctx.stats = &local_stats;
-        for (const auto& cb_req : request->cache_req()) {
-            size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
-            size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
-            auto holder = cache->get_or_set(hash, offset, size, ctx);
-            for (auto& fb : holder.file_blocks) {
-                auto state = fb->state();
-                if (state != io::FileBlock::State::DOWNLOADED) {
-                    g_file_cache_get_by_peer_failed_num << 1;
-                    LOG(WARNING) << "read cache block failed, state=" << state;
-                    response->mutable_status()->add_error_msgs("read cache file error");
-                    response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
-                    return;
-                }
-                g_file_cache_get_by_peer_blocks_num << 1;
-                doris::CacheBlockPB* out = response->add_datas();
-                out->set_block_offset(static_cast<int64_t>(fb->offset()));
-                out->set_block_size(static_cast<int64_t>(fb->range().size()));
-                std::string data;
-                data.resize(fb->range().size());
-                // Offload the file block read to a dedicated OS thread to avoid bthread IO
-                Status read_st = Status::OK();
-                // due to file_reader.cpp:33] Check failed: bthread_self() == 0
-                int64_t begin_read_file_ts =
-                        std::chrono::duration_cast<std::chrono::microseconds>(
-                                std::chrono::steady_clock::now().time_since_epoch())
-                                .count();
-                auto task = [&] {
-                    // Current thread not exist ThreadContext, usually after the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a Task.
-                    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
-                    Slice slice(data.data(), data.size());
-                    read_st = fb->read(slice, /*read_offset=*/0);
-                };
-                AsyncIO::run_task(task, io::FileSystemType::LOCAL);
-                int64_t end_read_file_ts =
-                        std::chrono::duration_cast<std::chrono::microseconds>(
+
+        auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                 std::chrono::steady_clock::now().time_since_epoch())
                                 .count();
-                g_file_cache_get_by_peer_read_cache_file_latency
-                        << (end_read_file_ts - begin_read_file_ts);
-                if (read_st.ok()) {
-                    out->set_data(std::move(data));
-                } else {
-                    g_file_cache_get_by_peer_failed_num << 1;
-                    LOG(WARNING) << "read cache block failed: " << read_st;
-                    response->mutable_status()->add_error_msgs("read cache file error");
-                    response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
-                    return;
-                }
-            }
+
+        const auto type = request->type();
+        const auto& path = request->path();
+        response->mutable_status()->set_status_code(TStatusCode::OK);
+
+        Status status = Status::OK();
+        if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+            status = handle_peer_file_range_request(path, response);
+        } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+            status = handle_peer_file_cache_block_request(request, response);
         }
-    }
-    DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
-        int st_us = dp->param("sleep", 1000);
-        LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
-        // sleep us
-        bthread_usleep(st_us);
-    });
-    int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-                             std::chrono::steady_clock::now().time_since_epoch())
-                             .count();
-    g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
-    g_file_cache_get_by_peer_success_num << 1;
+        if (!status.ok()) {
+            LOG(WARNING) << "fetch peer data failed: " << status.to_string();
+            set_error_response(response, status.to_string());
+        }
-    VLOG_DEBUG << "fetch cache request=" << request->DebugString()
-               << ", response=" << response->DebugString();
+        DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
+            int st_us = dp->param("sleep", 1000);
+            LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
+            bthread_usleep(st_us);
+        });
+
+        auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                              std::chrono::steady_clock::now().time_since_epoch())
+                              .count();
+        g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
+        g_file_cache_get_by_peer_success_num << 1;
+
+        VLOG_DEBUG << "fetch cache request=" << request->DebugString()
+                   << ", response=" << response->DebugString();
+    });
+
+    if (!ret) {
+        brpc::ClosureGuard closure_guard(done);
+        LOG(WARNING) << "fail to offer fetch peer data request to the work pool, pool="
+                     << _heavy_work_pool.get_info();
+    }
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 851ff386e4dc16..ba28d9f347970b 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -284,6 +284,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
                     .is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
                     .expiration_time = meta.expiration_time(),
                     .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                    .is_warmup = true,
             },
             .download_done = std::move(download_done),
     };
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp
index 59ed51a6f6b6f2..240515e94e1973 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -191,10 +191,12 @@ std::pair<std::string, int> get_peer_connection_info(const std::string& file_pat
 }
 
 // Execute peer read with fallback to S3
+// file_size is the total size of the file; it is used to clamp the rightmost boundary of a block,
+// because the current block meta information may be inaccurate.
 Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start,
                          size_t& size, std::unique_ptr<char[]>& buffer,
-                         const std::string& file_path, bool is_doris_table, ReadStatistics& stats,
-                         const IOContext* io_ctx) {
+                         const std::string& file_path, size_t file_size, bool is_doris_table,
+                         ReadStatistics& stats, const IOContext* io_ctx) {
     auto [host, port] = get_peer_connection_info(file_path);
     VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port
                << ", file_path=" << file_path;
@@ -210,7 +212,7 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t
     peer_read_counter << 1;
     PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
     auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size,
-                                       io_ctx);
+                                       file_size, io_ctx);
     if (!st.ok()) {
         LOG_WARNING("PeerFileCacheReader read from peer failed")
                 .tag("host", host)
@@ -252,19 +254,24 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>&
+                                 this->size(), _is_doris_table, stats, io_ctx);
         }
     });
-    if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+    if (!doris::config::is_cloud_mode() || !_is_doris_table || io_ctx->is_warmup ||
+        !doris::config::enable_cache_read_from_peer) {
         return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
     } else {
         // first try peer read, if peer failed, fallback to S3
         // peer timeout is 5 seconds
         // TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first
+        // ATTN: save the original size before the peer read; fetch_blocks may modify it through the size reference
+        size_t original_size = size;
         auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
-                                    _is_doris_table, stats, io_ctx);
+                                    this->size(), _is_doris_table, stats, io_ctx);
         if (!st.ok()) {
+            // Restore the original size for the S3 fallback, as the peer read may have modified it
+            size = original_size;
             // Fallback to S3
             return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
         }
diff --git a/be/src/io/cache/peer_file_cache_reader.cpp b/be/src/io/cache/peer_file_cache_reader.cpp
index c034cdce110324..adb867f5c314ad 100644
--- a/be/src/io/cache/peer_file_cache_reader.cpp
+++ b/be/src/io/cache/peer_file_cache_reader.cpp
@@ -66,11 +66,12 @@ PeerFileCacheReader::~PeerFileCacheReader() {
 }
 
 Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off,
-                                         Slice s, size_t* bytes_read, const IOContext* ctx) {
+                                         Slice s, size_t* bytes_read, size_t file_size,
+                                         const IOContext* ctx) {
     VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
                << " bytes_read=" << *bytes_read;
-    *bytes_read = 0;
     if (blocks.empty()) {
+        *bytes_read = 0;
         return Status::OK();
     }
     if (!_is_doris_table) {
@@ -80,6 +81,7 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
     PFetchPeerDataRequest req;
     req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
     req.set_path(_path.filename().native());
+    req.set_file_size(static_cast<int64_t>(file_size));
     for (const auto& blk : blocks) {
         auto* cb = req.add_cache_req();
         cb->set_block_offset(static_cast<int64_t>(blk->range().left));
@@ -154,13 +156,13 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
     }
     VLOG_DEBUG << "peer cache read filled=" << filled;
     peer_bytes_read_total << filled;
-    *bytes_read = filled;
     peer_bytes_per_read << filled;
     if (filled != s.size) {
         peer_cache_reader_failed_counter << 1;
         return Status::InternalError("peer cache read incomplete: need={}, got={}", s.size,
                                      filled);
     }
     peer_cache_reader_succ_counter << 1;
+    *bytes_read = filled;
     return Status::OK();
 }
diff --git a/be/src/io/cache/peer_file_cache_reader.h b/be/src/io/cache/peer_file_cache_reader.h
index f028bc2cd919b6..49fe56336a21d7 100644
--- a/be/src/io/cache/peer_file_cache_reader.h
+++ b/be/src/io/cache/peer_file_cache_reader.h
@@ -61,7 +61,8 @@ class PeerFileCacheReader final {
      * - blocks: List of file blocks to fetch (global file offsets, inclusive ranges).
      * - off: Base file offset corresponding to the start of Slice s.
      * - s: Destination buffer; must be large enough to hold all requested block bytes.
-     * - n: Output number of bytes successfully written.
+     * - bytes_read: Output number of bytes read.
+     * - file_size: Total size of the file; used to clamp the rightmost block boundary.
      * - ctx: IO context (kept for interface symmetry).
      *
      * Returns:
@@ -69,7 +70,7 @@ class PeerFileCacheReader final {
      * - NotSupported: The file is not a Doris table segment.
      */
     Status fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off, Slice s,
-                        size_t* bytes_read, const IOContext* ctx);
+                        size_t* bytes_read, size_t file_size, const IOContext* ctx);
 
 private:
     io::Path _path;
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 56eace67c8eede..37eb309d866648 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -215,6 +215,8 @@ suite('test_balance_warm_up', 'docker') {
         // test expired be tablet cache info be removed
         // after cache_read_from_peer_expired_seconds = 100s
         assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "balance_tablet_be_mapping_size"))
+        assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read"))
+        assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read"))
     }
 
     docker(options) {