From 38860a8a39534832d5ef550d683037cb605dc691 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Thu, 30 Oct 2025 21:27:09 +0800 Subject: [PATCH] [feature](cloud) Support balance sync warm up (#56164) 1. A new type of balance is supported on the cloud. When balancing, the BE mapping of the tablet service is modified on the FE only after all the file caches of the old BE are migrated to the new BE. 2. Fix the dest BE not syncing rowsets in time: when the src BE has new rowset meta that the dest BE has not synced yet, download_file_cache_block will return an error --- .../io/cache/block_file_cache_downloader.cpp | 14 +- .../meta-service/meta_service_resource.cpp | 27 +- .../java/org/apache/doris/common/Config.java | 22 +- .../org/apache/doris/nereids/DorisParser.g4 | 2 + .../doris/cloud/catalog/BalanceTypeEnum.java | 69 ++++ .../catalog/CloudInstanceStatusChecker.java | 53 +++ .../cloud/catalog/CloudTabletRebalancer.java | 179 ++++++++- .../doris/cloud/catalog/ComputeGroup.java | 153 ++++++++ .../cloud/system/CloudSystemInfoService.java | 44 +++ .../nereids/parser/LogicalPlanBuilder.java | 9 + .../doris/nereids/trees/plans/PlanType.java | 1 + .../commands/AlterComputeGroupCommand.java | 98 +++++ .../plans/commands/ShowClustersCommand.java | 19 +- .../trees/plans/visitor/CommandVisitor.java | 5 + .../doris/cloud/catalog/ComputeGroupTest.java | 341 ++++++++++++++++++ .../AlterComputeGroupCommandTest.java | 243 +++++++++++++ .../plans/commands/ShowComputeGroupTest.java | 91 +++++ gensrc/proto/cloud.proto | 2 + ...test_alter_compute_group_properties.groovy | 92 +++++ ...alance_use_compute_group_properties.groovy | 212 +++++++++++ .../balance/test_balance_warm_up.groovy | 2 +- ..._balance_warm_up_sync_global_config.groovy | 179 +++++++++ .../test_balance_warm_up_task_abnormal.groovy | 121 +++++++ .../test_peer_read_async_warmup.groovy | 169 +++++++++ .../test_warmup_rebalance.groovy | 2 +- 25 files changed, 2116 insertions(+), 33 deletions(-) create mode 100644 
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java create mode 100644 regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy create mode 100644 regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy create mode 100644 regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy create mode 100644 regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy create mode 100644 regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy rename regression-test/suites/cloud_p0/{multi_cluster => balance}/test_warmup_rebalance.groovy (98%) diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index f5a2a287f83ba4..46ce90aafcc2d8 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -27,6 +27,7 @@ #include #include +#include #include #include "cloud/cloud_tablet_mgr.h" @@ -171,6 +172,7 @@ std::unordered_map snapshot_rs_metas(BaseTable void FileCacheBlockDownloader::download_file_cache_block( const DownloadTask::FileCacheBlockMetaVec& metas) { + std::unordered_set synced_tablets; std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) { VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id() << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id() @@ -183,12 +185,20 @@ void 
FileCacheBlockDownloader::download_file_cache_block( } else { tablet = std::move(res).value(); } - + if (!synced_tablets.contains(meta.tablet_id())) { + auto st = tablet->sync_rowsets(); + if (!st) { + // just log failed, try it best + LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id() + << " err msg: " << st.to_string(); + } + synced_tablets.insert(meta.tablet_id()); + } auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get()); auto find_it = id_to_rowset_meta_map.find(meta.rowset_id()); if (find_it == id_to_rowset_meta_map.end()) { LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id() - << "rowset_id not found, rowset_id=" << meta.rowset_id(); + << " rowset_id not found, rowset_id=" << meta.rowset_id(); return; } diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 6c8add49cc0a9e..77a74abd521285 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -2607,7 +2607,7 @@ void handle_set_cluster_status(const std::string& instance_id, const ClusterInfo }); } -void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInfo& cluster, +void handle_alter_vcluster_info(const std::string& instance_id, const ClusterInfo& cluster, std::shared_ptr resource_mgr, std::string& msg, MetaServiceCode& code) { msg = resource_mgr->update_cluster( @@ -2694,6 +2694,26 @@ void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInf }); } +void handle_alter_properties(const std::string& instance_id, const ClusterInfo& cluster, + std::shared_ptr resource_mgr, std::string& msg, + MetaServiceCode& code) { + msg = resource_mgr->update_cluster( + instance_id, cluster, + [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); }, + [&](ClusterPB& c, std::vector&) { + std::string msg; + std::stringstream ss; + if (ClusterPB::COMPUTE != c.type()) { + code = 
MetaServiceCode::INVALID_ARGUMENT; + ss << "just support set COMPUTE cluster status"; + msg = ss.str(); + return msg; + } + *c.mutable_properties() = cluster.cluster.properties(); + return msg; + }); +} + void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, const AlterClusterRequest* request, AlterClusterResponse* response, @@ -2778,7 +2798,10 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, code); break; case AlterClusterRequest::ALTER_VCLUSTER_INFO: - handle_alter_vcluster_Info(instance_id, cluster, resource_mgr(), msg, code); + handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, code); + break; + case AlterClusterRequest::ALTER_PROPERTIES: + handle_alter_properties(instance_id, cluster, resource_mgr(), msg, code); break; default: code = MetaServiceCode::INVALID_ARGUMENT; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 1b386f985888ef..8d943d5c7e731a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3333,8 +3333,26 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true, masterOnly = true) public static int cloud_min_balance_tablet_num_per_run = 2; - @ConfField(mutable = true, masterOnly = true) - public static boolean enable_cloud_warm_up_for_rebalance = true; + @ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。" + + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;" + + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;" + + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;" + + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer BE拉取,均衡最快可能会影响同计算组中其他BE性能。" + + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)" + + "设置compute group维度的balance类型,compute group维度配置优先级更高", + "Specify the 
scaling and warming methods for all Compute groups in a cloud mode. " + + "without_warmup: Directly modify shard mapping, first read from S3," + + "fastest re-balance but largest fluctuation; " + + "async_warmup: Asynchronous warmup, best-effort cache pulling, " + + "faster re-balance but possible cache miss; " + + "sync_warmup: Synchronous warmup, ensure cache migration completion, " + + "slower re-balance but no cache miss; " + + "peer_read_async_warmup: Directly modify shard mapping, first read from Peer BE, " + + "fastest re-balance but may affect other BEs in the same compute group performance. " + + "Note: This is a global FE configuration, you can also use SQL (ALTER COMPUTE GROUP cg PROPERTIES) " + + "to set balance type at compute group level, compute group level configuration has higher priority"}, + options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"}) + public static String cloud_warm_up_for_rebalance_type = "async_warmup"; @ConfField(mutable = true, masterOnly = false) public static String security_checker_class_name = ""; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index d85b645f0a4511..3fe88827b99646 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -264,6 +264,8 @@ supportedAlterStatement | ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause #alterStorageVault | ALTER WORKLOAD GROUP name=identifierOrText (FOR computeGroup=identifierOrText)? properties=propertyClause? #alterWorkloadGroup + | ALTER COMPUTE GROUP name=identifierOrText + properties=propertyClause? 
#alterComputeGroup | ALTER CATALOG name=identifier SET PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN #alterCatalogProperties | ALTER WORKLOAD POLICY name=identifierOrText diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java new file mode 100644 index 00000000000000..d66e3126d5bb59 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.catalog; + +import org.apache.doris.common.Config; + +import lombok.Getter; + +/** + * Enum for balance type options + */ +@Getter +public enum BalanceTypeEnum { + WITHOUT_WARMUP("without_warmup"), + ASYNC_WARMUP("async_warmup"), + SYNC_WARMUP("sync_warmup"), + PEER_READ_ASYNC_WARMUP("peer_read_async_warmup"); + + private final String value; + + BalanceTypeEnum(String value) { + this.value = value; + } + + /** + * Parse string value to enum, case-insensitive + */ + public static BalanceTypeEnum fromString(String value) { + if (value == null) { + return null; + } + for (BalanceTypeEnum type : BalanceTypeEnum.values()) { + if (type.value.equalsIgnoreCase(value)) { + return type; + } + } + return null; + } + + /** + * Check if the given string is a valid balance type + */ + public static boolean isValid(String value) { + return fromString(value) != null; + } + + /** + * Get the balance type enum from the configuration string + */ + public static BalanceTypeEnum getCloudWarmUpForRebalanceTypeEnum() { + return fromString(Config.cloud_warm_up_for_rebalance_type) == null + ? 
ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM : fromString(Config.cloud_warm_up_for_rebalance_type); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java index 044f24d2242005..40feb8b787bd26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java @@ -99,10 +99,63 @@ private void processVirtualClusters(List clusters) { List virtualClusters = new ArrayList<>(); List computeClusters = new ArrayList<>(); categorizeClusters(clusters, virtualClusters, computeClusters); + handleComputeClusters(computeClusters); handleVirtualClusters(virtualClusters, computeClusters); removeObsoleteVirtualGroups(virtualClusters); } + private void handleComputeClusters(List computeClusters) { + for (Cloud.ClusterPB computeClusterInMs : computeClusters) { + ComputeGroup computeGroupInFe = cloudSystemInfoService + .getComputeGroupById(computeClusterInMs.getClusterId()); + if (computeGroupInFe == null) { + // cluster checker will sync it + LOG.info("found compute cluster {} in ms, but not in fe mem, " + + "it may be wait cluster checker to sync, ignore it", + computeClusterInMs); + } else { + // exist compute group, check properties changed and update if needed + updatePropertiesIfChanged(computeGroupInFe, computeClusterInMs); + } + } + } + + /** + * Compare properties between compute cluster in MS and compute group in FE, + * update only the changed key-value pairs to avoid unnecessary updates. 
+ */ + private void updatePropertiesIfChanged(ComputeGroup computeGroupInFe, Cloud.ClusterPB computeClusterInMs) { + Map propertiesInMs = computeClusterInMs.getPropertiesMap(); + Map propertiesInFe = computeGroupInFe.getProperties(); + + if (propertiesInMs == null || propertiesInMs.isEmpty()) { + return; + } + Map changedProperties = new HashMap<>(); + + // Check for changed or new properties + for (Map.Entry entry : propertiesInMs.entrySet()) { + String key = entry.getKey(); + String valueInMs = entry.getValue(); + String valueInFe = propertiesInFe.get(key); + + if (valueInFe != null && valueInFe.equalsIgnoreCase(valueInMs)) { + continue; + } + changedProperties.put(key, valueInMs); + + LOG.debug("Property changed for compute group {}: {} = {} (was: {})", + computeGroupInFe.getName(), key, valueInMs, valueInFe); + } + + // Only update if there are actual changes + if (!changedProperties.isEmpty()) { + LOG.info("Updating properties for compute group {}: {}", + computeGroupInFe.getName(), changedProperties); + computeGroupInFe.setProperties(changedProperties); + } + } + private void categorizeClusters(List clusters, List virtualClusters, List computeClusters) { for (Cloud.ClusterPB cluster : clusters) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index d8d6b712b2ea45..a33667f0f64ef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -112,6 +112,54 @@ public class CloudTabletRebalancer extends MasterDaemon { private CloudSystemInfoService cloudSystemInfoService; + private BalanceTypeEnum globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum(); + + /** + * Get the current balance type for a compute group, falling back to global balance type if not found + */ + private 
BalanceTypeEnum getCurrentBalanceType(String clusterId) { + ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId); + if (cg == null) { + LOG.debug("compute group not found, use global balance type, id {}", clusterId); + return globalBalanceTypeEnum; + } + + BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType(); + if (isComputeGroupBalanceChanged(clusterId)) { + return computeGroupBalanceType; + } + return globalBalanceTypeEnum; + } + + /** + * Get the current task timeout for a compute group, falling back to global timeout if not found + */ + private int getCurrentTaskTimeout(String clusterId) { + ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId); + if (cg == null) { + return Config.cloud_pre_heating_time_limit_sec; + } + + int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout(); + if (isComputeGroupBalanceChanged(clusterId)) { + return computeGroupTimeout; + } + + return Config.cloud_pre_heating_time_limit_sec; + } + + private boolean isComputeGroupBalanceChanged(String clusterId) { + ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId); + if (cg == null) { + return false; + } + + BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType(); + int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout(); + return computeGroupBalanceType != ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM + || computeGroupTimeout != ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT; + } + public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) { super("cloud tablet rebalancer", Config.cloud_tablet_rebalancer_interval_second * 1000); this.cloudSystemInfoService = cloudSystemInfoService; @@ -239,6 +287,7 @@ protected void runAfterCatalogReady() { LOG.info("cloud tablet rebalance begin"); long start = System.currentTimeMillis(); + globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum(); buildClusterToBackendMap(); if (!completeRouteInfo()) { @@ -417,10 +466,25 @@ public void 
globalBalance() { public void checkInflightWarmUpCacheAsync() { Map> beToInfightTasks = new HashMap>(); + Set invalidTasks = new HashSet<>(); for (Map.Entry entry : tabletToInfightTask.entrySet()) { + String clusterId = entry.getKey().getClusterId(); + BalanceTypeEnum balanceTypeEnum = getCurrentBalanceType(clusterId); + if (balanceTypeEnum == BalanceTypeEnum.WITHOUT_WARMUP) { + // no need check warmup cache async + invalidTasks.add(entry.getKey()); + continue; + } beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>()); beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue()); } + invalidTasks.forEach(key -> { + if (LOG.isDebugEnabled()) { + LOG.debug("remove inflight warmup task tablet {} cluster {} no need warmup", + key.getTabletId(), key.getClusterId()); + } + tabletToInfightTask.remove(key); + }); List infos = new ArrayList<>(); long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L; @@ -432,6 +496,7 @@ public void checkInflightWarmUpCacheAsync() { destBackend = null; } if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) { + // dest backend not exist or dead too long, need remove all inflight tasks in this dest backend List toRemove = new LinkedList<>(); for (InfightTask task : entry.getValue()) { for (InfightTablet key : tabletToInfightTask.keySet()) { @@ -447,10 +512,12 @@ public void checkInflightWarmUpCacheAsync() { continue; } if (!destBackend.isAlive()) { + // dest backend dead, dead time smaller than rehash_tablet_after_be_dead_seconds, wait next time continue; } List tablets = entry.getValue().stream() .map(task -> task.pickedTablet.getId()).collect(Collectors.toList()); + // check dest backend whether warmup cache done Map taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey()); if (taskDone == null) { LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}", @@ -461,17 +528,7 @@ public void 
checkInflightWarmUpCacheAsync() { for (Map.Entry result : taskDone.entrySet()) { InfightTask task = tabletToInfightTask .getOrDefault(new InfightTablet(result.getKey(), clusterId), null); - if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp - > Config.cloud_pre_heating_time_limit_sec)) { - if (!result.getValue()) { - LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey()); - } - updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos); - if (LOG.isDebugEnabled()) { - LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId()); - } - tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId)); - } + handleWarmupCompletion(task, clusterId, result.getValue(), result.getKey(), infos); } } long oldSize = infos.size(); @@ -855,6 +912,70 @@ private Map sendCheckWarmUpCacheAsyncRpc(List tabletIds, lo return null; } + private void handleWarmupCompletion(InfightTask task, String clusterId, boolean isDone, long tabletId, + List infos) { + if (task == null) { + LOG.warn("cannot find inflight task for tablet {}-{}", clusterId, tabletId); + return; + } + boolean shouldUpdateMapping = false; + BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId); + LOG.debug("cluster id {}, balance type {}, tabletId {}, ", clusterId, currentBalanceType, tabletId); + + switch (currentBalanceType) { + case ASYNC_WARMUP: { + int currentTaskTimeout = getCurrentTaskTimeout(clusterId); + boolean timeExceeded = System.currentTimeMillis() / 1000 - task.startTimestamp > currentTaskTimeout; + LOG.debug("tablet {}-{} warmup cache isDone {} timeExceeded {}", + clusterId, tabletId, isDone, timeExceeded); + if (isDone || timeExceeded) { + if (!isDone) { + // timeout but not done, not normal, info log + LOG.info("{}-{} warmup cache timeout {}, forced to change the mapping", + clusterId, tabletId, currentTaskTimeout); + } else { + // done, normal + LOG.debug("{}-{} warmup cache 
done, change the mapping", clusterId, tabletId); + } + shouldUpdateMapping = true; + } + break; + } + case SYNC_WARMUP: { + if (isDone) { + // done, normal + LOG.debug("{} sync cache done, change the mapping", tabletId); + shouldUpdateMapping = true; + } + break; + } + default: + break; + } + + if (!shouldUpdateMapping) { + return; + } + + updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos); + + if (LOG.isDebugEnabled()) { + LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId()); + } + tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId)); + + if (BalanceTypeEnum.SYNC_WARMUP.equals(currentBalanceType)) { + try { + // send sync cache rpc again, ignore the result, the best effort to sync some new data + sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe); + } catch (Exception e) { + LOG.warn("Failed to preheat tablet {} from {} to {}, " + + "help msg turn off fe config enable_cloud_warm_up_for_rebalance", + task.pickedTablet.getId(), task.srcBe, task.destBe, e); + } + } + } + private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, Map> globalBeToTablets, Map>> beToTabletsInTable, @@ -1050,6 +1171,10 @@ private void balanceImpl(List bes, String clusterId, Map long avgNum = totalTabletsNum / beNum; long transferNum = calculateTransferNum(avgNum); + BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId); + LOG.debug("balance type {}, be num {}, total tablets num {}, avg num {}, transfer num {}", + currentBalanceType, beNum, totalTabletsNum, avgNum, transferNum); + for (int i = 0; i < transferNum; i++) { TransferPairInfo pairInfo = new TransferPairInfo(); if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) { @@ -1069,17 +1194,34 @@ private void balanceImpl(List bes, String clusterId, Map CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe); - if 
(Config.enable_cloud_warm_up_for_rebalance && srcBackend != null && srcBackend.isAlive()) { - if (isConflict(srcBe, destBe, cloudReplica, balanceType, - futurePartitionToTablets, futureBeToTabletsInTable)) { + if ((BalanceTypeEnum.WITHOUT_WARMUP.equals(currentBalanceType) + || BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) + && srcBackend != null && srcBackend.isAlive()) { + // direct switch, update fe meta directly, not send preheating task + if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) { continue; } - preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, beToTablets); + transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos); + if (BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) { + LOG.debug("directly switch {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, + clusterId); + // send sync cache rpc, best effort + try { + sendPreHeatingRpc(pickedTablet, srcBe, destBe); + } catch (Exception e) { + LOG.debug("Failed to preheat tablet {} from {} to {}, " + + "directly policy, just ignore the error", + pickedTablet.getId(), srcBe, destBe, e); + return; + } + } } else { - if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) { + // cache warm up + if (isConflict(srcBe, destBe, cloudReplica, balanceType, + futurePartitionToTablets, futureBeToTabletsInTable)) { continue; } - transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos); + preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, beToTablets); } } } @@ -1147,7 +1289,8 @@ private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long destBe private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId, BalanceType balanceType, List infos) { - LOG.info("transfer {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, clusterId); + 
LOG.info("transfer {} from {} to {}, cluster {}, type {}", + pickedTablet.getId(), srcBe, destBe, clusterId, balanceType); updateBeToTablets(pickedTablet, srcBe, destBe, beToTabletsGlobal, beToTabletsInTable, partitionToTablets); updateBeToTablets(pickedTablet, srcBe, destBe, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java index 82c24afd7c5d75..c895f13f7a03e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java @@ -18,7 +18,11 @@ package org.apache.doris.cloud.catalog; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.gson.Gson; import lombok.Getter; import lombok.Setter; @@ -33,6 +37,27 @@ public class ComputeGroup { private static final Logger LOG = LogManager.getLogger(ComputeGroup.class); + public static final String BALANCE_TYPE = "balance_type"; + + public static final String BALANCE_WARM_UP_TASK_TIMEOUT = "balance_warm_up_task_timeout"; + + private static final ImmutableSet ALL_PROPERTIES_NAME = new ImmutableSet.Builder() + .add(BALANCE_TYPE).add(BALANCE_WARM_UP_TASK_TIMEOUT).build(); + + private static final Map ALL_PROPERTIES_DEFAULT_VALUE_MAP = Maps.newHashMap(); + + public static final int DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT = Config.cloud_pre_heating_time_limit_sec; + public static final BalanceTypeEnum DEFAULT_COMPUTE_GROUP_BALANCE_ENUM + = BalanceTypeEnum.fromString(Config.cloud_warm_up_for_rebalance_type); + + static { + ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_TYPE, DEFAULT_COMPUTE_GROUP_BALANCE_ENUM.getValue()); + if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(Config.cloud_warm_up_for_rebalance_type)) { + 
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + } + } + private enum PolicyTypeEnum { ActiveStandby, } @@ -110,6 +135,10 @@ public Cloud.ClusterPolicy toPb() { @Setter private boolean needRebuildFileCache = false; + @Getter + @Setter + private Map properties = new LinkedHashMap<>(ALL_PROPERTIES_DEFAULT_VALUE_MAP); + public ComputeGroup(String id, String name, ComputeTypeEnum type) { this.id = id; this.name = name; @@ -134,6 +163,129 @@ public String getStandbyComputeGroup() { return policy.getStandbyComputeGroup(); } + private void validateTimeoutRestriction(Map inputProperties) throws DdlException { + if (!properties.containsKey(BALANCE_TYPE)) { + return; + } + String originalBalanceType = properties.get(BALANCE_TYPE); + if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(originalBalanceType)) { + return; + } + + if (inputProperties.containsKey(BALANCE_TYPE) + && BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(inputProperties.get(BALANCE_TYPE))) { + return; + } + + if (inputProperties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) { + throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + + " cannot be set when current " + BALANCE_TYPE + " is " + originalBalanceType + + ". Only async_warmup type supports timeout setting."); + } + } + + /** + * Validate a single property key-value pair. 
+ */ + private static void validateProperty(String key, String value) throws DdlException { + if (value == null || value.isEmpty()) { + return; + } + + if (!ALL_PROPERTIES_NAME.contains(key)) { + throw new DdlException("Property " + key + " is not supported"); + } + + // Validate specific properties + if (BALANCE_TYPE.equals(key)) { + if (!BalanceTypeEnum.isValid(value)) { + throw new DdlException("Property " + BALANCE_TYPE + + " only support without_warmup or async_warmup or sync_warmup"); + } + } else if (BALANCE_WARM_UP_TASK_TIMEOUT.equals(key)) { + try { + int timeout = Integer.parseInt(value); + if (timeout <= 0) { + throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer"); + } + } catch (NumberFormatException e) { + throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer"); + } + } + } + + public void checkProperties(Map inputProperties) throws DdlException { + if (inputProperties == null || inputProperties.isEmpty()) { + return; + } + + for (Map.Entry entry : inputProperties.entrySet()) { + validateProperty(entry.getKey(), entry.getValue()); + } + + validateTimeoutRestriction(inputProperties); + } + + public void modifyProperties(Map inputProperties) throws DdlException { + String balanceType = inputProperties.get(BALANCE_TYPE); + if (balanceType == null) { + return; + } + if (BalanceTypeEnum.WITHOUT_WARMUP.getValue().equals(balanceType) + || BalanceTypeEnum.SYNC_WARMUP.getValue().equals(balanceType) + || BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue().equals(balanceType)) { + // delete BALANCE_WARM_UP_TASK_TIMEOUT if exists + properties.remove(BALANCE_WARM_UP_TASK_TIMEOUT); + } else if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(balanceType)) { + // if BALANCE_WARM_UP_TASK_TIMEOUT exists, it has been validated in validateProperty + if (!properties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) { + properties.put(BALANCE_WARM_UP_TASK_TIMEOUT, 
String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + } + } + } + + // set properties, just set in periodic instance status checker + public void setProperties(Map propertiesInMs) { + if (propertiesInMs == null || propertiesInMs.isEmpty()) { + return; + } + + for (Map.Entry entry : propertiesInMs.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + try { + validateProperty(key, value); + } catch (DdlException e) { + LOG.warn("ignore invalid property. compute group: {}, key: {}, value: {}, error: {}", + name, key, value, e.getMessage()); + continue; + } + + if (value != null && !value.isEmpty()) { + properties.put(key, value); + } + } + } + + public BalanceTypeEnum getBalanceType() { + String balanceType = properties.get(BALANCE_TYPE); + BalanceTypeEnum type = BalanceTypeEnum.fromString(balanceType); + if (type == null) { + return BalanceTypeEnum.ASYNC_WARMUP; + } + return type; + } + + public int getBalanceWarmUpTaskTimeout() { + String timeoutStr = properties.get(BALANCE_WARM_UP_TASK_TIMEOUT); + try { + return Integer.parseInt(timeoutStr); + } catch (NumberFormatException e) { + return DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT; + } + } + @Override public String toString() { Map showMap = new LinkedHashMap<>(); @@ -143,6 +295,7 @@ public String toString() { showMap.put("unavailableSince", String.valueOf(unavailableSince)); showMap.put("availableSince", String.valueOf(availableSince)); showMap.put("policy", policy == null ? 
"no_policy" : policy.toString()); + showMap.put("properties", properties.toString()); Gson gson = new Gson(); return gson.toJson(showMap); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 62ab6e3b9ec86a..0bbfe683154b46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -1658,4 +1658,48 @@ public void renameComputeGroup(String originalName, String newGroupName) throws LOG.info("alter rename compute group, request: {}, response: {}", request, response); } } + + public void alterComputeGroupProperties(String computeGroupName, Map<String, String> properties) + throws UserException { + String cloudInstanceId = ((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId(); + if (Strings.isNullOrEmpty(cloudInstanceId)) { + throw new DdlException("unable to alter compute group properties due to empty cloud_instance_id"); + } + String computeGroupId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(computeGroupName); + if (Strings.isNullOrEmpty(computeGroupId)) { + LOG.info("alter compute group properties {} not found, unable to alter", computeGroupName); + throw new DdlException("compute group '" + computeGroupName + "' not found, unable to alter properties"); + } + + ClusterPB clusterPB = ClusterPB.newBuilder() + .setClusterId(computeGroupId) + .setClusterName(computeGroupName) + .setType(ClusterPB.Type.COMPUTE) + .putAllProperties(properties) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(cloudInstanceId) + .setOp(Cloud.AlterClusterRequest.Operation.ALTER_PROPERTIES) + .setCluster(clusterPB) + .build(); + + + Cloud.AlterClusterResponse response = null; + try { + response =
MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter compute group properties not ok, response: {}", response); + throw new UserException("failed to alter compute group properties errorCode: " + response.getStatus().getCode() + " msg: " + response.getStatus().getMsg() + " may be you can try later"); + } + } catch (RpcException e) { + LOG.warn("alter compute group properties rpc exception", e); + throw new UserException("failed to alter compute group properties", e); + } finally { + LOG.info("alter compute group properties, request: {}, response: {}", request, response); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 3e279a96303eea..520f8f97a2bc1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -98,6 +98,7 @@ import org.apache.doris.nereids.DorisParser.AlterCatalogCommentContext; import org.apache.doris.nereids.DorisParser.AlterCatalogPropertiesContext; import org.apache.doris.nereids.DorisParser.AlterCatalogRenameContext; +import org.apache.doris.nereids.DorisParser.AlterComputeGroupContext; import org.apache.doris.nereids.DorisParser.AlterDatabasePropertiesContext; import org.apache.doris.nereids.DorisParser.AlterDatabaseRenameContext; import org.apache.doris.nereids.DorisParser.AlterDatabaseSetQuotaContext; @@ -620,6 +621,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand; +import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand; import
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; @@ -6323,6 +6325,13 @@ public LogicalPlan visitAlterWorkloadGroup(AlterWorkloadGroupContext ctx) { return new AlterWorkloadGroupCommand(cgName, stripQuotes(ctx.name.getText()), properties); } + @Override + public LogicalPlan visitAlterComputeGroup(AlterComputeGroupContext ctx) { + Map<String, String> properties = ctx.propertyClause() != null + ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap(); + return new AlterComputeGroupCommand(ctx.name.getText(), properties); + } + @Override public LogicalPlan visitAlterWorkloadPolicy(AlterWorkloadPolicyContext ctx) { Map<String, String> properties = ctx.propertyClause() != null diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 6fc59b0f584a86..f50bd047f3acca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -206,6 +206,7 @@ public enum PlanType { ALTER_STORAGE_POLICY_COMMAND, ALTER_STORAGE_VAULT, ALTER_WORKLOAD_GROUP_COMMAND, + ALTER_COMPUTE_GROUP_COMMAND, ALTER_WORKLOAD_POLICY_COMMAND, ALTER_DATABASE_RENAME_COMMAND, ALTER_DATABASE_SET_DATA_QUOTA_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java new file mode 100644 index 00000000000000..8a295678f4826e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.ComputeGroup; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +//import org.apache.commons.lang3.StringUtils; +import java.util.Map; + +/** + * ALTER COMPUTE GROUP command: changes the properties of a non-virtual compute group in cloud mode + */ +public class AlterComputeGroupCommand extends AlterCommand { + private final String computeGroupName; + private final Map<String, String> properties; + + /** + * Creates the command for the named compute group and the properties to change + */ + public AlterComputeGroupCommand(String computeGroupName, Map<String, String> properties) { + super(PlanType.ALTER_COMPUTE_GROUP_COMMAND); + this.computeGroupName = computeGroupName; + this.properties = properties; + } + + /** + * Checks cloud mode, ADMIN privilege, group existence and the properties, then applies them to the group + */ + public void validate(ConnectContext connectContext) throws UserException { + if
(Config.isNotCloudMode()) { + throw new AnalysisException("Currently, Alter compute group is only supported in cloud mode"); + } + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(connectContext, PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + + if (properties == null || properties.isEmpty()) { + throw new AnalysisException("Compute Group properties can't be empty"); + } + + CloudSystemInfoService cloudSys = ((CloudSystemInfoService) Env.getCurrentSystemInfo()); + // check compute group exist + ComputeGroup cg = cloudSys.getComputeGroupByName(computeGroupName); + if (cg == null) { + throw new AnalysisException("Compute Group " + computeGroupName + " does not exist"); + } + + if (cg.isVirtual()) { + throw new AnalysisException("Virtual Compute Group " + computeGroupName + " can not be altered"); + } + + // check compute group's properties can be modified + cg.checkProperties(properties); + + cg.modifyProperties(properties); + } + + @Override + public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { + validate(ctx); + CloudSystemInfoService cloudSys = ((CloudSystemInfoService) Env.getCurrentSystemInfo()); + // send rpc to ms + cloudSys.alterComputeGroupProperties(computeGroupName, properties); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitAlterComputeGroupCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java index da61a861ce6147..7ea77b085a856d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java @@ -59,10 +59,12 @@ public class ShowClustersCommand extends
ShowCommand { // sql: show clusters; public static final ImmutableList CLUSTER_TITLE_NAMES = new ImmutableList.Builder() - .add("cluster").add("is_current").add("users").add("backend_num").add("sub_clusters").add("policy").build(); + .add("cluster").add("is_current").add("users").add("backend_num") + .add("sub_clusters").add("policy").add("properties").build(); // sql: show compute groups; public static final ImmutableList COMPUTE_GROUP_TITLE_NAMES = new ImmutableList.Builder() - .add("Name").add("IsCurrent").add("Users").add("BackendNum").add("SubComputeGroups").add("Policy").build(); + .add("Name").add("IsCurrent").add("Users").add("BackendNum") + .add("SubComputeGroups").add("Policy").add("Properties").build(); private static final Logger LOG = LogManager.getLogger(ShowClustersCommand.class); private final boolean isComputeGroup; @@ -105,13 +107,17 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc clusterNameSet.addAll(clusterNames); for (String clusterName : clusterNameSet) { - ArrayList row = Lists.newArrayList(clusterName); // current_used, users if (!Env.getCurrentEnv().getAccessManager() .checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) { continue; } + ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName); + if (cg == null) { + continue; + } + ArrayList row = Lists.newArrayList(clusterName); String clusterNameFromCtx = ""; try { clusterNameFromCtx = ctx.getCloudCluster(); @@ -142,16 +148,14 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc rows.add(row); row.add(subClusterNames); row.add(policy); + row.add(cg.getProperties().toString()); continue; } // virtual compute group // virtual cg backends eq 0 row.add(String.valueOf(0)); rows.add(row); - ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName); - if (cg == null) { - continue; - } + String activeCluster = cg.getPolicy().getActiveComputeGroup(); 
String standbyCluster = cg.getPolicy().getStandbyComputeGroup(); // first active, second standby @@ -159,6 +163,7 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc row.add(subClusterNames); // Policy row.add(cg.getPolicy().toString()); + row.add(cg.getProperties().toString()); } return new ShowResultSet(getMetaData(), rows); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 3a8ff435ead1ff..b663110a5caf0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand; +import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand; @@ -808,6 +809,10 @@ default R visitAlterWorkloadGroupCommand(AlterWorkloadGroupCommand alterWorkload return visitCommand(alterWorkloadGroupCommand, context); } + default R visitAlterComputeGroupCommand(AlterComputeGroupCommand alterComputeGroupCommand, C context) { + return visitCommand(alterComputeGroupCommand, context); + } + default R visitAlterWorkloadPolicyCommand(AlterWorkloadPolicyCommand alterWorkloadPolicyCommand, C context) { return visitCommand(alterWorkloadPolicyCommand, context); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java new file mode 100644 index 00000000000000..c61f2abefa42fc --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.catalog; + +import org.apache.doris.common.DdlException; + +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +public class ComputeGroupTest { + private ComputeGroup computeGroup; + + @BeforeEach + public void setUp() { + computeGroup = new ComputeGroup("test_id", "test_group", ComputeGroup.ComputeTypeEnum.COMPUTE); + } + + @Test + public void testCheckPropertiesWithNull() throws DdlException { + computeGroup.checkProperties(null); + computeGroup.checkProperties(Maps.newHashMap()); + } + + @Test + public void testCheckPropertiesWithValidBalanceType() throws DdlException { + // Every valid balance_type value is accepted + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + computeGroup.checkProperties(properties); + + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(properties); + + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + computeGroup.checkProperties(properties); + + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(properties); + } + + @Test + public void testCheckPropertiesWithInvalidBalanceType() { + // An invalid balance_type is rejected + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_TYPE, "invalid_type"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + } + + @Test + public void testCheckPropertiesWithValidTimeout() throws DdlException { + // Valid timeout values are accepted + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "300"); + computeGroup.checkProperties(properties); + +
properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "1"); + computeGroup.checkProperties(properties); + } + + @Test + public void testCheckPropertiesWithInvalidTimeout() { + // Invalid timeout values are rejected + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "-1"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + + properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "invalid"); + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + } + + @Test + public void testCheckPropertiesWithUnsupportedProperty() { + // An unsupported property is rejected + Map properties = Maps.newHashMap(); + properties.put("unsupported_property", "value"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + } + + @Test + public void testModifyPropertiesWithDirectSwitch() throws DdlException { + // balance_type without_warmup: the timeout should be removed + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + + // Put a timeout into the group's properties first + computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + + computeGroup.modifyProperties(inputProperties); + + // Verify the timeout was removed + Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testModifyPropertiesWithSyncCache() throws DdlException { + // balance_type sync_warmup: the timeout should be removed + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); +
inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + + // Put a timeout into the group's properties first + computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + + computeGroup.modifyProperties(inputProperties); + + // Verify the timeout was removed + Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testCheckPropertiesWithBalanceTypeTransition() throws DdlException { + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testCheckPropertiesWithWarmupCacheToWarmupCache() throws DdlException { + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testCheckPropertiesWithDirectSwitchToDirectSwitch() throws DdlException { + // without_warmup -> without_warmup: no timeout needs to be set + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testModifyPropertiesWithWarmupCacheAndExistingTimeout() throws DdlException { + // balance_type async_warmup with a timeout already stored: the default must not be added + Map inputProperties =
Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "600"); + + // Put a timeout into the group's properties first + computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + String originalTimeout = computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT); + + computeGroup.modifyProperties(inputProperties); + + // Verify the timeout is unchanged: a timeout that was already set must not be overwritten + Assertions.assertEquals(originalTimeout, computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testModifyPropertiesWithWarmupCacheAndNoTimeout() throws DdlException { + // balance_type async_warmup with no timeout stored: the default should be added + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + + // Make sure the group's properties hold no timeout + computeGroup.getProperties().remove(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT); + Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + + computeGroup.modifyProperties(inputProperties); + // Verify the default value was added + Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + Assertions.assertEquals(String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT), + computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testModifyPropertiesWithNullBalanceType() throws DdlException { + // No balance_type in the input: properties must not be modified + Map inputProperties = Maps.newHashMap(); + inputProperties.put("other_property", "value"); + + Map originalProperties = Maps.newHashMap(computeGroup.getProperties()); + + computeGroup.modifyProperties(inputProperties); + + // Verify properties were not modified + Assertions.assertEquals(originalProperties, computeGroup.getProperties()); + } + + @Test + public void testModifyPropertiesWithEmptyInput()
throws DdlException { + // Empty input: properties must not be modified + Map inputProperties = Maps.newHashMap(); + + Map originalProperties = Maps.newHashMap(computeGroup.getProperties()); + + computeGroup.modifyProperties(inputProperties); + + // Verify properties were not modified + Assertions.assertEquals(originalProperties, computeGroup.getProperties()); + } + + @Test + public void testCheckPropertiesWithSyncCacheToWarmupCache() throws DdlException { + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithNoCurrentBalanceType() throws DdlException { + // No balance_type is currently set on the group + computeGroup.getProperties().remove(ComputeGroup.BALANCE_TYPE); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithCurrentWarmupCache() throws DdlException { + // The current balance_type is async_warmup + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchToWarmupCache() throws DdlException { + // Switch from without_warmup to async_warmup and set a timeout + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); +
computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithSyncCacheToWarmupCacheWithTimeout() throws DdlException { + // Switch from sync_warmup to async_warmup and set a timeout + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchAndOnlyTimeout() { + // The current type is without_warmup: setting only a timeout should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithSyncCacheAndOnlyTimeout() { + // The current type is sync_warmup: setting only a timeout should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchToSyncCacheAndTimeout() { + // Switching from without_warmup to sync_warmup while setting a timeout should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class,
() -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchToSameAndTimeout() { + // Staying on without_warmup while setting a timeout should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithNoInputTimeout() throws DdlException { + // The input carries no timeout, only an unsupported property, which must be rejected + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put("other_property", "value"); + Assertions.assertThrows(DdlException.class, () -> { + // Property other_property is not supported + computeGroup.checkProperties(inputProperties); + }); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java new file mode 100644 index 00000000000000..8ff8b60ffe05b6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.backup.CatalogMocker; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.ComputeGroup; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class AlterComputeGroupCommandTest { + private static final String internalCtl = InternalCatalog.INTERNAL_CATALOG_NAME; + @Mocked + private Env env; + @Mocked + private AccessControllerManager accessControllerManager; + @Mocked + private InternalCatalog catalog; + @Mocked + private ConnectContext connectContext; + @Mocked + private CloudSystemInfoService cloudSystemInfoService; + @Mocked + private ComputeGroup computeGroup; + private Database db; + + private void runBefore() throws Exception { + db = CatalogMocker.mockDb(); + new Expectations() { + { + Env.getCurrentEnv(); + minTimes = 0; + result = env; + + env.getAccessManager(); + minTimes = 0; + result = 
accessControllerManager; + + ConnectContext.get(); + minTimes = 0; + result = connectContext; + + connectContext.isSkipAuth(); + minTimes = 0; + result = true; + + accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); + minTimes = 0; + result = true; + } + }; + } + + @Test + public void testValidateNonCloudMode() throws Exception { + runBefore(); + Config.deploy_mode = "non-cloud"; + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateAuthFailed() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + new Expectations() { + { + accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); + minTimes = 0; + result = false; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateEmptyProperties() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", null); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + + AlterComputeGroupCommand command2 = new AlterComputeGroupCommand("test_group", new HashMap<>()); + Assertions.assertThrows(UserException.class, () -> command2.validate(connectContext)); + } + + @Test + public void testValidateComputeGroupNotExist() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + 
cloudSystemInfoService.getComputeGroupByName("non_exist_group"); + minTimes = 0; + result = null; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("non_exist_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateVirtualComputeGroup() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + cloudSystemInfoService.getComputeGroupByName("virtual_group"); + minTimes = 0; + result = computeGroup; + + computeGroup.isVirtual(); + minTimes = 0; + result = true; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("virtual_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateInvalidProperties() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + cloudSystemInfoService.getComputeGroupByName("test_group"); + minTimes = 0; + result = computeGroup; + + computeGroup.isVirtual(); + minTimes = 0; + result = false; + + computeGroup.checkProperties((Map) any); + minTimes = 0; + result = new DdlException("Invalid property"); + } + }; + + Map properties = Maps.newHashMap(); + properties.put("invalid_property", "invalid_value"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateSuccess() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new 
Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + cloudSystemInfoService.getComputeGroupByName("test_group"); + minTimes = 0; + result = computeGroup; + + computeGroup.isVirtual(); + minTimes = 0; + result = false; + } + }; + + new Expectations() { + { + computeGroup.checkProperties((Map) any); + minTimes = 0; + + computeGroup.modifyProperties((Map) any); + minTimes = 0; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + properties.put("balance_warm_up_task_timeout", "300"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java new file mode 100644 index 00000000000000..06588301872693 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class ShowComputeGroupTest extends TestWithFeService { + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + } + + @Test + public void testShowComputeGroupsInCloudMode() throws Exception { + Config.deploy_mode = "cloud"; + ShowClustersCommand command = new ShowClustersCommand(true); + ShowResultSetMetaData metaData = command.getMetaData(); + Assertions.assertNotNull(metaData); + List columnNames = metaData.getColumns().stream() + .map(Column::getName) + .collect(Collectors.toList()); + Assertions.assertEquals(7, columnNames.size()); + Assertions.assertEquals("Name", columnNames.get(0)); + Assertions.assertEquals("IsCurrent", columnNames.get(1)); + Assertions.assertEquals("Users", columnNames.get(2)); + Assertions.assertEquals("BackendNum", columnNames.get(3)); + Assertions.assertEquals("SubComputeGroups", columnNames.get(4)); + Assertions.assertEquals("Policy", columnNames.get(5)); + Assertions.assertEquals("Properties", columnNames.get(6)); + } + + @Test + public void testShowComputeGroupsInNonCloudMode() throws Exception { + Config.deploy_mode = "not-cloud"; + ShowClustersCommand command = new ShowClustersCommand(true); + Assertions.assertThrows(AnalysisException.class, () -> { + command.doRun(connectContext, null); + }); + } + + @Test + public void testShowClustersInCloudMode() throws Exception { + ShowClustersCommand command = new ShowClustersCommand(false); + ShowResultSetMetaData metaData = command.getMetaData(); + Assertions.assertNotNull(metaData); + List columnNames = 
metaData.getColumns().stream() + .map(Column::getName).collect(Collectors.toList()); + Assertions.assertEquals(7, columnNames.size()); + Assertions.assertEquals("cluster", columnNames.get(0)); + Assertions.assertEquals("is_current", columnNames.get(1)); + Assertions.assertEquals("users", columnNames.get(2)); + Assertions.assertEquals("backend_num", columnNames.get(3)); + Assertions.assertEquals("sub_clusters", columnNames.get(4)); + Assertions.assertEquals("policy", columnNames.get(5)); + Assertions.assertEquals("properties", columnNames.get(6)); + } + + @Test + public void testShowClustersInNonCloudMode() throws Exception { + Config.deploy_mode = "not-cloud"; + ShowClustersCommand command = new ShowClustersCommand(false); + Assertions.assertThrows(AnalysisException.class, () -> { + command.doRun(connectContext, null); + }); + } +} diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 1a73cb905c2601..c380e07d4e88b3 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -182,6 +182,7 @@ message ClusterPB { optional int64 mtime = 11; repeated string cluster_names = 12; // clusters in virtual cluster optional ClusterPolicy cluster_policy = 13; // virtual cluster policy + map properties = 14; // cluster additional properties } message NodeInfoPB { @@ -1410,6 +1411,7 @@ message AlterClusterRequest { UPDATE_CLUSTER_ENDPOINT = 9; SET_CLUSTER_STATUS = 10; ALTER_VCLUSTER_INFO = 11; + ALTER_PROPERTIES = 12; } optional string instance_id = 1; optional string cloud_unique_id = 2; // For auth diff --git a/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy new file mode 100644 index 00000000000000..e0db0c258e747a --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite('test_alter_compute_group_properties', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def findComputeGroup = { clusterName -> + def showComputeGroups = sql_return_maparray """SHOW COMPUTE GROUPS""" + log.info("SHOW COMPUTE GROUPS result: {}", showComputeGroups) + showComputeGroups.find { it.Name == clusterName } + } + + def findCluster = { clusterName -> + def showCg = sql_return_maparray """SHOW CLUSTERS""" + log.info("SHOW CLUSTERS result: {}", showCg) + showCg.find { it.cluster == clusterName } + } + + docker(options) { + String clusterName = "compute_cluster" + def showComputeGroup = findComputeGroup(clusterName) + def showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=300')) + assertTrue(showclusters.properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='without_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup')) + assertTrue(showclusters.properties.contains('balance_type=without_warmup')) + try { + // errCode = 2, detailMessage = Property balance_warm_up_task_timeout cannot be set when current balance_type is without_warmup. Only async_warmup type supports timeout setting. + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_warm_up_task_timeout'='6000')""" + } catch (Exception e) { + logger.info("exception: {}", e.getMessage()) + assertTrue(e.getMessage().contains("Property balance_warm_up_task_timeout cannot be set when current")) + } + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='6000')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=6000')) + assertTrue(showclusters.properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=6000')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='without_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup')) + assertTrue(showclusters.properties.contains('balance_type=without_warmup')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='async_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300')) + assertTrue(showclusters.properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=300')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='sync_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=sync_warmup')) + assertTrue(showclusters.properties.contains('balance_type=sync_warmup')) + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy new file mode 100644 index 00000000000000..f96119ec29b4b5 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_use_compute_group_properties', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=sync_warmup', + 'cloud_pre_heating_time_limit_sec=30' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def global_config_cluster = "compute_cluster" + def without_warmup_cluster = "without_warmup" + def async_warmup_cluster = "async_warmup" + def sync_warmup_cluster = "sync_warmup" + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + + // alter each cluster different balance type + sql """ALTER COMPUTE GROUP $without_warmup_cluster PROPERTIES ('balance_type'='without_warmup')""" + sql """ALTER COMPUTE GROUP $async_warmup_cluster PROPERTIES ('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='10')""" + sql """ALTER COMPUTE GROUP $sync_warmup_cluster PROPERTIES ('balance_type'='sync_warmup')""" + + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + def 
beforeBalanceEveryClusterCache = [:] + + def clusterNameToBeIdx = [:] + clusterNameToBeIdx[global_config_cluster] = [1] + clusterNameToBeIdx[without_warmup_cluster] = [2] + clusterNameToBeIdx[async_warmup_cluster] = [3] + clusterNameToBeIdx[sync_warmup_cluster] = [4] + + // generate primary tablet in each cluster + for (clusterName in [global_config_cluster, without_warmup_cluster, async_warmup_cluster, sync_warmup_cluster]) { + sql """ use @$clusterName """ + sql """ select * from $table """ + def beIdxs = clusterNameToBeIdx[clusterName] + def bes = [] + beIdxs.each { beIdx -> + def be = cluster.getBeByIndex(beIdx) + bes << be + } + logger.info("clusterName {} be idxs {}, bes {}", clusterName, beIdxs, bes) + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(bes) + logger.info("before add be fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + def merged23CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + beforeBalanceEveryClusterCache[clusterName] = [beforeGetFromFe, beforeGetFromBe, merged23CacheDir] + } + logger.info("before balance every cluster cache {}", beforeBalanceEveryClusterCache) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, global_config_cluster) + cluster.addBackend(1, without_warmup_cluster) + cluster.addBackend(1, async_warmup_cluster) + cluster.addBackend(1, 
sync_warmup_cluster) + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + setFeConfig('enable_cloud_multi_replica', false) + + clusterNameToBeIdx[global_config_cluster] = [1, 5] + clusterNameToBeIdx[without_warmup_cluster] = [2, 6] + clusterNameToBeIdx[async_warmup_cluster] = [3, 7] + clusterNameToBeIdx[sync_warmup_cluster] = [4, 8] + + // sleep 11s, wait balance + // and async_warmup cluster task 10s timeout + sleep(11 * 1000) + + def afterBalanceEveryClusterCache = [:] + + for (clusterName in [global_config_cluster, without_warmup_cluster, async_warmup_cluster, sync_warmup_cluster]) { + sql """ use @$clusterName """ + sql """ select * from $table """ + def beIdxs = clusterNameToBeIdx[clusterName] + def bes = [] + beIdxs.each { beIdx -> + def be = cluster.getBeByIndex(beIdx) + bes << be + } + logger.info("after add be clusterName {} be idxs {}, bes {}", clusterName, beIdxs, bes) + + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(bes) + // version 2 + def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", afterCacheDirVersion2) + // version 3 + def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", afterCacheDirVersion3) + def merged23CacheDir = [afterCacheDirVersion2, afterCacheDirVersion3] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + afterBalanceEveryClusterCache[clusterName] = [afterGetFromFe, afterGetFromBe, merged23CacheDir] + logger.info("after add be clusterName {} fe tablets {}, be tablets {}, cache dir {}", clusterName, afterGetFromFe, afterGetFromBe, merged23CacheDir) + } + logger.info("after add be balance every cluster cache {}", afterBalanceEveryClusterCache) + + // assert first map keys + def assertFirstMapKeys = { clusterRet, expectedEqual -> + def firstMap = clusterRet[0] + def keys = 
firstMap.keySet().toList() + if (expectedEqual) { + assert firstMap[keys[0]] == firstMap[keys[1]] + } else { + assert firstMap[keys[0]] != firstMap[keys[1]] + } + } + + // check afterBalanceEveryClusterCache + // fe config cloud_warm_up_for_rebalance_type=sync_warmup + def global_config_cluster_ret = afterBalanceEveryClusterCache[global_config_cluster] + logger.info("global_config_cluster_ret {}", global_config_cluster_ret) + // fe tablets not changed + assertFirstMapKeys(global_config_cluster_ret, true) + + def without_warmup_cluster_ret = afterBalanceEveryClusterCache[without_warmup_cluster] + logger.info("without_warmup_cluster_ret {}", without_warmup_cluster_ret) + // fe tablets have changed + assertFirstMapKeys(without_warmup_cluster_ret, false) + + def async_warmup_cluster_ret = afterBalanceEveryClusterCache[async_warmup_cluster] + logger.info("async_warmup_cluster_ret {}", async_warmup_cluster_ret) + // fe tablets have changed, due to task timeout + assertFirstMapKeys(async_warmup_cluster_ret, false) + + def sync_warmup_cluster_ret = afterBalanceEveryClusterCache[sync_warmup_cluster] + logger.info("sync_warmup_cluster_ret {}", sync_warmup_cluster_ret) + // fe tablets not changed + assertFirstMapKeys(sync_warmup_cluster_ret, true) + + logger.info("success check after balance every cluster cache, cluster's balance type is worked") + } + + docker(options) { + cluster.addBackend(1, without_warmup_cluster) + cluster.addBackend(1, async_warmup_cluster) + cluster.addBackend(1, sync_warmup_cluster) + testCase("test_balance_warm_up_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy index 7a18f22bb31cf5..50476800d75dea 100644 --- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy @@ -29,7 +29,7 @@ suite('test_balance_warm_up', 'docker') { 
'sys_log_verbose_modules=org', 'heartbeat_interval_second=1', 'rehash_tablet_after_be_dead_seconds=3600', - 'enable_cloud_warm_up_for_rebalance=true' + 'cloud_warm_up_for_rebalance_type=async_warmup' ] options.beConfigs += [ 'report_tablet_interval_seconds=1', diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy new file mode 100644 index 00000000000000..19c252fba8598c --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_sync_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=sync_warmup', + 'cloud_pre_heating_time_limit_sec=30' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm 
up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + setFeConfig('enable_cloud_multi_replica', false) + + sleep(5 * 1000) + sql """ + insert into $table values (50, '4'), (60, '6') + """ + // version 4, new rs after warm up task + def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("cache dir version 4 {}", beforeCacheDirVersion4) + def afterMerged23CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 4 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged23CacheDir) + + // after cloud_pre_heating_time_limit_sec = 30s + sleep(40 * 1000) + // check tablet still in old be + def beforeWarmUpTaskOkDis = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up dis result {}", beforeWarmUpTaskOkDis) + assert beforeWarmUpTaskOkDis.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 2 + } + + GetDebugPoint().disableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = sql_return_maparray('show backends').get(1) + // balance tablet + awaitUntil(500) { + def afterWarmUpTaskOkResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after warm up result {}", afterWarmUpTaskOkResult) + afterWarmUpTaskOkResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + // from be1 -> be2, warm up this tablet + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def 
afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("after cache dir version 2 {}", afterCacheDirVersion2) + // version 3 + def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("after cache dir version 3 {}", afterCacheDirVersion3) + sleep(5 * 1000) + // version 4 + def afterCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("after cache dir version 4 {}", afterCacheDirVersion4) + + def afterMergedCacheDir = [afterCacheDirVersion2, afterCacheDirVersion3, afterCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + + logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir) + def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + assert newAddBeCacheDir.size() != 0 + assert afterMerged23CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host]) + + def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + logger.info("Checking file_cache directory: {}", dataPath.absolutePath) + logger.info("Directory exists: {}", dataPath.exists()) + + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + logger.info("Found subdir: {}", subDir.name) + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + + newAddBeCacheDir.each { hashFile -> + assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} not found in BE ${newAddBe.Host}'s file_cache directory. 
" + + "Available subdirs: ${subDirs}") + } + } + + docker(options) { + testCase("test_balance_warm_up_sync_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy new file mode 100644 index 00000000000000..0938126d61cf70 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_task_abnormal', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=sync_warmup', + 'cloud_pre_heating_time_limit_sec=10' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before 
warm up result {}", beforeWarmUpResult) + assert beforeWarmUpResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 2 + } + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + // sync warm up task always returns false + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + setFeConfig('enable_cloud_multi_replica', false) + + // wait for some time to make sure warm up task is processed, but mapping is not changed + sleep(15 * 1000) + // check mapping is not changed + def afterAddBeResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after add be result {}", afterAddBeResult) + // two BEs now, but both replicas are still mapped to the old BE + assert afterAddBeResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 2 + } + + // test recovery from the abnormal state + sql """ALTER COMPUTE GROUP compute_cluster PROPERTIES ('balance_type'='without_warmup')""" + sleep(5 * 1000) + + def afterAlterResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after alter balance policy result {}", afterAlterResult) + // now mapping is changed to 1 replica in each be + assert afterAlterResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + docker(options) { + testCase("test_balance_warm_up_task_abnormal_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy new file mode 100644 index 00000000000000..8ce7160654264c --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_peer_read_async_warmup', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=peer_read_async_warmup', + // disable Auto Analysis Job Executor + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*', + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? 
(base[host] + hashFiles) : hashFiles] + } + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader::download_segment_file_sleep", [sleep_time: 50]) + setFeConfig('enable_cloud_multi_replica', false) + awaitUntil(500) { + def afterRebalanceResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after rebalance result {}", afterRebalanceResult) + afterRebalanceResult.any { row -> + Integer.valueOf((String) 
row.ReplicaNum) == 1 + } + } + + sql """ + insert into $table values (50, '4'), (60, '6') + """ + // version 4, in new be, but not in old be + def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("cache dir version 4 {}", beforeCacheDirVersion4) + + sql """ + insert into $table values (70, '7'), (80, '8') + """ + // version 5, in new be, but not in old be + def beforeCacheDirVersion5 = getTabletFileCacheDirFromBe(msHttpPort, table, 5) + logger.info("cache dir version 5 {}", beforeCacheDirVersion5) + + def afterMerged2345CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4, beforeCacheDirVersion5] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 2,3,4,5 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged2345CacheDir) + + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = sql_return_maparray('show backends').get(1) + + def newAddBeCacheDir = afterMerged2345CacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + // version 4, 5 + assertTrue(newAddBeCacheDir.size() == 2, "new add be should have version 4,5 cache file") + // warm up task blocked by debug point, so old be should not have version 4,5 cache file + assertFalse(afterMerged2345CacheDir[oldBe.Host].containsAll(newAddBeCacheDir), "old be should not have version 4,5 cache file") + + // The query triggers reading the file cache from the peer + profile("test_peer_read_async_warmup_profile") { + sql """ set enable_profile = true;""" + sql """ set profile_level = 2;""" + run { + sql """/* test_peer_read_async_warmup_profile */ select * from $table""" + sleep(1000) + } + + check { profileString, exception -> + log.info(profileString) + // Use a regular expression to capture the numeric value that follows "- NumPeerIOTotal:" + def matcher = (profileString =~ /- NumPeerIOTotal:\s+(\d+)/) + def total = 0 + while
(matcher.find()) { + total += matcher.group(1).toInteger() + logger.info("NumPeerIOTotal: {}", matcher.group(1)) + } + assertTrue(total > 0) + } + } + + // peer read cache, so it should read version 2,3 cache file from old be, not s3 + assertTrue(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read"), "new add be should have peer read cache") + assertTrue(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read"), "new add be should not have s3 read cache") + } + + docker(options) { + testCase("test_peer_read_async_warmup_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy similarity index 98% rename from regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy rename to regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy index aa35a70e121957..c0dc2f9747c60c 100644 --- a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy +++ b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy @@ -25,7 +25,7 @@ suite('test_warmup_rebalance_in_cloud', 'multi_cluster, docker') { def options = new ClusterOptions() options.feConfigs += [ 'cloud_cluster_check_interval_second=1', - 'enable_cloud_warm_up_for_rebalance=true', + 'cloud_warm_up_for_rebalance_type=async_warmup', 'cloud_tablet_rebalancer_interval_second=1', 'cloud_balance_tablet_percent_per_run=0.5', 'sys_log_verbose_modules=org',