Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
<< ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();

auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
// Record each tablet in manager
for (int64_t tablet_id : request.tablet_ids) {
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
}

std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
Expand All @@ -188,6 +194,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
<< status.to_string();
// Remove failed tablets from tracking
manager.remove_balanced_tablets(request.tablet_ids);
return;
}
}
Expand All @@ -199,6 +207,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
if (!brpc_stub) {
st = Status::RpcError("Address {} is wrong", brpc_addr);
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
// Remove failed tablets from tracking
manager.remove_balanced_tablets(request.tablet_ids);
return;
}
brpc::Controller cntl;
Expand All @@ -213,10 +223,20 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
if (!cntl.Failed()) {
g_file_cache_warm_up_cache_async_submitted_segment_num
<< brpc_response.file_cache_block_metas().size();
_engine.file_cache_block_downloader().submit_download_task(
std::move(*brpc_response.mutable_file_cache_block_metas()));
auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas();
if (!file_cache_block_metas.empty()) {
_engine.file_cache_block_downloader().submit_download_task(
std::move(file_cache_block_metas));
LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets="
<< oss.str();
} else {
LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr;
manager.remove_balanced_tablets(request.tablet_ids);
}
} else {
st = Status::RpcError("{} isn't connected", brpc_addr);
// Remove failed tablets from tracking
manager.remove_balanced_tablets(request.tablet_ids);
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
<< ", error=" << cntl.ErrorText();
}
Expand Down
122 changes: 122 additions & 0 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include <bthread/countdown_event.h>

#include <algorithm>
#include <thread>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
Expand All @@ -27,12 +30,24 @@
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/async_io.h"
#include "util/debug_points.h"

namespace doris {
#include "common/compile_check_avoid_begin.h"
#include "common/compile_check_begin.h"

bvar::Adder<uint64_t> g_file_cache_get_by_peer_num("file_cache_get_by_peer_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_blocks_num("file_cache_get_by_peer_blocks_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_success_num("file_cache_get_by_peer_success_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_failed_num("file_cache_get_by_peer_failed_num");
bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
"file_cache_get_by_peer_server_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
"file_cache_get_by_peer_read_cache_file_latency");

CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
: PInternalService(exec_env), _engine(engine) {}

Expand Down Expand Up @@ -154,6 +169,113 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
<< ", response=" << response->DebugString();
}

void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
[[maybe_unused]],
const PFetchPeerDataRequest* request,
PFetchPeerDataResponse* response,
google::protobuf::Closure* done) {
// TODO(dx): use async thread pool to handle the request, not AsyncIO
brpc::ClosureGuard closure_guard(done);
g_file_cache_get_by_peer_num << 1;
if (!config::enable_file_cache) {
LOG_WARNING("try to access file cache data, but file cache not enabled");
return;
}
int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
const auto type = request->type();
const auto& path = request->path();
response->mutable_status()->set_status_code(TStatusCode::OK);
if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
// Read specific range [file_offset, file_offset+file_size) across cached blocks
auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
for (auto& cb : datas) {
*(response->add_datas()) = std::move(cb);
}
} else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
// Multiple specific blocks
auto hash = io::BlockFileCache::hash(path);
auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
if (cache == nullptr) {
g_file_cache_get_by_peer_failed_num << 1;
response->mutable_status()->add_error_msgs("can't get file cache instance");
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
return;
}
io::CacheContext ctx {};
// ensure a valid stats pointer is provided to cache layer
io::ReadStatistics local_stats;
ctx.stats = &local_stats;
for (const auto& cb_req : request->cache_req()) {
size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
auto holder = cache->get_or_set(hash, offset, size, ctx);
for (auto& fb : holder.file_blocks) {
auto state = fb->state();
if (state != io::FileBlock::State::DOWNLOADED) {
g_file_cache_get_by_peer_failed_num << 1;
LOG(WARNING) << "read cache block failed, state=" << state;
response->mutable_status()->add_error_msgs("read cache file error");
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
return;
}
g_file_cache_get_by_peer_blocks_num << 1;
doris::CacheBlockPB* out = response->add_datas();
out->set_block_offset(static_cast<int64_t>(fb->offset()));
out->set_block_size(static_cast<int64_t>(fb->range().size()));
std::string data;
data.resize(fb->range().size());
// Offload the file block read to a dedicated OS thread to avoid bthread IO
Status read_st = Status::OK();
// due to file_reader.cpp:33] Check failed: bthread_self() == 0
int64_t begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
auto task = [&] {
// Current thread not exist ThreadContext, usually after the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a Task.
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
Slice slice(data.data(), data.size());
read_st = fb->read(slice, /*read_offset=*/0);
};
AsyncIO::run_task(task, io::FileSystemType::LOCAL);
int64_t end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
g_file_cache_get_by_peer_read_cache_file_latency
<< (end_read_file_ts - begin_read_file_ts);
if (read_st.ok()) {
out->set_data(std::move(data));
} else {
g_file_cache_get_by_peer_failed_num << 1;
LOG(WARNING) << "read cache block failed: " << read_st;
response->mutable_status()->add_error_msgs("read cache file error");
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
return;
}
}
}
}
DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
int st_us = dp->param<int>("sleep", 1000);
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
// sleep us
bthread_usleep(st_us);
});

