diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp
index 294fe2a056146a..a9f8906d3d5d68 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -39,6 +39,10 @@ namespace doris {
 
 bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
         "file_cache_warm_up_cache_async_submitted_segment_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
+        "file_cache_warm_up_cache_async_submitted_task_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
+        "file_cache_warm_up_cache_async_submitted_tablet_num");
 
 CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
         : BaseBackendService(exec_env), _engine(engine) {}
@@ -170,83 +174,89 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
 
 void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
                                               const TWarmUpCacheAsyncRequest& request) {
-    std::ostringstream oss;
-    oss << "[";
-    for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
-        if (i > 0) oss << ",";
-        oss << request.tablet_ids[i];
-    }
-    oss << "]";
-    LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
-              << ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();
+    // just submit the task to the thread pool, no need to wait for the result
+    auto do_warm_up = [this, request]() {
+        std::ostringstream oss;
+        oss << "[";
+        for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
+            if (i > 0) oss << ",";
+            oss << request.tablet_ids[i];
+        }
+        oss << "]";
+        g_file_cache_warm_up_cache_async_submitted_tablet_num << request.tablet_ids.size();
+        LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":"
+                  << request.brpc_port << ", tablets num=" << request.tablet_ids.size()
+                  << ", tablet_ids=" << oss.str();
 
-    auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
-    // Record each tablet in manager
-    for (int64_t tablet_id : request.tablet_ids) {
-        manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
-    }
+        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+        // Record each tablet in manager
+        for (int64_t tablet_id : request.tablet_ids) {
+            manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
+        }
 
-    std::string host = request.host;
-    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
-    if (dns_cache == nullptr) {
-        LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
-    } else if (!is_valid_ip(request.host)) {
-        Status status = dns_cache->get(request.host, &host);
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to get ip from host " << request.host << ": "
-                         << status.to_string();
-            // Remove failed tablets from tracking
-            manager.remove_balanced_tablets(request.tablet_ids);
+        std::string host = request.host;
+        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+        if (dns_cache == nullptr) {
+            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
+        } else if (!is_valid_ip(request.host)) {
+            Status status = dns_cache->get(request.host, &host);
+            if (!status.ok()) {
+                LOG(WARNING) << "failed to get ip from host " << request.host << ": "
+                             << status.to_string();
+                return;
+            }
+        }
+        std::string brpc_addr = get_host_port(host, request.brpc_port);
+        std::shared_ptr<PBackendService_Stub> brpc_stub =
+                _exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
+        if (!brpc_stub) {
+            LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
             return;
         }
-    }
-    std::string brpc_addr = get_host_port(host, request.brpc_port);
-    Status st = Status::OK();
-    TStatus t_status;
-    std::shared_ptr<PBackendService_Stub> brpc_stub =
-            _exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
-    if (!brpc_stub) {
-        st = Status::RpcError("Address {} is wrong", brpc_addr);
-        LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
-        // Remove failed tablets from tracking
-        manager.remove_balanced_tablets(request.tablet_ids);
-        return;
-    }
-    brpc::Controller cntl;
-    PGetFileCacheMetaRequest brpc_request;
-    std::stringstream ss;
-    std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), [&](int64_t tablet_id) {
-        brpc_request.add_tablet_ids(tablet_id);
-        ss << tablet_id << ",";
-    });
-    VLOG_DEBUG << "tablets set: " << ss.str() << " stack: " << get_stack_trace();
-    PGetFileCacheMetaResponse brpc_response;
+        PGetFileCacheMetaRequest brpc_request;
+        std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
+                      [&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
 
-    brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
-    VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
-               << ", response=" << brpc_response.DebugString();
-    if (!cntl.Failed()) {
-        g_file_cache_warm_up_cache_async_submitted_segment_num
-                << brpc_response.file_cache_block_metas().size();
-        auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas();
-        if (!file_cache_block_metas.empty()) {
+        auto run_rpc = [this, brpc_stub,
+                        brpc_addr](PGetFileCacheMetaRequest request_copy) -> Status {
+            brpc::Controller cntl;
+            cntl.set_timeout_ms(20 * 1000); // 20s
+            PGetFileCacheMetaResponse brpc_response;
+            brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &request_copy, &brpc_response,
+                                                        nullptr);
+            if (cntl.Failed()) {
+                LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
+                             << ", error=" << cntl.ErrorText()
+                             << ", error code=" << cntl.ErrorCode();
+                return Status::RpcError("{} isn't connected, error code={}", brpc_addr,
+                                        cntl.ErrorCode());
+            }
+            VLOG_DEBUG << "warm_up_cache_async: request=" << request_copy.DebugString()
+                       << ", response=" << brpc_response.DebugString();
+            g_file_cache_warm_up_cache_async_submitted_segment_num
+                    << brpc_response.file_cache_block_metas().size();
             _engine.file_cache_block_downloader().submit_download_task(
-                    std::move(file_cache_block_metas));
-            LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets="
-                      << oss.str();
-        } else {
-            LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr;
-            manager.remove_balanced_tablets(request.tablet_ids);
+                    std::move(*brpc_response.mutable_file_cache_block_metas()));
+            return Status::OK();
+        };
+
+        Status rpc_status = run_rpc(std::move(brpc_request));
+        if (!rpc_status.ok()) {
+            LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" << brpc_addr
+                         << ", status=" << rpc_status;
         }
-    } else {
-        st = Status::RpcError("{} isn't connected", brpc_addr);
-        // Remove failed tablets from tracking
-        manager.remove_balanced_tablets(request.tablet_ids);
-        LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
-                     << ", error=" << cntl.ErrorText();
+    };
+    g_file_cache_warm_up_cache_async_submitted_task_num << 1;
+    Status submit_st = _engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
+    if (!submit_st.ok()) {
+        LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
+                        "warmup_cache_async_thread_pool, status="
+                     << submit_st.to_string() << ", execute synchronously";
+        do_warm_up();
     }
