Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 79 additions & 65 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ namespace doris {

bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
"file_cache_warm_up_cache_async_submitted_segment_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
"file_cache_warm_up_cache_async_submitted_task_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
"file_cache_warm_up_cache_async_submitted_tablet_num");

CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
: BaseBackendService(exec_env), _engine(engine) {}
Expand Down Expand Up @@ -169,79 +173,89 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,

void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) {
std::ostringstream oss;
oss << "[";
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
if (i > 0) oss << ",";
oss << request.tablet_ids[i];
}
oss << "]";
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
<< ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();
// just submit the task to the thread pool, no need to wait for the result
auto do_warm_up = [this, request]() {
std::ostringstream oss;
oss << "[";
for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
if (i > 0) oss << ",";
oss << request.tablet_ids[i];
}
oss << "]";
g_file_cache_warm_up_cache_async_submitted_tablet_num << request.tablet_ids.size();
LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
<< request.brpc_port << ", tablets num=" << request.tablet_ids.size()
<< ", tablet_ids=" << oss.str();

auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
// Record each tablet in manager
for (int64_t tablet_id : request.tablet_ids) {
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
// Record each tablet in manager
for (int64_t tablet_id : request.tablet_ids) {
manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
}

std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(request.host)) {
Status status = dns_cache->get(request.host, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
<< status.to_string();
// Remove failed tablets from tracking
manager.remove_balanced_tablets(request.tablet_ids);
std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(request.host)) {
Status status = dns_cache->get(request.host, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
<< status.to_string();
return;
}
}
std::string brpc_addr = get_host_port(host, request.brpc_port);
std::shared_ptr<PBackendService_Stub> brpc_stub =
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
if (!brpc_stub) {
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
return;
}
}
std::string brpc_addr = get_host_port(host, request.brpc_port);
Status st = Status::OK();
TStatus t_status;
std::shared_ptr<PBackendService_Stub> brpc_stub =
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
if (!brpc_stub) {
st = Status::RpcError("Address {} is wrong", brpc_addr);
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
// Remove failed tablets from tracking
manager.remove_balanced_tablets(request.tablet_ids);
return;
}
brpc::Controller cntl;
PGetFileCacheMetaRequest brpc_request;
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
PGetFileCacheMetaResponse brpc_response;
PGetFileCacheMetaRequest brpc_request;
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });

brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
<< ", response=" << brpc_response.DebugString();
if (!cntl.Failed()) {
g_file_cache_warm_up_cache_async_submitted_segment_num
<< brpc_response.file_cache_block_metas().size();
auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas();
if (!file_cache_block_metas.empty()) {
auto run_rpc = [this, brpc_stub,
brpc_addr](PGetFileCacheMetaRequest request_copy) -> Status {
brpc::Controller cntl;
cntl.set_timeout_ms(20 * 1000); // 20s
PGetFileCacheMetaResponse brpc_response;
brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &request_copy, &brpc_response,
nullptr);
if (cntl.Failed()) {
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
<< ", error=" << cntl.ErrorText()
<< ", error code=" << cntl.ErrorCode();
return Status::RpcError("{} isn't connected, error code={}", brpc_addr,
cntl.ErrorCode());
}
VLOG_DEBUG << "warm_up_cache_async: request=" << request_copy.DebugString()
<< ", response=" << brpc_response.DebugString();
g_file_cache_warm_up_cache_async_submitted_segment_num
<< brpc_response.file_cache_block_metas().size();
_engine.file_cache_block_downloader().submit_download_task(
std::move(file_cache_block_metas));
LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets="
<< oss.str();
} else {
LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr;
manager.remove_balanced_tablets(request.tablet_ids);
std::move(*brpc_response.mutable_file_cache_block_metas()));
return Status::OK();
};

Status rpc_status = run_rpc(std::move(brpc_request));
if (!rpc_status.ok()) {
LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" << brpc_addr
<< ", status=" << rpc_status;
}
} else {
st = Status::RpcError("{} isn't connected", brpc_addr);
// Remove failed tablets from tracking
manager.remove_balanced_tablets(request.tablet_ids);
LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
<< ", error=" << cntl.ErrorText();
};
g_file_cache_warm_up_cache_async_submitted_task_num << 1;
Status submit_st = _engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
if (!submit_st.ok()) {
LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
"warmup_cache_async_thread_pool, status="
<< submit_st.to_string() << ", execute synchronously";
do_warm_up();
}
st.to_thrift(&t_status);
response.status = t_status;
TStatus t_status;
submit_st.to_thrift(&t_status);
response.status = std::move(t_status);
}

void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
Expand Down
27 changes: 24 additions & 3 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
"file_cache_get_by_peer_server_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
"file_cache_get_by_peer_read_cache_file_latency");
bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
"cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");

CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
: PInternalService(exec_env), _engine(engine) {}
Expand Down Expand Up @@ -95,13 +97,26 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
return;
}
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size();
auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
std::ostringstream tablet_ids_stream;
int count = 0;
for (const auto& tablet_id : request->tablet_ids()) {
tablet_ids_stream << tablet_id << ", ";
count++;
if (count >= 10) {
break;
}
}
LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size()
<< ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
for (const auto& tablet_id : request->tablet_ids()) {
auto res = _engine.tablet_mgr().get_tablet(tablet_id);
if (!res.has_value()) {
LOG(ERROR) << "failed to get tablet: " << tablet_id
<< " err msg: " << res.error().msg();
return;
continue;
}
CloudTabletSPtr tablet = std::move(res.value());
auto st = tablet->sync_rowsets();
Expand Down Expand Up @@ -166,7 +181,13 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
}
}
}
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << (end_ts - begin_ts);
LOG(INFO) << "get file cache meta by tablet ids = [ " << tablet_ids_stream.str() << " ] took "
<< end_ts - begin_ts << " us";
VLOG_DEBUG << "get file cache meta by tablet id request=" << request->DebugString()
<< ", response=" << response->DebugString();
}

Expand Down
17 changes: 13 additions & 4 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,19 @@ Status CloudStorageEngine::open() {
// check cluster id
RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");

return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
.build(&_sync_load_for_tablets_thread_pool);
RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
.build(&_sync_load_for_tablets_thread_pool),
"fail to build SyncLoadForTabletsThreadPool");

RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
.set_max_threads(config::warmup_cache_async_thread)
.set_min_threads(config::warmup_cache_async_thread)
.build(&_warmup_cache_async_thread_pool),
"fail to build WarmupCacheAsyncThreadPool");

return Status::OK();
}

void CloudStorageEngine::stop() {
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
return *_sync_load_for_tablets_thread_pool;
}

ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }

Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);

Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
Expand Down Expand Up @@ -204,6 +206,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
std::unique_ptr<TabletHotspot> _tablet_hotspot;
std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;

// FileSystem with latest shared storage info, new data will be written to this fs.
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ DEFINE_mBool(use_public_endpoint_for_error_log, "true");

DEFINE_mInt32(sync_load_for_tablets_thread, "32");

DEFINE_Int32(warmup_cache_async_thread, "16");

DEFINE_mBool(enable_new_tablet_do_compaction, "true");

// Empty rowset compaction strategy configurations
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ DECLARE_mBool(use_public_endpoint_for_error_log);
// the theads which sync the datas which loaded in other clusters
DECLARE_mInt32(sync_load_for_tablets_thread);

DECLARE_Int32(warmup_cache_async_thread);

DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);

DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);
Expand Down
31 changes: 21 additions & 10 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3372,16 +3372,13 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, masterOnly = true)
public static double cloud_balance_tablet_percent_per_run = 0.05;

@ConfField(mutable = true, masterOnly = true)
public static int cloud_min_balance_tablet_num_per_run = 2;

@ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
+ "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
+ "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
+ "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
+ "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
+ "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+ "设置compute group维度的balance类型,compute group维度配置优先级更高",
@ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有 Compute group 的扩缩容预热方式。"
+ "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
+ "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
+ "sync_warmup: 同步预热,确保 cache 迁移完成,均衡较慢但无 cache miss;"
+ "peer_read_async_warmup: 直接修改 tablet 分片映射,首次读从 Peer BE 拉取,均衡最快可能会影响同计算组中其他 BE 性能。"
+ "注意:此为全局 FE 配置,也可通过 SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+ "设置 compute group 维度的 balance 类型,compute group 维度配置优先级更高",
"Specify the scaling and warming methods for all Compute groups in a cloud mode. "
+ "without_warmup: Directly modify shard mapping, first read from S3,"
+ "fastest re-balance but largest fluctuation; "
Expand All @@ -3396,6 +3393,20 @@ public static int metaServiceRpcRetryTimes() {
options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"})
public static String cloud_warm_up_for_rebalance_type = "async_warmup";

@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+ "同一个host内预热批次的最大tablet个数,默认10", "The max number of tablets per host "
+ "when batching warm-up requests during cloud tablet rebalancing, default 10"})
public static int cloud_warm_up_batch_size = 10;

@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+ "预热批次最长等待时间,单位毫秒,默认50ms", "Maximum wait time in milliseconds before a "
+ "pending warm-up batch is flushed, default 50ms"})
public static int cloud_warm_up_batch_flush_interval_ms = 50;

@ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡预热rpc异步线程池大小,默认4",
"Thread pool size for asynchronous warm-up RPC dispatch during cloud tablet rebalancing, default 4"})
public static int cloud_warm_up_rpc_async_pool_size = 4;

@ConfField(mutable = true, masterOnly = false)
public static String security_checker_class_name = "";

Expand Down
Loading
Loading