int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
g_file_cache_get_by_peer_success_num << 1;

VLOG_DEBUG << "fetch cache request=" << request->DebugString()
<< ", response=" << response->DebugString();
}

#include "common/compile_check_end.h"

bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_num(
Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class CloudInternalServiceImpl final : public PInternalService {
const PRecycleCacheRequest* request, PRecycleCacheResponse* response,
google::protobuf::Closure* done) override;

// Get file cached data about the path in file cache
void fetch_peer_data(google::protobuf::RpcController* controller,
const PFetchPeerDataRequest* request, PFetchPeerDataResponse* response,
google::protobuf::Closure* done) override;

private:
CloudStorageEngine& _engine;
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
tablet->build_tablet_report_info(&tablet_info);
using namespace std::chrono;
int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) {
if (now - g_tablet_report_inactive_duration_ms < tablet->last_access_time_ms) {
// the tablet is still being accessed and used in recently, so not report it
return;
}
Expand Down
84 changes: 84 additions & 0 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ bvar::Adder<uint64_t> g_file_cache_recycle_cache_requested_index_num(
bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
"file_cache_warm_up_rowset_last_call_unix_ts", 0);
bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");
bvar::Adder<uint64_t> g_balance_tablet_be_mapping_size("balance_tablet_be_mapping_size");

bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency(
"file_cache_warm_up_rowset_wait_for_compaction_latency");
Expand All @@ -104,6 +105,11 @@ CloudWarmUpManager::~CloudWarmUpManager() {
if (_download_thread.joinable()) {
_download_thread.join();
}

for (auto& shard : _balanced_tablets_shards) {
std::lock_guard<std::mutex> lock(shard.mtx);
shard.tablets.clear();
}
}

std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
Expand Down Expand Up @@ -783,5 +789,83 @@ void CloudWarmUpManager::_recycle_cache(int64_t tablet_id,
}
}

// Balance warm up cache management methods implementation
void CloudWarmUpManager::record_balanced_tablet(int64_t tablet_id, const std::string& host,
int32_t brpc_port) {
auto& shard = get_shard(tablet_id);
std::lock_guard<std::mutex> lock(shard.mtx);
JobMeta meta;
meta.be_ip = host;
meta.brpc_port = brpc_port;
shard.tablets.emplace(tablet_id, std::move(meta));
g_balance_tablet_be_mapping_size << 1;
VLOG_DEBUG << "Recorded balanced warm up cache tablet: tablet_id=" << tablet_id
<< ", host=" << host << ":" << brpc_port;
}

std::optional<std::pair<std::string, int32_t>> CloudWarmUpManager::get_balanced_tablet_info(
int64_t tablet_id) {
auto& shard = get_shard(tablet_id);
std::lock_guard<std::mutex> lock(shard.mtx);
auto it = shard.tablets.find(tablet_id);
if (it == shard.tablets.end()) {
return std::nullopt;
}
return std::make_pair(it->second.be_ip, it->second.brpc_port);
}

void CloudWarmUpManager::remove_balanced_tablet(int64_t tablet_id) {
auto& shard = get_shard(tablet_id);
std::lock_guard<std::mutex> lock(shard.mtx);
auto it = shard.tablets.find(tablet_id);
if (it != shard.tablets.end()) {
shard.tablets.erase(it);
g_balance_tablet_be_mapping_size << -1;
VLOG_DEBUG << "Removed balanced warm up cache tablet by timer, tablet_id=" << tablet_id;
}
}