-    st.to_thrift(&t_status);
-    response.status = t_status;
+    TStatus t_status;
+    submit_st.to_thrift(&t_status);
+    response.status = std::move(t_status);
 }
 
 void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 2584ce8146b534..d242f90a2ec917 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -47,6 +47,8 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
         "file_cache_get_by_peer_server_latency");
 bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
         "file_cache_get_by_peer_read_cache_file_latency");
+bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
+        "cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");
 
 CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
         : PInternalService(exec_env), _engine(engine) {}
@@ -95,13 +97,26 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
         LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
         return;
     }
-    LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size();
+    auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                            std::chrono::steady_clock::now().time_since_epoch())
+                            .count();
+    std::ostringstream tablet_ids_stream;
+    int count = 0;
+    for (const auto& tablet_id : request->tablet_ids()) {
+        tablet_ids_stream << tablet_id << ", ";
+        count++;
+        if (count >= 10) {
+            break;
+        }
+    }
+    LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size()
+              << ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
     for (const auto& tablet_id : request->tablet_ids()) {
         auto res = _engine.tablet_mgr().get_tablet(tablet_id);
         if (!res.has_value()) {
             LOG(ERROR) << "failed to get tablet: " << tablet_id
                        << " err msg: " << res.error().msg();
-            return;
+            continue;
         }
         CloudTabletSPtr tablet = std::move(res.value());
         auto st = tablet->sync_rowsets();
@@ -166,7 +181,13 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
             }
         }
     }
-    VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
+    auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                          std::chrono::steady_clock::now().time_since_epoch())
+                          .count();
+    g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << (end_ts - begin_ts);
+    LOG(INFO) << "get file cache meta by tablet ids = [ " << tablet_ids_stream.str() << " ] took "
+              << end_ts - begin_ts << " us";
+    VLOG_DEBUG << "get file cache meta by tablet id request=" << request->DebugString()
                << ", response=" << response->DebugString();
 }
 
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 8529fb0ed92572..f8a7ac6851528d 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -238,10 +238,19 @@ Status CloudStorageEngine::open() {
     // check cluster id
     RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");
 
-    return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
-            .set_max_threads(config::sync_load_for_tablets_thread)
-            .set_min_threads(config::sync_load_for_tablets_thread)
-            .build(&_sync_load_for_tablets_thread_pool);
+    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
+                                           .set_max_threads(config::sync_load_for_tablets_thread)
+                                           .set_min_threads(config::sync_load_for_tablets_thread)
+                                           .build(&_sync_load_for_tablets_thread_pool),
+                                   "fail to build SyncLoadForTabletsThreadPool");
+
+    RETURN_NOT_OK_STATUS_WITH_WARN(ThreadPoolBuilder("WarmupCacheAsyncThreadPool")
+                                           .set_max_threads(config::warmup_cache_async_thread)
+                                           .set_min_threads(config::warmup_cache_async_thread)
+                                           .build(&_warmup_cache_async_thread_pool),
+                                   "fail to build WarmupCacheAsyncThreadPool");
+
+    return Status::OK();
 }
 
 void CloudStorageEngine::stop() {
diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index 0b61fe2076200d..6948d8c3594b53 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -152,6 +152,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
         return *_sync_load_for_tablets_thread_pool;
     }
 
+    ThreadPool& warmup_cache_async_thread_pool() const { return *_warmup_cache_async_thread_pool; }
+
     Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator);
 
     Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);
@@ -204,6 +206,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
     std::unique_ptr<CloudWarmUpManager> _cloud_warm_up_manager;
     std::unique_ptr<TabletHotspot> _tablet_hotspot;
     std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
+    std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
    std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
 
     // FileSystem with latest shared storage info, new data will be written to this fs.
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index c52f5b96c201ea..b87cf6ece3efce 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -70,6 +70,8 @@ DEFINE_mBool(use_public_endpoint_for_error_log, "true");
 
 DEFINE_mInt32(sync_load_for_tablets_thread, "32");
 
