234 changes: 142 additions & 92 deletions be/src/cloud/cloud_internal_service.cpp
@@ -170,110 +170,160 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
<< ", response=" << response->DebugString();
}

namespace {
// Helper functions for fetch_peer_data

Status handle_peer_file_range_request(const std::string& path, PFetchPeerDataResponse* response) {
// Read specific range [file_offset, file_offset+file_size) across cached blocks
auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
for (auto& cb : datas) {
*(response->add_datas()) = std::move(cb);
}
return Status::OK();
}

void set_error_response(PFetchPeerDataResponse* response, const std::string& error_msg) {
response->mutable_status()->add_error_msgs(error_msg);
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
}

Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, size_t file_size,
doris::CacheBlockPB* output) {
std::string data;
// ATTN: clamp the read to the rightmost valid boundary of the block, because the block's
// range metadata may extend past the true end of the file; see CachedRemoteFileReader::read_at_impl for details.
// Ensure file_size >= file_block->offset() to avoid underflow
if (file_size < file_block->offset()) {
LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset("
<< file_block->offset() << ")";
return Status::InternalError<false>("file_size less than block offset");
}
size_t read_size = std::min(static_cast<size_t>(file_size - file_block->offset()),
file_block->range().size());
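// e.g. (hypothetical numbers) with file_size = 1000 and a block whose range covers
// [960, 1024), read_size becomes min(1000 - 960, 64) = 40 valid bytes.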
data.resize(read_size);

auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();

SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
Slice slice(data.data(), data.size());
Status read_st = file_block->read(slice, /*read_offset=*/0);

auto end_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);

if (read_st.ok()) {
output->set_block_offset(static_cast<int64_t>(file_block->offset()));
output->set_block_size(static_cast<int64_t>(read_size));
output->set_data(std::move(data));
return Status::OK();
} else {
g_file_cache_get_by_peer_failed_num << 1;
LOG(WARNING) << "read cache block failed: " << read_st;
return read_st;
}
}

Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request,
PFetchPeerDataResponse* response) {
const auto& path = request->path();
auto hash = io::BlockFileCache::hash(path);
auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
if (cache == nullptr) {
g_file_cache_get_by_peer_failed_num << 1;
set_error_response(response, "can't get file cache instance");
return Status::InternalError<false>("can't get file cache instance");
}

io::CacheContext ctx {};
io::ReadStatistics local_stats;
ctx.stats = &local_stats;

for (const auto& cb_req : request->cache_req()) {
size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
auto holder = cache->get_or_set(hash, offset, size, ctx);

for (auto& fb : holder.file_blocks) {
if (fb->state() != io::FileBlock::State::DOWNLOADED) {
g_file_cache_get_by_peer_failed_num << 1;
LOG(WARNING) << "read cache block failed, state=" << fb->state();
set_error_response(response, "read cache file error");
return Status::InternalError<false>("cache block not downloaded");
}

g_file_cache_get_by_peer_blocks_num << 1;
doris::CacheBlockPB* out = response->add_datas();
Status read_status = read_file_block(fb, request->file_size(), out);
if (!read_status.ok()) {
set_error_response(response, "read cache file error");
return read_status;
}
}
}

return Status::OK();
}
} // namespace

void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
[[maybe_unused]],
const PFetchPeerDataRequest* request,
PFetchPeerDataResponse* response,
google::protobuf::Closure* done) {
// TODO(dx): use async thread pool to handle the request, not AsyncIO
brpc::ClosureGuard closure_guard(done);
g_file_cache_get_by_peer_num << 1;
if (!config::enable_file_cache) {
LOG_WARNING("try to access file cache data, but file cache not enabled");
return;
}
int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
const auto type = request->type();
const auto& path = request->path();
response->mutable_status()->set_status_code(TStatusCode::OK);
if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
// Read specific range [file_offset, file_offset+file_size) across cached blocks
auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
for (auto& cb : datas) {
*(response->add_datas()) = std::move(cb);
}
} else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
// Multiple specific blocks
auto hash = io::BlockFileCache::hash(path);
auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
if (cache == nullptr) {
g_file_cache_get_by_peer_failed_num << 1;
response->mutable_status()->add_error_msgs("can't get file cache instance");
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
bool ret = _heavy_work_pool.try_offer([request, response, done]() {
brpc::ClosureGuard closure_guard(done);
g_file_cache_get_by_peer_num << 1;

if (!config::enable_file_cache) {
LOG_WARNING("try to access file cache data, but file cache not enabled");
return;
}
io::CacheContext ctx {};
// ensure a valid stats pointer is provided to cache layer
io::ReadStatistics local_stats;
ctx.stats = &local_stats;
for (const auto& cb_req : request->cache_req()) {
size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
auto holder = cache->get_or_set(hash, offset, size, ctx);
for (auto& fb : holder.file_blocks) {
auto state = fb->state();
if (state != io::FileBlock::State::DOWNLOADED) {
g_file_cache_get_by_peer_failed_num << 1;
LOG(WARNING) << "read cache block failed, state=" << state;
response->mutable_status()->add_error_msgs("read cache file error");
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
return;
}
g_file_cache_get_by_peer_blocks_num << 1;
doris::CacheBlockPB* out = response->add_datas();
out->set_block_offset(static_cast<int64_t>(fb->offset()));
out->set_block_size(static_cast<int64_t>(fb->range().size()));
std::string data;
data.resize(fb->range().size());
// Offload the file block read to a dedicated OS thread to avoid bthread IO
Status read_st = Status::OK();
// due to file_reader.cpp:33] Check failed: bthread_self() == 0
int64_t begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
auto task = [&] {
// The current thread has no ThreadContext yet; after the thread starts, use the SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a task.
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
Slice slice(data.data(), data.size());
read_st = fb->read(slice, /*read_offset=*/0);
};
AsyncIO::run_task(task, io::FileSystemType::LOCAL);
int64_t end_read_file_ts =
        std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now().time_since_epoch())
                .count();
auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now().time_since_epoch())
        .count();
g_file_cache_get_by_peer_read_cache_file_latency
        << (end_read_file_ts - begin_read_file_ts);
if (read_st.ok()) {
out->set_data(std::move(data));
} else {
g_file_cache_get_by_peer_failed_num << 1;
LOG(WARNING) << "read cache block failed: " << read_st;
response->mutable_status()->add_error_msgs("read cache file error");
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
return;
}
}

const auto type = request->type();
const auto& path = request->path();
response->mutable_status()->set_status_code(TStatusCode::OK);

Status status = Status::OK();
if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
status = handle_peer_file_range_request(path, response);
} else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
status = handle_peer_file_cache_block_request(request, response);
}
}
DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
int st_us = dp->param<int>("sleep", 1000);
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
// sleep for st_us microseconds
bthread_usleep(st_us);
});

int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
g_file_cache_get_by_peer_success_num << 1;
if (!status.ok()) {
LOG(WARNING) << "fetch peer data failed: " << status.to_string();
set_error_response(response, status.to_string());
}

VLOG_DEBUG << "fetch cache request=" << request->DebugString()
<< ", response=" << response->DebugString();
DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
int st_us = dp->param<int>("sleep", 1000);
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
bthread_usleep(st_us);
});

auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
g_file_cache_get_by_peer_success_num << 1;

VLOG_DEBUG << "fetch cache request=" << request->DebugString()
<< ", response=" << response->DebugString();
});

if (!ret) {
brpc::ClosureGuard closure_guard(done);
LOG(WARNING) << "fail to offer fetch peer data request to the work pool, pool="
<< _heavy_work_pool.get_info();
}
}

#include "common/compile_check_end.h"
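For readers skimming the diff, a minimal, self-contained sketch of the offload pattern the reworked handler follows: the RPC thread only enqueues work, the enqueued task owns the closure guard so done() runs exactly once, and a rejected offer replies immediately. ClosureGuard, try_offer, and fetch_peer_data_like here are illustrative stand-ins, not the actual brpc or Doris APIs.

#include <functional>
#include <iostream>
#include <utility>

// RAII guard mirroring the role of brpc::ClosureGuard above: whoever owns the guard last
// is responsible for running done() exactly once.
class ClosureGuard {
public:
    explicit ClosureGuard(std::function<void()> done) : done_(std::move(done)) {}
    ClosureGuard(const ClosureGuard&) = delete;
    ClosureGuard& operator=(const ClosureGuard&) = delete;
    ~ClosureGuard() {
        if (done_) done_();
    }
private:
    std::function<void()> done_;
};

// Stand-in for _heavy_work_pool.try_offer(); a real pool would enqueue the task onto a
// worker thread and may refuse it when saturated. Here the task runs inline for brevity.
bool try_offer(const std::function<void()>& task) {
    task();
    return true;
}

// Simplified shape of the new fetch_peer_data: the RPC thread only enqueues; the enqueued
// task owns the guard, and the failure path replies immediately with its own guard.
void fetch_peer_data_like(std::function<void()> done) {
    bool ret = try_offer([done]() {
        ClosureGuard closure_guard(done); // response is sent when this guard is destroyed
        // ... read cache blocks and fill the response here ...
    });
    if (!ret) {
        ClosureGuard closure_guard(done); // pool rejected the task: reply with an error now
        std::cerr << "fail to offer fetch peer data request to the work pool\n";
    }
}

int main() {
    fetch_peer_data_like([] { std::cout << "done() invoked exactly once\n"; });
}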
1 change: 1 addition & 0 deletions be/src/io/cache/block_file_cache_downloader.cpp
@@ -284,6 +284,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
.expiration_time = meta.expiration_time(),
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true,
},
.download_done = std::move(download_done),
};
19 changes: 13 additions & 6 deletions be/src/io/cache/cached_remote_file_reader.cpp
@@ -191,10 +191,12 @@ std::pair<std::string, int> get_peer_connection_info(const std::string& file_pat
}

// Execute peer read with fallback to S3
// file_size is the total size of the file; it is used to clamp each block to its rightmost
// valid boundary, because the current block meta information may be inaccurate.
Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start,
size_t& size, std::unique_ptr<char[]>& buffer,
const std::string& file_path, bool is_doris_table, ReadStatistics& stats,
const IOContext* io_ctx) {
const std::string& file_path, size_t file_size, bool is_doris_table,
ReadStatistics& stats, const IOContext* io_ctx) {
auto [host, port] = get_peer_connection_info(file_path);
VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port
<< ", file_path=" << file_path;
@@ -210,7 +212,7 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t
peer_read_counter << 1;
PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size,
io_ctx);
file_size, io_ctx);
if (!st.ok()) {
LOG_WARNING("PeerFileCacheReader read from peer failed")
.tag("host", host)
@@ -252,19 +254,24 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockS
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
} else {
return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
_is_doris_table, stats, io_ctx);
this->size(), _is_doris_table, stats, io_ctx);
}
});

if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
if (!doris::config::is_cloud_mode() || !_is_doris_table || io_ctx->is_warmup ||
!doris::config::enable_cache_read_from_peer) {
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
} else {
// first try peer read, if peer failed, fallback to S3
// peer timeout is 5 seconds
// TODO(dx): read from the peer and S3 in parallel and take whichever correct result returns first
// ATTN: save the original size before the peer read; fetch_blocks takes size by reference and may modify it
size_t original_size = size;
auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
_is_doris_table, stats, io_ctx);
this->size(), _is_doris_table, stats, io_ctx);
if (!st.ok()) {
// Restore original size for S3 fallback, as peer read may have modified it
size = original_size;
// Fallback to S3
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
}
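A minimal sketch of the peer-first/S3-fallback control flow added above, under the assumption (taken from the ATTN comment) that the callees receive size by reference and may shrink it; execute_peer_read, execute_s3_read, and Status here are simplified stand-ins rather than the real Doris signatures.

#include <cstddef>
#include <cstring>
#include <iostream>
#include <string>

// Toy status type; stands in for doris::Status only for this sketch.
struct Status {
    bool ok_flag = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_flag; }
};

// Hypothetical stand-ins with simplified signatures; both may shrink `size` on partial reads.
Status execute_peer_read(size_t& size, char* buffer) {
    (void)buffer;                          // no data written in this failure simulation
    size = size / 2;                       // simulate a partial peer read before failing
    return Status::Error("peer unavailable");
}
Status execute_s3_read(size_t& size, char* buffer) {
    std::memset(buffer, 'x', size);        // simulate a full read from remote storage
    return Status::OK();
}

// Peer-first read with S3 fallback; mirrors the control flow added above, in simplified form.
Status read_with_fallback(size_t& size, char* buffer, bool peer_enabled) {
    if (!peer_enabled) {
        return execute_s3_read(size, buffer);
    }
    const size_t original_size = size;     // callees take `size` by reference and may modify it
    Status st = execute_peer_read(size, buffer);
    if (!st.ok()) {
        size = original_size;              // restore before falling back, otherwise S3 reads too little
        return execute_s3_read(size, buffer);
    }
    return st;
}

int main() {
    char buf[64];
    size_t size = sizeof(buf);
    Status st = read_with_fallback(size, buf, /*peer_enabled=*/true);
    std::cout << "ok=" << st.ok() << " bytes=" << size << '\n';   // expect ok=1 bytes=64
}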
8 changes: 5 additions & 3 deletions be/src/io/cache/peer_file_cache_reader.cpp
@@ -66,11 +66,12 @@ PeerFileCacheReader::~PeerFileCacheReader() {
}

Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off,
Slice s, size_t* bytes_read, const IOContext* ctx) {
Slice s, size_t* bytes_read, size_t file_size,
const IOContext* ctx) {
VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
<< " bytes_read=" << *bytes_read;
*bytes_read = 0;
if (blocks.empty()) {
*bytes_read = 0;
return Status::OK();
}
if (!_is_doris_table) {
@@ -80,6 +81,7 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
PFetchPeerDataRequest req;
req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
req.set_path(_path.filename().native());
req.set_file_size(static_cast<int64_t>(file_size));
for (const auto& blk : blocks) {
auto* cb = req.add_cache_req();
cb->set_block_offset(static_cast<int64_t>(blk->range().left));
@@ -154,13 +156,13 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
}
VLOG_DEBUG << "peer cache read filled=" << filled;
peer_bytes_read_total << filled;
*bytes_read = filled;
peer_bytes_per_read << filled;
if (filled != s.size) {
peer_cache_reader_failed_counter << 1;
return Status::InternalError("peer cache read incomplete: need={}, got={}", s.size, filled);
}
peer_cache_reader_succ_counter << 1;
*bytes_read = filled;
return Status::OK();
}

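A condensed sketch of the bytes_read bookkeeping the diff above adjusts: the counter is reset up front, reported even when the read is incomplete, and success requires that exactly the requested number of bytes was filled. The names and types are illustrative only, not the Doris implementation.

#include <cstddef>
#include <numeric>
#include <string>
#include <vector>

// Toy status type standing in for doris::Status.
struct Status {
    bool ok_flag = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_flag; }
};

// `block_sizes` stands in for the per-block payloads returned by the peer RPC.
Status fetch_blocks_sketch(const std::vector<size_t>& block_sizes, size_t dest_size,
                           size_t* bytes_read) {
    *bytes_read = 0;                       // reset first, as the updated code does
    if (block_sizes.empty()) {
        return Status::OK();               // nothing requested is not an error
    }
    size_t filled = std::accumulate(block_sizes.begin(), block_sizes.end(), size_t(0));
    *bytes_read = filled;                  // report progress even if the read is incomplete
    if (filled != dest_size) {
        return Status::InternalError("peer cache read incomplete");
    }
    return Status::OK();
}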
5 changes: 3 additions & 2 deletions be/src/io/cache/peer_file_cache_reader.h
@@ -61,15 +61,16 @@ class PeerFileCacheReader final {
* - blocks: List of file blocks to fetch (global file offsets, inclusive ranges).
* - off: Base file offset corresponding to the start of Slice s.
* - s: Destination buffer; must be large enough to hold all requested block bytes.
* - n: Output number of bytes successfully written.
* - bytes_read: Output number of bytes read.
* - file_size: Size of the file to be read.
* - ctx: IO context (kept for interface symmetry).
*
* Returns:
* - OK: Successfully wrote exactly s.size bytes into the buffer.
* - NotSupported: The file is not a Doris table segment.
*/
Status fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off, Slice s,
size_t* bytes_read, const IOContext* ctx);
size_t* bytes_read, size_t file_size, const IOContext* ctx);

private:
io::Path _path;
@@ -215,6 +215,8 @@ suite('test_balance_warm_up', 'docker') {
// test expired be tablet cache info be removed
// after cache_read_from_peer_expired_seconds = 100s
assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "balance_tablet_be_mapping_size"))
assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read"))
assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read"))
}

docker(options) {