void CloudWarmUpManager::remove_balanced_tablets(const std::vector<int64_t>& tablet_ids) {
// Group tablet_ids by shard to minimize lock contention
std::array<std::vector<int64_t>, SHARD_COUNT> shard_groups;
for (int64_t tablet_id : tablet_ids) {
shard_groups[get_shard_index(tablet_id)].push_back(tablet_id);
}

// Process each shard
for (size_t i = 0; i < SHARD_COUNT; ++i) {
if (shard_groups[i].empty()) continue;

auto& shard = _balanced_tablets_shards[i];
std::lock_guard<std::mutex> lock(shard.mtx);
for (int64_t tablet_id : shard_groups[i]) {
auto it = shard.tablets.find(tablet_id);
if (it != shard.tablets.end()) {
shard.tablets.erase(it);
g_balance_tablet_be_mapping_size << -1;
VLOG_DEBUG << "Removed balanced warm up cache tablet: tablet_id=" << tablet_id;
}
}
}
}

std::unordered_map<int64_t, std::pair<std::string, int32_t>>
CloudWarmUpManager::get_all_balanced_tablets() const {
std::unordered_map<int64_t, std::pair<std::string, int32_t>> result;

// Lock all shards to get consistent snapshot
std::array<std::unique_lock<std::mutex>, SHARD_COUNT> locks;
for (size_t i = 0; i < SHARD_COUNT; ++i) {
locks[i] = std::unique_lock<std::mutex>(_balanced_tablets_shards[i].mtx);
}

for (const auto& shard : _balanced_tablets_shards) {
for (const auto& [tablet_id, entry] : shard.tablets) {
result.emplace(tablet_id, std::make_pair(entry.be_ip, entry.brpc_port));
}
}
return result;
}

#include "common/compile_check_end.h"
} // namespace doris
28 changes: 28 additions & 0 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ struct JobMeta {
std::vector<int64_t> tablet_ids;
};

// manager for
// table warm up
// cluster warm up
// balance peer addr cache
class CloudWarmUpManager {
public:
explicit CloudWarmUpManager(CloudStorageEngine& engine);
Expand Down Expand Up @@ -85,6 +89,14 @@ class CloudWarmUpManager {

void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

// Balance warm up cache management methods
void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port);
std::optional<std::pair<std::string, int32_t>> get_balanced_tablet_info(int64_t tablet_id);
void remove_balanced_tablet(int64_t tablet_id);
void remove_balanced_tablets(const std::vector<int64_t>& tablet_ids);
bool is_balanced_tablet_expired(const std::chrono::system_clock::time_point& ctime) const;
std::unordered_map<int64_t, std::pair<std::string, int32_t>> get_all_balanced_tablets() const;

private:
void handle_jobs();

Expand Down Expand Up @@ -120,6 +132,22 @@ class CloudWarmUpManager {
std::unordered_map<int64_t, Cache> _tablet_replica_cache;
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<ThreadPoolToken> _thread_pool_token;

// Sharded lock for better performance
static constexpr size_t SHARD_COUNT = 10240;
struct Shard {
mutable std::mutex mtx;
std::unordered_map<int64_t, JobMeta> tablets;
};
std::array<Shard, SHARD_COUNT> _balanced_tablets_shards;

// Helper methods for shard operations
size_t get_shard_index(int64_t tablet_id) const {
return std::hash<int64_t> {}(tablet_id) % SHARD_COUNT;
}
Shard& get_shard(int64_t tablet_id) {
return _balanced_tablets_shards[get_shard_index(tablet_id)];
}
};

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,11 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");

DEFINE_mDouble(standby_compaction_version_ratio, "0.8");

DEFINE_mBool(enable_cache_read_from_peer, "false");

// Cache the expiration time of the peer address.
// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
// If the value is -1, use the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration as the expiration time.
DEFINE_mInt64(cache_read_from_peer_expired_seconds, "-1");
#include "common/compile_check_end.h"
} // namespace doris::config
4 changes: 4 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,9 @@ DECLARE_mBool(enable_standby_passive_compaction);

DECLARE_mDouble(standby_compaction_version_ratio);

DECLARE_mBool(enable_cache_read_from_peer);

DECLARE_mInt64(cache_read_from_peer_expired_seconds);

#include "common/compile_check_end.h"
} // namespace doris::config
Loading
Loading