+DEFINE_Int32(warmup_cache_async_thread, "16");
+
 DEFINE_mBool(enable_new_tablet_do_compaction, "true");
 
 // Empty rowset compaction strategy configurations
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 6dd0e34d86f48b..590290fbfa7820 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -114,6 +114,8 @@ DECLARE_mBool(use_public_endpoint_for_error_log);
 // the theads which sync the datas which loaded in other clusters
 DECLARE_mInt32(sync_load_for_tablets_thread);
 
+DECLARE_Int32(warmup_cache_async_thread);
+
 DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
 DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2481d1378e4de1..24501acb175c97 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3391,9 +3391,6 @@ public static int metaServiceRpcRetryTimes() {
     @ConfField(mutable = true, masterOnly = true)
     public static double cloud_balance_tablet_percent_per_run = 0.05;
 
-    @ConfField(mutable = true, masterOnly = true)
-    public static int cloud_min_balance_tablet_num_per_run = 2;
-
     @ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有 Compute group 的扩缩容预热方式。"
             + "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
             + "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
@@ -3415,6 +3412,20 @@ public static int metaServiceRpcRetryTimes() {
             options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"})
     public static String cloud_warm_up_for_rebalance_type = "async_warmup";
 
+    @ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+            + "同一个host内预热批次的最大tablet个数,默认10", "The max number of tablets per host "
+            + "when batching warm-up requests during cloud tablet rebalancing, default 10"})
+    public static int cloud_warm_up_batch_size = 10;
+
+    @ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡时,"
+            + "预热批次最长等待时间,单位毫秒,默认50ms", "Maximum wait time in milliseconds before a "
+            + "pending warm-up batch is flushed, default 50ms"})
+    public static int cloud_warm_up_batch_flush_interval_ms = 50;
+
+    @ConfField(mutable = true, masterOnly = true, description = {"云上tablet均衡预热rpc异步线程池大小,默认4",
+            "Thread pool size for asynchronous warm-up RPC dispatch during cloud tablet rebalancing, default 4"})
+    public static int cloud_warm_up_rpc_async_pool_size = 4;
+
     @ConfField(mutable = true, masterOnly = false)
     public static String security_checker_class_name = "";
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index c6e040fbc35674..ed3c0544183dd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -34,6 +34,7 @@
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.MasterDaemon;
@@ -66,7 +67,12 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class CloudTabletRebalancer extends MasterDaemon {
@@ -82,21 +88,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
     private volatile ConcurrentHashMap<Long, Set<Tablet>> beToTabletsGlobalInSecondary =
             new ConcurrentHashMap<Long, Set<Tablet>>();
 
-    private Map<Long, Set<Tablet>> futureBeToTabletsGlobal;
+    private volatile ConcurrentHashMap<Long, Set<Tablet>> futureBeToTabletsGlobal;
 
     private Map<String, List<Long>> clusterToBes;
 
     private Set<Long> allBes;
 
     // partitionId -> indexId -> be -> tablet
-    private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionToTablets;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>> partitionToTablets;
 
-    private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> futurePartitionToTablets;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+            futurePartitionToTablets;
 
     // tableId -> be -> tablet
-    private Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable;
 
-    private Map<Long, Map<Long, Set<Tablet>>> futureBeToTabletsInTable;
+    private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> futureBeToTabletsInTable;
 
     private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
 
@@ -110,10 +117,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new LinkedBlockingQueue<Pair<Long, Long>>();
 
-    private Map<InfightTablet, InfightTask> tabletToInfightTask = new HashMap<>();
+    private Map<InfightTablet, InfightTask> tabletToInfightTask = new ConcurrentHashMap<>();
+
+    private final ConcurrentHashMap<WarmupBatchKey, WarmupBatch> warmupBatches = new ConcurrentHashMap<>();
+
+    private volatile ScheduledExecutorService warmupBatchScheduler;
+
+    private volatile ScheduledExecutorService warmupCheckScheduler;
+
+    private volatile ExecutorService warmupRpcExecutor;
+
+    private final ConcurrentLinkedQueue<WarmupTabletTask> failedWarmupTasks = new ConcurrentLinkedQueue<>();
 
     private CloudSystemInfoService cloudSystemInfoService;
 
+    private final Object warmupExecutorInitLock = new Object();
+
     private BalanceTypeEnum globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
 
     /**
@@ -167,6 +186,49 @@ public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) {
         this.cloudSystemInfoService = cloudSystemInfoService;
     }
 
+    private void initializeWarmupExecutorsIfNeeded() {
+        if (warmupRpcExecutor != null) {
+            return; // Already initialized
+        }
+        synchronized (warmupExecutorInitLock) {
+            if (warmupRpcExecutor != null) {
+                return; // Double check
+            }
+            Env env = Env.getCurrentEnv();
+            if (env == null || !env.isMaster()) {
+                LOG.info("Env not initialized or not master, skip start warmup batch scheduler");
+                return;
+            }
+            warmupRpcExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                    Math.max(1, Config.cloud_warm_up_rpc_async_pool_size), 1000,
+                    "cloud-warmup-rpc-dispatch", true);
+            warmupBatchScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread t = new Thread(r, "cloud-warmup-batch-flusher");
+                t.setDaemon(true);
+                return t;
+            });
+            long flushInterval = Math.max(1L, Config.cloud_warm_up_batch_flush_interval_ms);
+            warmupBatchScheduler.scheduleAtFixedRate(this::flushExpiredWarmupBatches,
+                    flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+
+            warmupCheckScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread t = new Thread(r, "cloud-warmup-checker");
+                t.setDaemon(true);
+                return t;
+            });
+            long warmupCheckInterval = 10L;
+            warmupCheckScheduler.scheduleAtFixedRate(() -> {
+                try {
+                    // send check rpc to be, 10s check once
+                    checkInflightWarmUpCacheAsync();
+                } catch (Throwable t) {
+                    LOG.warn("unexpected error when checking inflight warm up cache async", t);
+                }
+            }, warmupCheckInterval, warmupCheckInterval, TimeUnit.SECONDS);
+            LOG.info("Warmup executors initialized successfully");
+        }
+    }
+
     private interface Operator {
         void op(Database db, Table table, Partition partition, MaterializedIndex index, String cluster);
     }
@@ -222,6 +284,88 @@ private class InfightTask {
         BalanceType balanceType;
     }
 
+    @Getter
+    private static class WarmupBatchKey {
+        private final long srcBe;
+        private final long destBe;
+
+        WarmupBatchKey(long srcBe, long destBe) {
+            this.srcBe = srcBe;
+            this.destBe = destBe;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof WarmupBatchKey)) {
+                return false;
+            }
+            WarmupBatchKey that = (WarmupBatchKey) o;
+            return srcBe == that.srcBe && destBe == that.destBe;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(srcBe, destBe);
+        }
+    }
+
+    private static class WarmupTabletTask {
+        private final Tablet pickedTablet;
+        private final long srcBe;
+        private final long destBe;
+        private final String clusterId;
+
+        WarmupTabletTask(Tablet pickedTablet, long srcBe, long destBe, String clusterId) {
+            this.pickedTablet = pickedTablet;
+            this.srcBe = srcBe;
+            this.destBe = destBe;
+            this.clusterId = clusterId;
+        }
+    }
+
+    private static class WarmupBatch {
+        private final WarmupBatchKey key;
+        private final List<WarmupTabletTask> tasks = new ArrayList<>();
+        private long lastUpdateMs = System.currentTimeMillis();
+
+        WarmupBatch(WarmupBatchKey key) {
+            this.key = key;
+        }
+
+        synchronized List<WarmupTabletTask> addTask(WarmupTabletTask task, int batchSize) {
+            tasks.add(task);
+            lastUpdateMs = System.currentTimeMillis();
+            if (tasks.size() >= batchSize) {
+                return drain();
+            }
+            return Collections.emptyList();
+        }
+
+        synchronized List<WarmupTabletTask> drainIfExpired(long flushIntervalMs) {
+            if (tasks.isEmpty()) {
+                return Collections.emptyList();
+            }
+            if (System.currentTimeMillis() - lastUpdateMs >= flushIntervalMs) {
+                return drain();
+            }
+            return Collections.emptyList();
+        }
+
+        synchronized boolean isEmpty() {
+            return tasks.isEmpty();
+        }
+
+        private List<WarmupTabletTask> drain() {
+            List<WarmupTabletTask> copy = new ArrayList<>(tasks);
+            tasks.clear();
+            lastUpdateMs = System.currentTimeMillis();
+            return copy;
+        }
+    }
+
     private class TransferPairInfo {
         public long srcBe;
         public long destBe;
@@ -270,7 +414,13 @@ public Set<Tablet> getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) {
     }
 
     public int getTabletNumByBackendId(long beId) {
-        Set<Tablet> tablets = beToTabletsGlobal.get(beId);
+        Map<Long, Set<Tablet>> sourceMap = beToTabletsGlobal;
+        ConcurrentHashMap<Long, Set<Tablet>> futureMap = futureBeToTabletsGlobal;
+        if (futureMap != null && !futureMap.isEmpty()) {
+            sourceMap = futureMap;
+        }
+
+        Set<Tablet> tablets = sourceMap.get(beId);
         Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
 
         int tabletsSize = (tablets == null) ? 0 : tablets.size();
@@ -290,6 +440,9 @@ public int getTabletNumByBackendId(long beId) {
     // 9 check whether all tablets of decomission node have been migrated
     @Override
     protected void runAfterCatalogReady() {
+        // Initialize warmup executors when catalog is ready
+        initializeWarmupExecutorsIfNeeded();
+
         if (Config.enable_cloud_multi_replica) {
             LOG.info("Tablet balance is temporarily not supported when multi replica enabled");
             return;
         }
@@ -304,7 +457,6 @@ protected void runAfterCatalogReady() {
             return;
         }
 
-        checkInflightWarmUpCacheAsync();
         statRouteInfo();
         migrateTabletsForSmoothUpgrade();
         statRouteInfo();
@@ -738,42 +890,118 @@ private boolean completeRouteInfo() {
     }
 
     public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet,
-            Map<Long, Set<Tablet>> globalBeToTablets,
-            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
+            ConcurrentHashMap<Long, Set<Tablet>> globalBeToTablets,
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+                    partToTablets) {
         // global
-        globalBeToTablets.putIfAbsent(be, new HashSet<Tablet>());
+        globalBeToTablets.putIfAbsent(be, ConcurrentHashMap.newKeySet());
         globalBeToTablets.get(be).add(tablet);
 
         // table
-        beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long, Set<Tablet>>());
-        Map<Long, Set<Tablet>> beToTabletsOfTable = beToTabletsInTable.get(tableId);
-        beToTabletsOfTable.putIfAbsent(be, new HashSet<Tablet>());
+        beToTabletsInTable.putIfAbsent(tableId, new ConcurrentHashMap<Long, Set<Tablet>>());
+        ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfTable = beToTabletsInTable.get(tableId);
+        beToTabletsOfTable.putIfAbsent(be, ConcurrentHashMap.newKeySet());
         beToTabletsOfTable.get(be).add(tablet);
 
         // partition
-        partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long, Set<Tablet>>>());
-        Map<Long, Map<Long, Set<Tablet>>> indexToTablets = partToTablets.get(partId);
-        indexToTablets.putIfAbsent(indexId, new HashMap<Long, Set<Tablet>>());
-        Map<Long, Set<Tablet>> beToTabletsOfIndex = indexToTablets.get(indexId);
-        beToTabletsOfIndex.putIfAbsent(be, new HashSet<Tablet>());
+        partToTablets.putIfAbsent(partId, new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>());
+        ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets = partToTablets.get(partId);
+        indexToTablets.putIfAbsent(indexId, new ConcurrentHashMap<Long, Set<Tablet>>());
+        ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfIndex = indexToTablets.get(indexId);
+        beToTabletsOfIndex.putIfAbsent(be, ConcurrentHashMap.newKeySet());
         beToTabletsOfIndex.get(be).add(tablet);
     }
 
+    private void enqueueWarmupTask(WarmupTabletTask task) {
+        WarmupBatchKey key = new WarmupBatchKey(task.srcBe, task.destBe);
+        WarmupBatch batch = warmupBatches.computeIfAbsent(key, WarmupBatch::new);
+        List<WarmupTabletTask> readyTasks = batch.addTask(task, Math.max(1, Config.cloud_warm_up_batch_size));
+        if (!readyTasks.isEmpty()) {
+            dispatchWarmupBatch(key, readyTasks);
+        }
+    }
+
+    private void dispatchWarmupBatch(WarmupBatchKey key, List<WarmupTabletTask> tasks) {
+        if (tasks.isEmpty()) {
+            return;
+        }
+        initializeWarmupExecutorsIfNeeded();
+        if (warmupRpcExecutor != null) {
+            warmupRpcExecutor.submit(() -> sendWarmupBatch(key, tasks));
+        } else {
+            LOG.warn("warmupRpcExecutor is not initialized, skip dispatching warmup batch");
+        }
+    }
+
+    private void sendWarmupBatch(WarmupBatchKey key, List<WarmupTabletTask> tasks) {
+        Backend srcBackend = cloudSystemInfoService.getBackend(key.getSrcBe());
+        Backend destBackend = cloudSystemInfoService.getBackend(key.getDestBe());
+        if (srcBackend == null || destBackend == null || !destBackend.isAlive()) {
+            handleWarmupBatchFailure(tasks, new IllegalStateException(
+                    String.format("backend missing or dead, src %s dest %s", srcBackend, destBackend)));
+            return;
+        }
+        List<Long> tabletIds = tasks.stream().map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
+        try {
+            sendPreHeatingRpc(tabletIds, key.getSrcBe(), key.getDestBe());
+        } catch (Exception e) {
+            handleWarmupBatchFailure(tasks, e);
+            return;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("dispatch preheat batch {} from {} to {}, tablet num {}",
+                    tabletIds, key.getSrcBe(), key.getDestBe(), tabletIds.size());
+        }
+    }
+
+    private void handleWarmupBatchFailure(List<WarmupTabletTask> tasks, Exception e) {
+        if (e != null) {
+            LOG.warn("preheat batch failed, size {}", tasks.size(), e);
+        }
+        for (WarmupTabletTask task : tasks) {
+            failedWarmupTasks.offer(task);
+        }
+    }
+
+    private void revertWarmupState(WarmupTabletTask task) {
+        updateBeToTablets(task.pickedTablet, task.destBe, task.srcBe,
+                futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
+        tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), task.clusterId));
+    }
+
+    private void processFailedWarmupTasks() {
+        WarmupTabletTask task;
+        while ((task = failedWarmupTasks.poll()) != null) {
+            revertWarmupState(task);
+        }
+    }
+
+    private void flushExpiredWarmupBatches() {
+        long flushInterval = Math.max(1L, Config.cloud_warm_up_batch_flush_interval_ms);
+        for (Map.Entry<WarmupBatchKey, WarmupBatch> entry : warmupBatches.entrySet()) {
+            List<WarmupTabletTask> readyTasks = entry.getValue().drainIfExpired(flushInterval);
+            if (!readyTasks.isEmpty()) {
+                dispatchWarmupBatch(entry.getKey(), readyTasks);
+            }
+        }
+    }
+
     public void statRouteInfo() {
         ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobal = new ConcurrentHashMap<Long, Set<Tablet>>();
+        ConcurrentHashMap<Long, Set<Tablet>> tmpFutureBeToTabletsGlobal = new ConcurrentHashMap<Long, Set<Tablet>>();
         ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobalInSecondary =
                 new ConcurrentHashMap<Long, Set<Tablet>>();
         ConcurrentHashMap<Long, Set<Tablet>> tmpBeToColocateTabletsGlobal =
                 new ConcurrentHashMap<Long, Set<Tablet>>();
 
-        futureBeToTabletsGlobal = new HashMap<Long, Set<Tablet>>();
-
-        partitionToTablets = new HashMap<Long, Map<Long, Map<Long, Set<Tablet>>>>();
-        futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long, Set<Tablet>>>>();
+        partitionToTablets = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>();
+        futurePartitionToTablets =
+                new ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>();
 
-        beToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
-        futureBeToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
+        beToTabletsInTable = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>();
+        futureBeToTabletsInTable = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>();
 
         loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
                    boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId());
@@ -816,12 +1044,13 @@ public void statRouteInfo() {
                             tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);
 
                     fillBeToTablets(futureBeId, table.getId(), partition.getId(), index.getId(), tablet,
-                            futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
+                            tmpFutureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
                 }
             }
         });
 
         beToTabletsGlobal = tmpBeToTabletsGlobal;
+        futureBeToTabletsGlobal = tmpFutureBeToTabletsGlobal;
         beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
         beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
     }
@@ -858,10 +1087,11 @@ public void loopCloudReplica(Operator operator) {
 
     public void balanceInPartition(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
         // balance all partition
-        for (Map.Entry<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionEntry : futurePartitionToTablets.entrySet()) {
-            Map<Long, Map<Long, Set<Tablet>>> indexToTablets = partitionEntry.getValue();
+        for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>> partitionEntry
+                : futurePartitionToTablets.entrySet()) {
+            Map<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets = partitionEntry.getValue();
             // balance all index of a partition
-            for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry : indexToTablets.entrySet()) {
+            for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry : indexToTablets.entrySet()) {
                 // balance a index
                 balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION, infos);
             }
@@ -870,12 +1100,16 @@ public void balanceInPartition(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
         }
     }
 
     public void balanceInTable(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
         // balance all tables
-        for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry : futureBeToTabletsInTable.entrySet()) {
+        for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry : futureBeToTabletsInTable.entrySet()) {
             balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, infos);
         }
     }
 
     private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) throws Exception {
+        sendPreHeatingRpc(Collections.singletonList(pickedTablet.getId()), srcBe, destBe);
+    }
+
+    private void sendPreHeatingRpc(List<Long> tabletIds, long srcBe, long destBe) throws Exception {
         BackendService.Client client = null;
         TNetworkAddress address = null;
         Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
@@ -887,9 +1121,7 @@ private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) thr
         TWarmUpCacheAsyncRequest req = new TWarmUpCacheAsyncRequest();
         req.setHost(srcBackend.getHost());
         req.setBrpcPort(srcBackend.getBrpcPort());
-        List<Long> tablets = new ArrayList<Long>();
-        tablets.add(pickedTablet.getId());
-        req.setTabletIds(tablets);
+        req.setTabletIds(new ArrayList<>(tabletIds));
         TWarmUpCacheAsyncResponse result = client.warmUpCacheAsync(req);
         if (result.getStatus().getStatusCode() != TStatusCode.OK) {
             LOG.warn("pre cache failed status {} {}", result.getStatus().getStatusCode(),
@@ -1005,9 +1237,10 @@ private void handleWarmupCompletion(InfightTask task, String clusterId, boolean
     }
 
     private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe,
-            Map<Long, Set<Tablet>> globalBeToTablets,
-            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
+            ConcurrentHashMap<Long, Set<Tablet>> globalBeToTablets,
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>> partToTablets) {
         CloudReplica replica = (CloudReplica) pickedTablet.getReplicas().get(0);
         long tableId = replica.getTableId();
         long partId = replica.getPartitionId();
@@ -1083,11 +1316,11 @@ private long findSourceBackend(List<Long> bes, Map<Long, Set<Tablet>> beToTablet
 
             // Check if the backend is decommissioned
             if (backend != null) {
-                if (backend.isDecommissioning() && tabletNum > 0) {
+                if ((backend.isDecommissioning() || backend.isDecommissioned()) && tabletNum > 0) {
                    srcBe = be; // Mark as source if decommissioned and has tablets
                    break; // Exit early if we found a decommissioned backend
                }
-                if (!backend.isDecommissioning() && tabletNum > maxTabletsNum) {
+                if (!backend.isDecommissioning() && !backend.isDecommissioned() && tabletNum > maxTabletsNum) {
                    srcBe = be;
                    maxTabletsNum = tabletNum;
                }
@@ -1099,24 +1332,38 @@ private long findSourceBackend(List<Long> bes, Map<Long, Set<Tablet>> beToTablet
    }
 
    private long findDestinationBackend(List<Long> bes, Map<Long, Set<Tablet>> beToTablets, long srcBe) {
-        long destBe = -1;
        long minTabletsNum = Long.MAX_VALUE;
+        List<Long> candidateBes = new ArrayList<>();
        for (Long be : bes) {
            long tabletNum = beToTablets.getOrDefault(be, Collections.emptySet()).size();
            Backend backend = cloudSystemInfoService.getBackend(be);
-            if (backend != null && backend.isAlive() && !backend.isDecommissioning() && !backend.isSmoothUpgradeSrc()) {
+            if (backend != null && backend.isAlive() && !backend.isDecommissioning()
+                    && !backend.isDecommissioned() && !backend.isSmoothUpgradeSrc()) {
                if (tabletNum < minTabletsNum) {
-                    destBe = be;
+                    // Found a BE with fewer tablets, reset candidates
                    minTabletsNum = tabletNum;
+                    candidateBes.clear();
+                    candidateBes.add(be);
+                } else if (tabletNum == minTabletsNum) {
+                    // Found a BE with the same minimum tablet count, add to candidates
+                    candidateBes.add(be);
                }
            }
        }
-        return destBe;
+
+        if (candidateBes.isEmpty()) {
+            return -1;
+        }
+
+        // Shuffle candidates with the same tablet count for better load balancing
+        Collections.shuffle(candidateBes, rand);
+        return candidateBes.get(0);
    }
 
    private boolean isTransferValid(long srcBe, long minTabletsNum, long maxTabletsNum, long avgNum) {
-        boolean srcDecommissioned = cloudSystemInfoService.getBackend(srcBe).isDecommissioning();
+        boolean srcDecommissioned = cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
+                || cloudSystemInfoService.getBackend(srcBe).isDecommissioned();
 
        if (!srcDecommissioned) {
            if ((maxTabletsNum < avgNum * (1 + Config.cloud_rebalance_percent_threshold)
@@ -1129,9 +1376,11 @@ private boolean isTransferValid(long srcBe, long minTabletsNum, long maxTabletsN
    }
 
    private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, BalanceType balanceType,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> beToTabletsInParts,
-            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTables) {
-        if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()) {
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+                    beToTabletsInParts,
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTables) {
+        if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
+                || cloudSystemInfoService.getBackend(srcBe).isDecommissioned()) {
            return false; // If source BE is decommissioned, no conflict
        }
 
@@ -1145,8 +1394,11 @@ private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, B
    }
 
    private boolean checkGlobalBalanceConflict(long srcBe, long destBe, CloudReplica cloudReplica,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> beToTabletsInParts,
-            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTables) {
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+                    beToTabletsInParts,
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
+                    beToTabletsInTables) {
        long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, beToTabletsInParts);
        long minBeSize = getTabletSizeInParts(destBe, cloudReplica, beToTabletsInParts);
@@ -1161,7 +1413,9 @@ private boolean checkGlobalBalanceConflict(long srcBe, long destBe, CloudReplica
    }
 
    private boolean checkTableBalanceConflict(long srcBe, long destBe, CloudReplica cloudReplica,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> beToTabletsInParts) {
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+                    beToTabletsInParts) {
        long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, beToTabletsInParts);
        long minBeSize = getTabletSizeInParts(destBe, cloudReplica, beToTabletsInParts);
 
@@ -1169,15 +1423,29 @@ private boolean checkTableBalanceConflict(long srcBe, long destBe, CloudReplica
    }
 
    private long getTabletSizeInParts(long beId, CloudReplica cloudReplica,
-            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> beToTabletsInParts) {
-        Set<Tablet> tablets = beToTabletsInParts.get(cloudReplica.getPartitionId())
-                .get(cloudReplica.getIndexId()).get(beId);
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+                    beToTabletsInParts) {
+        ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets
+                = beToTabletsInParts.get(cloudReplica.getPartitionId());
+        if (indexToTablets == null) {
+            return 0;
+        }
+        ConcurrentHashMap<Long, Set<Tablet>> beToTablets = indexToTablets.get(cloudReplica.getIndexId());
+        if (beToTablets == null) {
+            return 0;
+        }
+        Set<Tablet> tablets = beToTablets.get(beId);
        return tablets == null ? 0 : tablets.size();
    }
 
    private long getTabletSizeInBes(long beId, CloudReplica cloudReplica,
-            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTables) {
-        Set<Tablet> tablets = beToTabletsInTables.get(cloudReplica.getTableId()).get(beId);
+            ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTables) {
+        ConcurrentHashMap<Long, Set<Tablet>> beToTablets = beToTabletsInTables.get(cloudReplica.getTableId());
+        if (beToTablets == null) {
+            return 0;
+        }
+        Set<Tablet> tablets = beToTablets.get(beId);
        return tablets == null ? 0 : tablets.size();
    }
 
@@ -1188,6 +1456,8 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, Set<Tablet>>
            return;
        }
 
+        processFailedWarmupTasks();
+
        long totalTabletsNum = calculateTotalTablets(bes, beToTablets);
        long beNum = countActiveBackends(bes);
 
@@ -1197,7 +1467,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, Set<Tablet>>
        }
 
        long avgNum = totalTabletsNum / beNum;
-        long transferNum = calculateTransferNum(avgNum);
+        long transferNum = calculateTransferNum(avgNum, beNum);
 
        BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId);
        LOG.debug("balance type {}, be num {}, total tablets num {}, avg num {}, transfer num {}",
@@ -1264,14 +1534,13 @@ private long countActiveBackends(List<Long> bes) {
        return bes.stream()
                .filter(be -> {
                    Backend backend = cloudSystemInfoService.getBackend(be);
-                    return backend != null && !backend.isDecommissioning();
+                    return backend != null && !backend.isDecommissioning() && !backend.isDecommissioned();
                })
                .count();
    }
 
-    private long calculateTransferNum(long avgNum) {
-        return Math.max(Math.round(avgNum * Config.cloud_balance_tablet_percent_per_run),
-                Config.cloud_min_balance_tablet_num_per_run);
+    private long calculateTransferNum(long avgNum, long beNum) {
+        return Math.max(Math.round(avgNum * Config.cloud_balance_tablet_percent_per_run), beNum);
    }
 
    private void updateBalanceStatus(BalanceType balanceType) {
@@ -1292,12 +1561,11 @@ private Tablet pickRandomTablet(Set<Tablet> tablets) {
 
    private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId,
            BalanceType balanceType, Map<Long, Set<Tablet>> beToTablets) {
-        try {
-            sendPreHeatingRpc(pickedTablet, srcBe, destBe);
-        } catch (Exception e) {
-            LOG.warn("Failed to preheat tablet {} from {} to {}, "
-                    + "help msg change fe config cloud_warm_up_for_rebalance_type to without_warmup ",
-                    pickedTablet.getId(), srcBe, destBe, e);
+        Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
+        Backend destBackend = cloudSystemInfoService.getBackend(destBe);
+        if (srcBackend == null || destBackend == null) {
+            LOG.warn("backend missing when preheating tablet {} from {} to {}, cluster {}",
+                    pickedTablet.getId(), srcBe, destBe, clusterId);
            return;
        }
 
@@ -1308,16 +1576,18 @@ private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long destBe
        task.balanceType = balanceType;
        task.beToTablets = beToTablets;
        task.startTimestamp = System.currentTimeMillis() / 1000;
-        tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), clusterId), task);
+        InfightTablet key = new InfightTablet(pickedTablet.getId(), clusterId);
 
-        LOG.info("pre cache {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, clusterId);
+        tabletToInfightTask.put(key, task);
        updateBeToTablets(pickedTablet, srcBe, destBe, futureBeToTabletsGlobal,
                futureBeToTabletsInTable, futurePartitionToTablets);
+        LOG.debug("pre cache {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, clusterId);
+        enqueueWarmupTask(new WarmupTabletTask(pickedTablet, srcBe, destBe, clusterId));
    }
 
    private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId,
            BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
-        LOG.info("transfer {} from {} to {}, cluster {}, type {}",
+        LOG.debug("transfer {} from {} to {}, cluster {}, type {}",
                pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
        updateBeToTablets(pickedTablet, srcBe, destBe, beToTabletsGlobal,
                beToTabletsInTable, partitionToTablets);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
index 5f2a80274e5965..0a466e73f5f4d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -120,7 +120,7 @@ public void registerWaterShedTxnId(long be) throws UserException {
            txnIds.offer(new DbWithWaterTxn(dbid, nextTransactionId));
        }
        txnBePairList.offer(new DbWithWaterTxnInBe(txnIds, be));
-        LOG.info("register watershedtxnid {} for BE {}", txnIds.stream()
+        LOG.debug("register watershedtxnid {} for BE {}", txnIds.stream()
                .map(dbWithWaterTxn -> "(" + dbWithWaterTxn.dbId + ":" + dbWithWaterTxn.txnId + ")")
                .collect((Collectors.joining(", ", "{", "}"))), be);
    }
 
diff --git a/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy b/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
index 1c8874864c0477..4a9b77b5491772 100644
--- a/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_expanding_node_balance.groovy
@@ -90,7 +90,7 @@ suite('test_expanding_node_balance', 'docker') {
    }
 
    docker(clusterOptions[0]) {
-        def command = 'admin set frontend config("cloud_min_balance_tablet_num_per_run"="16");'
+        def command = 'select 1';
        // assert < 300s
        testCase(command, 300)
    }