diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index f5a2a287f83ba4..46ce90aafcc2d8 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -27,6 +27,7 @@ #include #include +#include #include #include "cloud/cloud_tablet_mgr.h" @@ -171,6 +172,7 @@ std::unordered_map snapshot_rs_metas(BaseTable void FileCacheBlockDownloader::download_file_cache_block( const DownloadTask::FileCacheBlockMetaVec& metas) { + std::unordered_set synced_tablets; std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) { VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id() << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id() @@ -183,12 +185,20 @@ void FileCacheBlockDownloader::download_file_cache_block( } else { tablet = std::move(res).value(); } - + if (!synced_tablets.contains(meta.tablet_id())) { + auto st = tablet->sync_rowsets(); + if (!st) { + // just log failed, try it best + LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id() + << " err msg: " << st.to_string(); + } + synced_tablets.insert(meta.tablet_id()); + } auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get()); auto find_it = id_to_rowset_meta_map.find(meta.rowset_id()); if (find_it == id_to_rowset_meta_map.end()) { LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id() - << "rowset_id not found, rowset_id=" << meta.rowset_id(); + << " rowset_id not found, rowset_id=" << meta.rowset_id(); return; } diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 6c8add49cc0a9e..77a74abd521285 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -2607,7 +2607,7 @@ void handle_set_cluster_status(const std::string& instance_id, const ClusterInfo }); } -void handle_alter_vcluster_Info(const 
std::string& instance_id, const ClusterInfo& cluster, +void handle_alter_vcluster_info(const std::string& instance_id, const ClusterInfo& cluster, std::shared_ptr resource_mgr, std::string& msg, MetaServiceCode& code) { msg = resource_mgr->update_cluster( @@ -2694,6 +2694,26 @@ void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInf }); } +void handle_alter_properties(const std::string& instance_id, const ClusterInfo& cluster, + std::shared_ptr resource_mgr, std::string& msg, + MetaServiceCode& code) { + msg = resource_mgr->update_cluster( + instance_id, cluster, + [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); }, + [&](ClusterPB& c, std::vector&) { + std::string msg; + std::stringstream ss; + if (ClusterPB::COMPUTE != c.type()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "just support set COMPUTE cluster status"; + msg = ss.str(); + return msg; + } + *c.mutable_properties() = cluster.cluster.properties(); + return msg; + }); +} + void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, const AlterClusterRequest* request, AlterClusterResponse* response, @@ -2778,7 +2798,10 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, code); break; case AlterClusterRequest::ALTER_VCLUSTER_INFO: - handle_alter_vcluster_Info(instance_id, cluster, resource_mgr(), msg, code); + handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, code); + break; + case AlterClusterRequest::ALTER_PROPERTIES: + handle_alter_properties(instance_id, cluster, resource_mgr(), msg, code); break; default: code = MetaServiceCode::INVALID_ARGUMENT; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 1b386f985888ef..8d943d5c7e731a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3333,8 +3333,26 @@ public static int metaServiceRpcRetryTimes() { @ConfField(mutable = true, masterOnly = true) public static int cloud_min_balance_tablet_num_per_run = 2; - @ConfField(mutable = true, masterOnly = true) - public static boolean enable_cloud_warm_up_for_rebalance = true; + @ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。" + + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;" + + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;" + + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;" + + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer BE拉取,均衡最快可能会影响同计算组中其他BE性能。" + + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)" + + "设置compute group维度的balance类型,compute group维度配置优先级更高", + "Specify the scaling and warming methods for all Compute groups in a cloud mode. " + + "without_warmup: Directly modify shard mapping, first read from S3," + + "fastest re-balance but largest fluctuation; " + + "async_warmup: Asynchronous warmup, best-effort cache pulling, " + + "faster re-balance but possible cache miss; " + + "sync_warmup: Synchronous warmup, ensure cache migration completion, " + + "slower re-balance but no cache miss; " + + "peer_read_async_warmup: Directly modify shard mapping, first read from Peer BE, " + + "fastest re-balance but may affect other BEs in the same compute group performance. 
" + + "Note: This is a global FE configuration, you can also use SQL (ALTER COMPUTE GROUP cg PROPERTIES) " + + "to set balance type at compute group level, compute group level configuration has higher priority"}, + options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"}) + public static String cloud_warm_up_for_rebalance_type = "async_warmup"; @ConfField(mutable = true, masterOnly = false) public static String security_checker_class_name = ""; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index d85b645f0a4511..3fe88827b99646 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -264,6 +264,8 @@ supportedAlterStatement | ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause #alterStorageVault | ALTER WORKLOAD GROUP name=identifierOrText (FOR computeGroup=identifierOrText)? properties=propertyClause? #alterWorkloadGroup + | ALTER COMPUTE GROUP name=identifierOrText + properties=propertyClause? #alterComputeGroup | ALTER CATALOG name=identifier SET PROPERTIES LEFT_PAREN propertyItemList RIGHT_PAREN #alterCatalogProperties | ALTER WORKLOAD POLICY name=identifierOrText diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java new file mode 100644 index 00000000000000..d66e3126d5bb59 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/BalanceTypeEnum.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.catalog; + +import org.apache.doris.common.Config; + +import lombok.Getter; + +/** + * Enum for balance type options + */ +@Getter +public enum BalanceTypeEnum { + WITHOUT_WARMUP("without_warmup"), + ASYNC_WARMUP("async_warmup"), + SYNC_WARMUP("sync_warmup"), + PEER_READ_ASYNC_WARMUP("peer_read_async_warmup"); + + private final String value; + + BalanceTypeEnum(String value) { + this.value = value; + } + + /** + * Parse string value to enum, case-insensitive + */ + public static BalanceTypeEnum fromString(String value) { + if (value == null) { + return null; + } + for (BalanceTypeEnum type : BalanceTypeEnum.values()) { + if (type.value.equalsIgnoreCase(value)) { + return type; + } + } + return null; + } + + /** + * Check if the given string is a valid balance type + */ + public static boolean isValid(String value) { + return fromString(value) != null; + } + + /** + * Get the balance type enum from the configuration string + */ + public static BalanceTypeEnum getCloudWarmUpForRebalanceTypeEnum() { + return fromString(Config.cloud_warm_up_for_rebalance_type) == null + ? 
ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM : fromString(Config.cloud_warm_up_for_rebalance_type); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java index 044f24d2242005..40feb8b787bd26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java @@ -99,10 +99,63 @@ private void processVirtualClusters(List clusters) { List virtualClusters = new ArrayList<>(); List computeClusters = new ArrayList<>(); categorizeClusters(clusters, virtualClusters, computeClusters); + handleComputeClusters(computeClusters); handleVirtualClusters(virtualClusters, computeClusters); removeObsoleteVirtualGroups(virtualClusters); } + private void handleComputeClusters(List computeClusters) { + for (Cloud.ClusterPB computeClusterInMs : computeClusters) { + ComputeGroup computeGroupInFe = cloudSystemInfoService + .getComputeGroupById(computeClusterInMs.getClusterId()); + if (computeGroupInFe == null) { + // cluster checker will sync it + LOG.info("found compute cluster {} in ms, but not in fe mem, " + + "it may be wait cluster checker to sync, ignore it", + computeClusterInMs); + } else { + // exist compute group, check properties changed and update if needed + updatePropertiesIfChanged(computeGroupInFe, computeClusterInMs); + } + } + } + + /** + * Compare properties between compute cluster in MS and compute group in FE, + * update only the changed key-value pairs to avoid unnecessary updates. 
+ */ + private void updatePropertiesIfChanged(ComputeGroup computeGroupInFe, Cloud.ClusterPB computeClusterInMs) { + Map propertiesInMs = computeClusterInMs.getPropertiesMap(); + Map propertiesInFe = computeGroupInFe.getProperties(); + + if (propertiesInMs == null || propertiesInMs.isEmpty()) { + return; + } + Map changedProperties = new HashMap<>(); + + // Check for changed or new properties + for (Map.Entry entry : propertiesInMs.entrySet()) { + String key = entry.getKey(); + String valueInMs = entry.getValue(); + String valueInFe = propertiesInFe.get(key); + + if (valueInFe != null && valueInFe.equalsIgnoreCase(valueInMs)) { + continue; + } + changedProperties.put(key, valueInMs); + + LOG.debug("Property changed for compute group {}: {} = {} (was: {})", + computeGroupInFe.getName(), key, valueInMs, valueInFe); + } + + // Only update if there are actual changes + if (!changedProperties.isEmpty()) { + LOG.info("Updating properties for compute group {}: {}", + computeGroupInFe.getName(), changedProperties); + computeGroupInFe.setProperties(changedProperties); + } + } + private void categorizeClusters(List clusters, List virtualClusters, List computeClusters) { for (Cloud.ClusterPB cluster : clusters) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index d8d6b712b2ea45..a33667f0f64ef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -112,6 +112,54 @@ public class CloudTabletRebalancer extends MasterDaemon { private CloudSystemInfoService cloudSystemInfoService; + private BalanceTypeEnum globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum(); + + /** + * Get the current balance type for a compute group, falling back to global balance type if not found + */ + private 
BalanceTypeEnum getCurrentBalanceType(String clusterId) { + ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId); + if (cg == null) { + LOG.debug("compute group not found, use global balance type, id {}", clusterId); + return globalBalanceTypeEnum; + } + + BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType(); + if (isComputeGroupBalanceChanged(clusterId)) { + return computeGroupBalanceType; + } + return globalBalanceTypeEnum; + } + + /** + * Get the current task timeout for a compute group, falling back to global timeout if not found + */ + private int getCurrentTaskTimeout(String clusterId) { + ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId); + if (cg == null) { + return Config.cloud_pre_heating_time_limit_sec; + } + + int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout(); + if (isComputeGroupBalanceChanged(clusterId)) { + return computeGroupTimeout; + } + + return Config.cloud_pre_heating_time_limit_sec; + } + + private boolean isComputeGroupBalanceChanged(String clusterId) { + ComputeGroup cg = cloudSystemInfoService.getComputeGroupById(clusterId); + if (cg == null) { + return false; + } + + BalanceTypeEnum computeGroupBalanceType = cg.getBalanceType(); + int computeGroupTimeout = cg.getBalanceWarmUpTaskTimeout(); + return computeGroupBalanceType != ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM + || computeGroupTimeout != ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT; + } + public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) { super("cloud tablet rebalancer", Config.cloud_tablet_rebalancer_interval_second * 1000); this.cloudSystemInfoService = cloudSystemInfoService; @@ -239,6 +287,7 @@ protected void runAfterCatalogReady() { LOG.info("cloud tablet rebalance begin"); long start = System.currentTimeMillis(); + globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum(); buildClusterToBackendMap(); if (!completeRouteInfo()) { @@ -417,10 +466,25 @@ public void 
globalBalance() { public void checkInflightWarmUpCacheAsync() { Map> beToInfightTasks = new HashMap>(); + Set invalidTasks = new HashSet<>(); for (Map.Entry entry : tabletToInfightTask.entrySet()) { + String clusterId = entry.getKey().getClusterId(); + BalanceTypeEnum balanceTypeEnum = getCurrentBalanceType(clusterId); + if (balanceTypeEnum == BalanceTypeEnum.WITHOUT_WARMUP) { + // no need check warmup cache async + invalidTasks.add(entry.getKey()); + continue; + } beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>()); beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue()); } + invalidTasks.forEach(key -> { + if (LOG.isDebugEnabled()) { + LOG.debug("remove inflight warmup task tablet {} cluster {} no need warmup", + key.getTabletId(), key.getClusterId()); + } + tabletToInfightTask.remove(key); + }); List infos = new ArrayList<>(); long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L; @@ -432,6 +496,7 @@ public void checkInflightWarmUpCacheAsync() { destBackend = null; } if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) { + // dest backend not exist or dead too long, need remove all inflight tasks in this dest backend List toRemove = new LinkedList<>(); for (InfightTask task : entry.getValue()) { for (InfightTablet key : tabletToInfightTask.keySet()) { @@ -447,10 +512,12 @@ public void checkInflightWarmUpCacheAsync() { continue; } if (!destBackend.isAlive()) { + // dest backend dead, dead time smaller than rehash_tablet_after_be_dead_seconds, wait next time continue; } List tablets = entry.getValue().stream() .map(task -> task.pickedTablet.getId()).collect(Collectors.toList()); + // check dest backend whether warmup cache done Map taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey()); if (taskDone == null) { LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}", @@ -461,17 +528,7 @@ public void 
checkInflightWarmUpCacheAsync() { for (Map.Entry result : taskDone.entrySet()) { InfightTask task = tabletToInfightTask .getOrDefault(new InfightTablet(result.getKey(), clusterId), null); - if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp - > Config.cloud_pre_heating_time_limit_sec)) { - if (!result.getValue()) { - LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey()); - } - updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos); - if (LOG.isDebugEnabled()) { - LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId()); - } - tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId)); - } + handleWarmupCompletion(task, clusterId, result.getValue(), result.getKey(), infos); } } long oldSize = infos.size(); @@ -855,6 +912,70 @@ private Map sendCheckWarmUpCacheAsyncRpc(List tabletIds, lo return null; } + private void handleWarmupCompletion(InfightTask task, String clusterId, boolean isDone, long tabletId, + List infos) { + if (task == null) { + LOG.warn("cannot find inflight task for tablet {}-{}", clusterId, tabletId); + return; + } + boolean shouldUpdateMapping = false; + BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId); + LOG.debug("cluster id {}, balance type {}, tabletId {}, ", clusterId, currentBalanceType, tabletId); + + switch (currentBalanceType) { + case ASYNC_WARMUP: { + int currentTaskTimeout = getCurrentTaskTimeout(clusterId); + boolean timeExceeded = System.currentTimeMillis() / 1000 - task.startTimestamp > currentTaskTimeout; + LOG.debug("tablet {}-{} warmup cache isDone {} timeExceeded {}", + clusterId, tabletId, isDone, timeExceeded); + if (isDone || timeExceeded) { + if (!isDone) { + // timeout but not done, not normal, info log + LOG.info("{}-{} warmup cache timeout {}, forced to change the mapping", + clusterId, tabletId, currentTaskTimeout); + } else { + // done, normal + LOG.debug("{}-{} warmup cache 
done, change the mapping", clusterId, tabletId); + } + shouldUpdateMapping = true; + } + break; + } + case SYNC_WARMUP: { + if (isDone) { + // done, normal + LOG.debug("{} sync cache done, change the mapping", tabletId); + shouldUpdateMapping = true; + } + break; + } + default: + break; + } + + if (!shouldUpdateMapping) { + return; + } + + updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos); + + if (LOG.isDebugEnabled()) { + LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId()); + } + tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId)); + + if (BalanceTypeEnum.SYNC_WARMUP.equals(currentBalanceType)) { + try { + // send sync cache rpc again, ignore the result, the best effort to sync some new data + sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe); + } catch (Exception e) { + LOG.warn("Failed to preheat tablet {} from {} to {}, " + + "help msg turn off fe config enable_cloud_warm_up_for_rebalance", + task.pickedTablet.getId(), task.srcBe, task.destBe, e); + } + } + } + private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, Map> globalBeToTablets, Map>> beToTabletsInTable, @@ -1050,6 +1171,10 @@ private void balanceImpl(List bes, String clusterId, Map long avgNum = totalTabletsNum / beNum; long transferNum = calculateTransferNum(avgNum); + BalanceTypeEnum currentBalanceType = getCurrentBalanceType(clusterId); + LOG.debug("balance type {}, be num {}, total tablets num {}, avg num {}, transfer num {}", + currentBalanceType, beNum, totalTabletsNum, avgNum, transferNum); + for (int i = 0; i < transferNum; i++) { TransferPairInfo pairInfo = new TransferPairInfo(); if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) { @@ -1069,17 +1194,34 @@ private void balanceImpl(List bes, String clusterId, Map CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe); - if 
(Config.enable_cloud_warm_up_for_rebalance && srcBackend != null && srcBackend.isAlive()) { - if (isConflict(srcBe, destBe, cloudReplica, balanceType, - futurePartitionToTablets, futureBeToTabletsInTable)) { + if ((BalanceTypeEnum.WITHOUT_WARMUP.equals(currentBalanceType) + || BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) + && srcBackend != null && srcBackend.isAlive()) { + // direct switch, update fe meta directly, not send preheating task + if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) { continue; } - preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, beToTablets); + transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos); + if (BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) { + LOG.debug("directly switch {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, + clusterId); + // send sync cache rpc, best effort + try { + sendPreHeatingRpc(pickedTablet, srcBe, destBe); + } catch (Exception e) { + LOG.debug("Failed to preheat tablet {} from {} to {}, " + + "directly policy, just ignore the error", + pickedTablet.getId(), srcBe, destBe, e); + return; + } + } } else { - if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) { + // cache warm up + if (isConflict(srcBe, destBe, cloudReplica, balanceType, + futurePartitionToTablets, futureBeToTabletsInTable)) { continue; } - transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos); + preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, beToTablets); } } } @@ -1147,7 +1289,8 @@ private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long destBe private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId, BalanceType balanceType, List infos) { - LOG.info("transfer {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, clusterId); + 
LOG.info("transfer {} from {} to {}, cluster {}, type {}", + pickedTablet.getId(), srcBe, destBe, clusterId, balanceType); updateBeToTablets(pickedTablet, srcBe, destBe, beToTabletsGlobal, beToTabletsInTable, partitionToTablets); updateBeToTablets(pickedTablet, srcBe, destBe, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java index 82c24afd7c5d75..c895f13f7a03e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/ComputeGroup.java @@ -18,7 +18,11 @@ package org.apache.doris.cloud.catalog; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.gson.Gson; import lombok.Getter; import lombok.Setter; @@ -33,6 +37,27 @@ public class ComputeGroup { private static final Logger LOG = LogManager.getLogger(ComputeGroup.class); + public static final String BALANCE_TYPE = "balance_type"; + + public static final String BALANCE_WARM_UP_TASK_TIMEOUT = "balance_warm_up_task_timeout"; + + private static final ImmutableSet ALL_PROPERTIES_NAME = new ImmutableSet.Builder() + .add(BALANCE_TYPE).add(BALANCE_WARM_UP_TASK_TIMEOUT).build(); + + private static final Map ALL_PROPERTIES_DEFAULT_VALUE_MAP = Maps.newHashMap(); + + public static final int DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT = Config.cloud_pre_heating_time_limit_sec; + public static final BalanceTypeEnum DEFAULT_COMPUTE_GROUP_BALANCE_ENUM + = BalanceTypeEnum.fromString(Config.cloud_warm_up_for_rebalance_type); + + static { + ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_TYPE, DEFAULT_COMPUTE_GROUP_BALANCE_ENUM.getValue()); + if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(Config.cloud_warm_up_for_rebalance_type)) { + 
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + } + } + private enum PolicyTypeEnum { ActiveStandby, } @@ -110,6 +135,10 @@ public Cloud.ClusterPolicy toPb() { @Setter private boolean needRebuildFileCache = false; + @Getter + @Setter + private Map properties = new LinkedHashMap<>(ALL_PROPERTIES_DEFAULT_VALUE_MAP); + public ComputeGroup(String id, String name, ComputeTypeEnum type) { this.id = id; this.name = name; @@ -134,6 +163,129 @@ public String getStandbyComputeGroup() { return policy.getStandbyComputeGroup(); } + private void validateTimeoutRestriction(Map inputProperties) throws DdlException { + if (!properties.containsKey(BALANCE_TYPE)) { + return; + } + String originalBalanceType = properties.get(BALANCE_TYPE); + if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(originalBalanceType)) { + return; + } + + if (inputProperties.containsKey(BALANCE_TYPE) + && BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(inputProperties.get(BALANCE_TYPE))) { + return; + } + + if (inputProperties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) { + throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + + " cannot be set when current " + BALANCE_TYPE + " is " + originalBalanceType + + ". Only async_warmup type supports timeout setting."); + } + } + + /** + * Validate a single property key-value pair. 
+ */ + private static void validateProperty(String key, String value) throws DdlException { + if (value == null || value.isEmpty()) { + return; + } + + if (!ALL_PROPERTIES_NAME.contains(key)) { + throw new DdlException("Property " + key + " is not supported"); + } + + // Validate specific properties + if (BALANCE_TYPE.equals(key)) { + if (!BalanceTypeEnum.isValid(value)) { + throw new DdlException("Property " + BALANCE_TYPE + + " only support without_warmup or async_warmup or sync_warmup"); + } + } else if (BALANCE_WARM_UP_TASK_TIMEOUT.equals(key)) { + try { + int timeout = Integer.parseInt(value); + if (timeout <= 0) { + throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer"); + } + } catch (NumberFormatException e) { + throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer"); + } + } + } + + public void checkProperties(Map inputProperties) throws DdlException { + if (inputProperties == null || inputProperties.isEmpty()) { + return; + } + + for (Map.Entry entry : inputProperties.entrySet()) { + validateProperty(entry.getKey(), entry.getValue()); + } + + validateTimeoutRestriction(inputProperties); + } + + public void modifyProperties(Map inputProperties) throws DdlException { + String balanceType = inputProperties.get(BALANCE_TYPE); + if (balanceType == null) { + return; + } + if (BalanceTypeEnum.WITHOUT_WARMUP.getValue().equals(balanceType) + || BalanceTypeEnum.SYNC_WARMUP.getValue().equals(balanceType) + || BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue().equals(balanceType)) { + // delete BALANCE_WARM_UP_TASK_TIMEOUT if exists + properties.remove(BALANCE_WARM_UP_TASK_TIMEOUT); + } else if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(balanceType)) { + // if BALANCE_WARM_UP_TASK_TIMEOUT exists, it has been validated in validateProperty + if (!properties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) { + properties.put(BALANCE_WARM_UP_TASK_TIMEOUT, 
String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + } + } + } + + // set properties, just set in periodic instance status checker + public void setProperties(Map propertiesInMs) { + if (propertiesInMs == null || propertiesInMs.isEmpty()) { + return; + } + + for (Map.Entry entry : propertiesInMs.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + try { + validateProperty(key, value); + } catch (DdlException e) { + LOG.warn("ignore invalid property. compute group: {}, key: {}, value: {}, error: {}", + name, key, value, e.getMessage()); + continue; + } + + if (value != null && !value.isEmpty()) { + properties.put(key, value); + } + } + } + + public BalanceTypeEnum getBalanceType() { + String balanceType = properties.get(BALANCE_TYPE); + BalanceTypeEnum type = BalanceTypeEnum.fromString(balanceType); + if (type == null) { + return BalanceTypeEnum.ASYNC_WARMUP; + } + return type; + } + + public int getBalanceWarmUpTaskTimeout() { + String timeoutStr = properties.get(BALANCE_WARM_UP_TASK_TIMEOUT); + try { + return Integer.parseInt(timeoutStr); + } catch (NumberFormatException e) { + return DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT; + } + } + @Override public String toString() { Map showMap = new LinkedHashMap<>(); @@ -143,6 +295,7 @@ public String toString() { showMap.put("unavailableSince", String.valueOf(unavailableSince)); showMap.put("availableSince", String.valueOf(availableSince)); showMap.put("policy", policy == null ? 
"no_policy" : policy.toString()); + showMap.put("properties", properties.toString()); Gson gson = new Gson(); return gson.toJson(showMap); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 62ab6e3b9ec86a..0bbfe683154b46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -1658,4 +1658,48 @@ public void renameComputeGroup(String originalName, String newGroupName) throws LOG.info("alter rename compute group, request: {}, response: {}", request, response); } } + + public void alterComputeGroupProperties(String computeGroupName, Map properties) + throws UserException { + String cloudInstanceId = ((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId(); + if (Strings.isNullOrEmpty(cloudInstanceId)) { + throw new DdlException("unable to alter compute group properties due to empty cloud_instance_id"); + } + String computeGroupId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(computeGroupName); + if (Strings.isNullOrEmpty(computeGroupId)) { + LOG.info("alter compute group properties {} not found, unable to alter", computeGroupName); + throw new DdlException("compute group '" + computeGroupName + "' not found, unable to alter properties"); + } + + ClusterPB clusterPB = ClusterPB.newBuilder() + .setClusterId(computeGroupId) + .setClusterName(computeGroupName) + .setType(ClusterPB.Type.COMPUTE) + .putAllProperties(properties) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId()) + .setOp(Cloud.AlterClusterRequest.Operation.ALTER_PROPERTIES) + .setCluster(clusterPB) + .build(); + + + Cloud.AlterClusterResponse response = null; + try { + response = 
MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter compute group properties not ok, response: {}", response); + throw new UserException("failed to alter compute group properties errorCode: " + + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg() + " may be you can try later"); + } + } catch (RpcException e) { + LOG.warn("alter compute group properties rpc exception"); + throw new UserException("failed to alter compute group properties", e); + } finally { + LOG.info("alter compute group properties, request: {}, response: {}", request, response); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 3e279a96303eea..520f8f97a2bc1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -98,6 +98,7 @@ import org.apache.doris.nereids.DorisParser.AlterCatalogCommentContext; import org.apache.doris.nereids.DorisParser.AlterCatalogPropertiesContext; import org.apache.doris.nereids.DorisParser.AlterCatalogRenameContext; +import org.apache.doris.nereids.DorisParser.AlterComputeGroupContext; import org.apache.doris.nereids.DorisParser.AlterDatabasePropertiesContext; import org.apache.doris.nereids.DorisParser.AlterDatabaseRenameContext; import org.apache.doris.nereids.DorisParser.AlterDatabaseSetQuotaContext; @@ -620,6 +621,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand; +import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand; import 
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; @@ -6323,6 +6325,14 @@ public LogicalPlan visitAlterWorkloadGroup(AlterWorkloadGroupContext ctx) { return new AlterWorkloadGroupCommand(cgName, stripQuotes(ctx.name.getText()), properties); } + @Override + public LogicalPlan visitAlterComputeGroup(AlterComputeGroupContext ctx) { + Map properties = ctx.propertyClause() != null + ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap(); + // strip quotes from the name, consistent with visitAlterWorkloadGroup + return new AlterComputeGroupCommand(stripQuotes(ctx.name.getText()), properties); + } + @Override public LogicalPlan visitAlterWorkloadPolicy(AlterWorkloadPolicyContext ctx) { Map properties = ctx.propertyClause() != null diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 6fc59b0f584a86..f50bd047f3acca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -206,6 +206,7 @@ public enum PlanType { ALTER_STORAGE_POLICY_COMMAND, ALTER_STORAGE_VAULT, ALTER_WORKLOAD_GROUP_COMMAND, + ALTER_COMPUTE_GROUP_COMMAND, ALTER_WORKLOAD_POLICY_COMMAND, ALTER_DATABASE_RENAME_COMMAND, ALTER_DATABASE_SET_DATA_QUOTA_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java new file mode 100644 index 00000000000000..8a295678f4826e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommand.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.ComputeGroup; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +//import org.apache.commons.lang3.StringUtils; +import java.util.Map; + +/** + * Command that validates an alter-compute-group request and then sends the properties through CloudSystemInfoService. + */ +public class AlterComputeGroupCommand extends AlterCommand { + private final String computeGroupName; + private final Map properties; + + /** + * Creates the command for the given compute group name and the properties to apply. + */ + public AlterComputeGroupCommand(String computeGroupName, Map properties) { + super(PlanType.ALTER_COMPUTE_GROUP_COMMAND); + this.computeGroupName = computeGroupName; + this.properties = properties; + } + + /** + * Checks cloud mode, ADMIN privilege and the target group, then checks and modifies its properties. + */ + public void validate(ConnectContext connectContext) throws UserException { + if
(Config.isNotCloudMode()) { + throw new AnalysisException("Currently, Alter compute group is only supported in cloud mode"); + } + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(connectContext, PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + + if (properties == null || properties.isEmpty()) { + throw new AnalysisException("Compute Group properties can't be empty"); + } + + CloudSystemInfoService cloudSys = ((CloudSystemInfoService) Env.getCurrentSystemInfo()); + // check compute group exist + ComputeGroup cg = cloudSys.getComputeGroupByName(computeGroupName); + if (cg == null) { + throw new AnalysisException("Compute Group " + computeGroupName + " does not exist"); + } + + if (cg.isVirtual()) { + throw new AnalysisException("Virtual Compute Group " + computeGroupName + " can not be altered"); + } + + // check compute group's properties can be modified + cg.checkProperties(properties); + // NOTE(review): runs before the rpc in doRun; confirm the result is still valid if the rpc fails + cg.modifyProperties(properties); + } + + @Override + public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { + validate(ctx); + CloudSystemInfoService cloudSys = ((CloudSystemInfoService) Env.getCurrentSystemInfo()); + // send rpc to ms + cloudSys.alterComputeGroupProperties(computeGroupName, properties); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitAlterComputeGroupCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java index da61a861ce6147..7ea77b085a856d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowClustersCommand.java @@ -59,10 +59,12 @@ public class ShowClustersCommand extends
ShowCommand { // sql: show clusters; public static final ImmutableList CLUSTER_TITLE_NAMES = new ImmutableList.Builder() - .add("cluster").add("is_current").add("users").add("backend_num").add("sub_clusters").add("policy").build(); + .add("cluster").add("is_current").add("users").add("backend_num") + .add("sub_clusters").add("policy").add("properties").build(); // sql: show compute groups; public static final ImmutableList COMPUTE_GROUP_TITLE_NAMES = new ImmutableList.Builder() - .add("Name").add("IsCurrent").add("Users").add("BackendNum").add("SubComputeGroups").add("Policy").build(); + .add("Name").add("IsCurrent").add("Users").add("BackendNum") + .add("SubComputeGroups").add("Policy").add("Properties").build(); private static final Logger LOG = LogManager.getLogger(ShowClustersCommand.class); private final boolean isComputeGroup; @@ -105,13 +107,17 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc clusterNameSet.addAll(clusterNames); for (String clusterName : clusterNameSet) { - ArrayList row = Lists.newArrayList(clusterName); // current_used, users if (!Env.getCurrentEnv().getAccessManager() .checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) { continue; } + ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName); + if (cg == null) { + continue; + } + ArrayList row = Lists.newArrayList(clusterName); String clusterNameFromCtx = ""; try { clusterNameFromCtx = ctx.getCloudCluster(); @@ -142,16 +148,14 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc rows.add(row); row.add(subClusterNames); row.add(policy); + row.add(cg.getProperties().toString()); continue; } // virtual compute group // virtual cg backends eq 0 row.add(String.valueOf(0)); rows.add(row); - ComputeGroup cg = cloudSys.getComputeGroupByName(clusterName); - if (cg == null) { - continue; - } + String activeCluster = cg.getPolicy().getActiveComputeGroup(); 
String standbyCluster = cg.getPolicy().getStandbyComputeGroup(); // first active, second standby @@ -159,6 +163,7 @@ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exc row.add(subClusterNames); // Policy row.add(cg.getPolicy().toString()); + row.add(cg.getProperties().toString()); } return new ShowResultSet(getMetaData(), rows); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 3a8ff435ead1ff..b663110a5caf0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand; +import org.apache.doris.nereids.trees.plans.commands.AlterComputeGroupCommand; import org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand; @@ -808,6 +809,10 @@ default R visitAlterWorkloadGroupCommand(AlterWorkloadGroupCommand alterWorkload return visitCommand(alterWorkloadGroupCommand, context); } + default R visitAlterComputeGroupCommand(AlterComputeGroupCommand alterComputeGroupCommand, C context) { + return visitCommand(alterComputeGroupCommand, context); + } + default R visitAlterWorkloadPolicyCommand(AlterWorkloadPolicyCommand alterWorkloadPolicyCommand, C context) { return visitCommand(alterWorkloadPolicyCommand, context); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java new file mode 100644 index 00000000000000..c61f2abefa42fc --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/ComputeGroupTest.java @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.catalog; + +import org.apache.doris.common.DdlException; + +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +public class ComputeGroupTest { + private ComputeGroup computeGroup; + + @BeforeEach + public void setUp() { + computeGroup = new ComputeGroup("test_id", "test_group", ComputeGroup.ComputeTypeEnum.COMPUTE); + } + + @Test + public void testCheckPropertiesWithNull() throws DdlException { + computeGroup.checkProperties(null); + computeGroup.checkProperties(Maps.newHashMap()); + } + + @Test + public void testCheckPropertiesWithValidBalanceType() throws DdlException { + // test valid balance_type + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + computeGroup.checkProperties(properties); + + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(properties); + + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + computeGroup.checkProperties(properties); + + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(properties); + } + + @Test + public void testCheckPropertiesWithInvalidBalanceType() { + // test invalid balance_type + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_TYPE, "invalid_type"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + } + + @Test + public void testCheckPropertiesWithValidTimeout() throws DdlException { + // test valid timeout + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "300"); + computeGroup.checkProperties(properties); + +
properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "1"); + computeGroup.checkProperties(properties); + } + + @Test + public void testCheckPropertiesWithInvalidTimeout() { + // test invalid timeout + Map properties = Maps.newHashMap(); + properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "-1"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + + properties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "invalid"); + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + } + + @Test + public void testCheckPropertiesWithUnsupportedProperty() { + // test unsupported property + Map properties = Maps.newHashMap(); + properties.put("unsupported_property", "value"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(properties); + }); + } + + @Test + public void testModifyPropertiesWithDirectSwitch() throws DdlException { + // test without_warmup type, timeout should be removed + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + + // first put timeout into properties + computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + + computeGroup.modifyProperties(inputProperties); + + // verify timeout is removed + Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testModifyPropertiesWithSyncCache() throws DdlException { + // test sync_cache type, timeout should be removed + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); +
inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + + // first put timeout into properties + computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, + String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT)); + Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + + computeGroup.modifyProperties(inputProperties); + + // verify timeout is removed + Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testCheckPropertiesWithBalanceTypeTransition() throws DdlException { + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testCheckPropertiesWithWarmupCacheToWarmupCache() throws DdlException { + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testCheckPropertiesWithDirectSwitchToDirectSwitch() throws DdlException { + // test direct_switch to direct_switch transition, no need to set timeout + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testModifyPropertiesWithWarmupCacheAndExistingTimeout() throws DdlException { + // test async_warmup type with an existing timeout, default value should not be added + Map inputProperties =
Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "600"); + + // first put timeout into properties + computeGroup.getProperties().put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + String originalTimeout = computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT); + + computeGroup.modifyProperties(inputProperties); + + // verify timeout is not modified: a timeout already set by the user must not be overwritten + Assertions.assertEquals(originalTimeout, computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testModifyPropertiesWithWarmupCacheAndNoTimeout() throws DdlException { + // test async_warmup type without timeout, default value should be added + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + + // make sure there is no timeout in properties + computeGroup.getProperties().remove(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT); + Assertions.assertFalse(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + + computeGroup.modifyProperties(inputProperties); + // verify the default value is added + Assertions.assertTrue(computeGroup.getProperties().containsKey(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + Assertions.assertEquals(String.valueOf(ComputeGroup.DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT), + computeGroup.getProperties().get(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT)); + } + + @Test + public void testModifyPropertiesWithNullBalanceType() throws DdlException { + // test null balance_type, properties should not be modified + Map inputProperties = Maps.newHashMap(); + inputProperties.put("other_property", "value"); + + Map originalProperties = Maps.newHashMap(computeGroup.getProperties()); + + computeGroup.modifyProperties(inputProperties); + + // verify properties are not modified + Assertions.assertEquals(originalProperties, computeGroup.getProperties()); + } + + @Test + public void testModifyPropertiesWithEmptyInput()
throws DdlException { + // test empty input, properties should not be modified + Map inputProperties = Maps.newHashMap(); + + Map originalProperties = Maps.newHashMap(computeGroup.getProperties()); + + computeGroup.modifyProperties(inputProperties); + + // verify properties are not modified + Assertions.assertEquals(originalProperties, computeGroup.getProperties()); + } + + @Test + public void testCheckPropertiesWithSyncCacheToWarmupCache() throws DdlException { + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithNoCurrentBalanceType() throws DdlException { + // test the case where balance_type is not currently set + computeGroup.getProperties().remove(ComputeGroup.BALANCE_TYPE); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithCurrentWarmupCache() throws DdlException { + // test the case where the current balance_type is warmup_cache + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchToWarmupCache() throws DdlException { + // test direct_switch to warmup_cache transition with timeout set + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); +
computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithSyncCacheToWarmupCacheWithTimeout() throws DdlException { + // test sync_cache to warmup_cache transition with timeout set + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.ASYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + computeGroup.checkProperties(inputProperties); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchAndOnlyTimeout() { + // test current type is direct_switch, setting only timeout should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithSyncCacheAndOnlyTimeout() { + // test current type is sync_cache, setting only timeout should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchToSyncCacheAndTimeout() { + // test direct_switch to sync_cache transition with timeout set should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.SYNC_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class,
() -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithDirectSwitchToSameAndTimeout() { + // test direct_switch to direct_switch transition with timeout set should fail + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + inputProperties.put(ComputeGroup.BALANCE_WARM_UP_TASK_TIMEOUT, "500"); + + Assertions.assertThrows(DdlException.class, () -> { + computeGroup.checkProperties(inputProperties); + }); + } + + @Test + public void testValidateTimeoutRestrictionWithNoInputTimeout() throws DdlException { + // test the case where the input has no timeout + computeGroup.getProperties().put(ComputeGroup.BALANCE_TYPE, BalanceTypeEnum.WITHOUT_WARMUP.getValue()); + Map inputProperties = Maps.newHashMap(); + inputProperties.put("other_property", "value"); + Assertions.assertThrows(DdlException.class, () -> { + // Property other_property is not supported + computeGroup.checkProperties(inputProperties); + }); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java new file mode 100644 index 00000000000000..8ff8b60ffe05b6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterComputeGroupCommandTest.java @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.backup.CatalogMocker; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.ComputeGroup; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class AlterComputeGroupCommandTest { + private static final String internalCtl = InternalCatalog.INTERNAL_CATALOG_NAME; + @Mocked + private Env env; + @Mocked + private AccessControllerManager accessControllerManager; + @Mocked + private InternalCatalog catalog; + @Mocked + private ConnectContext connectContext; + @Mocked + private CloudSystemInfoService cloudSystemInfoService; + @Mocked + private ComputeGroup computeGroup; + private Database db; + + private void runBefore() throws Exception { + db = CatalogMocker.mockDb(); + new Expectations() { + { + Env.getCurrentEnv(); + minTimes = 0; + result = env; + + env.getAccessManager(); + minTimes = 0; + result = 
accessControllerManager; + + ConnectContext.get(); + minTimes = 0; + result = connectContext; + + connectContext.isSkipAuth(); + minTimes = 0; + result = true; + + accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); + minTimes = 0; + result = true; + } + }; + } + + @Test + public void testValidateNonCloudMode() throws Exception { + runBefore(); + Config.deploy_mode = "non-cloud"; + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateAuthFailed() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + new Expectations() { + { + accessControllerManager.checkGlobalPriv(connectContext, PrivPredicate.ADMIN); + minTimes = 0; + result = false; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateEmptyProperties() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", null); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + + AlterComputeGroupCommand command2 = new AlterComputeGroupCommand("test_group", new HashMap<>()); + Assertions.assertThrows(UserException.class, () -> command2.validate(connectContext)); + } + + @Test + public void testValidateComputeGroupNotExist() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + 
cloudSystemInfoService.getComputeGroupByName("non_exist_group"); + minTimes = 0; + result = null; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("non_exist_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateVirtualComputeGroup() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + cloudSystemInfoService.getComputeGroupByName("virtual_group"); + minTimes = 0; + result = computeGroup; + + computeGroup.isVirtual(); + minTimes = 0; + result = true; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("virtual_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateInvalidProperties() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + cloudSystemInfoService.getComputeGroupByName("test_group"); + minTimes = 0; + result = computeGroup; + + computeGroup.isVirtual(); + minTimes = 0; + result = false; + + computeGroup.checkProperties((Map) any); + minTimes = 0; + result = new DdlException("Invalid property"); + } + }; + + Map properties = Maps.newHashMap(); + properties.put("invalid_property", "invalid_value"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertThrows(UserException.class, () -> command.validate(connectContext)); + } + + @Test + public void testValidateSuccess() throws Exception { + runBefore(); + Config.deploy_mode = "cloud"; + + new 
Expectations() { + { + env.getCurrentSystemInfo(); + minTimes = 0; + result = cloudSystemInfoService; + + cloudSystemInfoService.getComputeGroupByName("test_group"); + minTimes = 0; + result = computeGroup; + + computeGroup.isVirtual(); + minTimes = 0; + result = false; + } + }; + + new Expectations() { + { + computeGroup.checkProperties((Map) any); + minTimes = 0; + + computeGroup.modifyProperties((Map) any); + minTimes = 0; + } + }; + + Map properties = Maps.newHashMap(); + properties.put("balance_type", "async_warmup"); + properties.put("balance_warm_up_task_timeout", "300"); + AlterComputeGroupCommand command = new AlterComputeGroupCommand("test_group", properties); + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java new file mode 100644 index 00000000000000..06588301872693 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowComputeGroupTest.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class ShowComputeGroupTest extends TestWithFeService { + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + } + + @Test + public void testShowComputeGroupsInCloudMode() throws Exception { + Config.deploy_mode = "cloud"; + ShowClustersCommand command = new ShowClustersCommand(true); + ShowResultSetMetaData metaData = command.getMetaData(); + Assertions.assertNotNull(metaData); + List columnNames = metaData.getColumns().stream() + .map(Column::getName) + .collect(Collectors.toList()); + Assertions.assertEquals(7, columnNames.size()); + Assertions.assertEquals("Name", columnNames.get(0)); + Assertions.assertEquals("IsCurrent", columnNames.get(1)); + Assertions.assertEquals("Users", columnNames.get(2)); + Assertions.assertEquals("BackendNum", columnNames.get(3)); + Assertions.assertEquals("SubComputeGroups", columnNames.get(4)); + Assertions.assertEquals("Policy", columnNames.get(5)); + Assertions.assertEquals("Properties", columnNames.get(6)); + } + + @Test + public void testShowComputeGroupsInNonCloudMode() throws Exception { + Config.deploy_mode = "not-cloud"; + ShowClustersCommand command = new ShowClustersCommand(true); + Assertions.assertThrows(AnalysisException.class, () -> { + command.doRun(connectContext, null); + }); + } + + @Test + public void testShowClustersInCloudMode() throws Exception { + ShowClustersCommand command = new ShowClustersCommand(false); + ShowResultSetMetaData metaData = command.getMetaData(); + Assertions.assertNotNull(metaData); + List columnNames = 
metaData.getColumns().stream() + .map(Column::getName).collect(Collectors.toList()); + Assertions.assertEquals(7, columnNames.size()); + Assertions.assertEquals("cluster", columnNames.get(0)); + Assertions.assertEquals("is_current", columnNames.get(1)); + Assertions.assertEquals("users", columnNames.get(2)); + Assertions.assertEquals("backend_num", columnNames.get(3)); + Assertions.assertEquals("sub_clusters", columnNames.get(4)); + Assertions.assertEquals("policy", columnNames.get(5)); + Assertions.assertEquals("properties", columnNames.get(6)); + } + + @Test + public void testShowClustersInNonCloudMode() throws Exception { + Config.deploy_mode = "not-cloud"; + ShowClustersCommand command = new ShowClustersCommand(false); + Assertions.assertThrows(AnalysisException.class, () -> { + command.doRun(connectContext, null); + }); + } +} diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 1a73cb905c2601..c380e07d4e88b3 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -182,6 +182,7 @@ message ClusterPB { optional int64 mtime = 11; repeated string cluster_names = 12; // clusters in virtual cluster optional ClusterPolicy cluster_policy = 13; // virtual cluster policy + map properties = 14; // cluster additional properties } message NodeInfoPB { @@ -1410,6 +1411,7 @@ message AlterClusterRequest { UPDATE_CLUSTER_ENDPOINT = 9; SET_CLUSTER_STATUS = 10; ALTER_VCLUSTER_INFO = 11; + ALTER_PROPERTIES = 12; } optional string instance_id = 1; optional string cloud_unique_id = 2; // For auth diff --git a/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy new file mode 100644 index 00000000000000..e0db0c258e747a --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_alter_compute_group_properties.groovy @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite('test_alter_compute_group_properties', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def findComputeGroup = { clusterName -> + def showComputeGroups = sql_return_maparray """SHOW COMPUTE GROUPS""" + log.info("SHOW COMPUTE GROUPS result: {}", showComputeGroups) + showComputeGroups.find { it.Name == clusterName } + } + + def findCluster = { clusterName -> + def showCg = sql_return_maparray """SHOW CLUSTERS""" + log.info("SHOW CLUSTERS result: {}", showCg) + showCg.find { it.cluster == clusterName } + } + + docker(options) { + String clusterName = "compute_cluster" + def showComputeGroup = findComputeGroup(clusterName) + def showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=300')) + assertTrue(showclusters.properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='without_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup')) + assertTrue(showclusters.properties.contains('balance_type=without_warmup')) + try { + // errCode = 2, detailMessage = Property balance_warm_up_task_timeout cannot be set when current balance_type is without_warmup. Only async_warmup type supports timeout setting. + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_warm_up_task_timeout'='6000')""" + } catch (Exception e) { + logger.info("exception: {}", e.getMessage()) + assertTrue(e.getMessage().contains("Property balance_warm_up_task_timeout cannot be set when current")) + } + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='6000')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=6000')) + assertTrue(showclusters.properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=6000')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='without_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=without_warmup')) + assertTrue(showclusters.properties.contains('balance_type=without_warmup')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='async_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=async_warmup, 
balance_warm_up_task_timeout=300')) + assertTrue(showclusters.properties.contains('balance_type=async_warmup, balance_warm_up_task_timeout=300')) + sql """ALTER COMPUTE GROUP $clusterName PROPERTIES ('balance_type'='sync_warmup')""" + sleep(3 * 1000) + showComputeGroup = findComputeGroup(clusterName) + showclusters = findCluster(clusterName) + assertTrue(showComputeGroup.Properties.contains('balance_type=sync_warmup')) + assertTrue(showclusters.properties.contains('balance_type=sync_warmup')) + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy new file mode 100644 index 00000000000000..f96119ec29b4b5 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_use_compute_group_properties', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=sync_warmup', + 'cloud_pre_heating_time_limit_sec=30' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def global_config_cluster = "compute_cluster" + def without_warmup_cluster = "without_warmup" + def async_warmup_cluster = "async_warmup" + def sync_warmup_cluster = "sync_warmup" + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + + // alter each cluster different balance type + sql """ALTER COMPUTE GROUP $without_warmup_cluster PROPERTIES ('balance_type'='without_warmup')""" + sql """ALTER COMPUTE GROUP $async_warmup_cluster PROPERTIES ('balance_type'='async_warmup', 'balance_warm_up_task_timeout'='10')""" + sql """ALTER COMPUTE GROUP $sync_warmup_cluster PROPERTIES ('balance_type'='sync_warmup')""" + + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + def 
beforeBalanceEveryClusterCache = [:] + + def clusterNameToBeIdx = [:] + clusterNameToBeIdx[global_config_cluster] = [1] + clusterNameToBeIdx[without_warmup_cluster] = [2] + clusterNameToBeIdx[async_warmup_cluster] = [3] + clusterNameToBeIdx[sync_warmup_cluster] = [4] + + // generate primary tablet in each cluster + for (clusterName in [global_config_cluster, without_warmup_cluster, async_warmup_cluster, sync_warmup_cluster]) { + sql """ use @$clusterName """ + sql """ select * from $table """ + def beIdxs = clusterNameToBeIdx[clusterName] + def bes = [] + beIdxs.each { beIdx -> + def be = cluster.getBeByIndex(beIdx) + bes << be + } + logger.info("clusterName {} be idxs {}, bes {}", clusterName, beIdxs, bes) + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(bes) + logger.info("before add be fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + def merged23CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + beforeBalanceEveryClusterCache[clusterName] = [beforeGetFromFe, beforeGetFromBe, merged23CacheDir] + } + logger.info("before balance every cluster cache {}", beforeBalanceEveryClusterCache) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, global_config_cluster) + cluster.addBackend(1, without_warmup_cluster) + cluster.addBackend(1, async_warmup_cluster) + cluster.addBackend(1, 
sync_warmup_cluster) + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + setFeConfig('enable_cloud_multi_replica', false) + + clusterNameToBeIdx[global_config_cluster] = [1, 5] + clusterNameToBeIdx[without_warmup_cluster] = [2, 6] + clusterNameToBeIdx[async_warmup_cluster] = [3, 7] + clusterNameToBeIdx[sync_warmup_cluster] = [4, 8] + + // sleep 11s, wait balance + // and async_warmup cluster task 10s timeout + sleep(11 * 1000) + + def afterBalanceEveryClusterCache = [:] + + for (clusterName in [global_config_cluster, without_warmup_cluster, async_warmup_cluster, sync_warmup_cluster]) { + sql """ use @$clusterName """ + sql """ select * from $table """ + def beIdxs = clusterNameToBeIdx[clusterName] + def bes = [] + beIdxs.each { beIdx -> + def be = cluster.getBeByIndex(beIdx) + bes << be + } + logger.info("after add be clusterName {} be idxs {}, bes {}", clusterName, beIdxs, bes) + + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(bes) + // version 2 + def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", afterCacheDirVersion2) + // version 3 + def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", afterCacheDirVersion3) + def merged23CacheDir = [afterCacheDirVersion2, afterCacheDirVersion3] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + afterBalanceEveryClusterCache[clusterName] = [afterGetFromFe, afterGetFromBe, merged23CacheDir] + logger.info("after add be clusterName {} fe tablets {}, be tablets {}, cache dir {}", clusterName, afterGetFromFe, afterGetFromBe, merged23CacheDir) + } + logger.info("after add be balance every cluster cache {}", afterBalanceEveryClusterCache) + + // assert first map keys + def assertFirstMapKeys = { clusterRet, expectedEqual -> + def firstMap = clusterRet[0] + def keys = 
firstMap.keySet().toList() + if (expectedEqual) { + assert firstMap[keys[0]] == firstMap[keys[1]] + } else { + assert firstMap[keys[0]] != firstMap[keys[1]] + } + } + + // check afterBalanceEveryClusterCache + // fe config cloud_warm_up_for_rebalance_type=sync_warmup + def global_config_cluster_ret = afterBalanceEveryClusterCache[global_config_cluster] + logger.info("global_config_cluster_ret {}", global_config_cluster_ret) + // fe tablets not changed + assertFirstMapKeys(global_config_cluster_ret, true) + + def without_warmup_cluster_ret = afterBalanceEveryClusterCache[without_warmup_cluster] + logger.info("without_warmup_cluster_ret {}", without_warmup_cluster_ret) + // fe tablets has changed + assertFirstMapKeys(without_warmup_cluster_ret, false) + + def async_warmup_cluster_ret = afterBalanceEveryClusterCache[async_warmup_cluster] + logger.info("async_warmup_cluster_ret {}", async_warmup_cluster_ret) + // fe tablets has changed, due to task timeout + assertFirstMapKeys(async_warmup_cluster_ret, false) + + def sync_warmup_cluster_ret = afterBalanceEveryClusterCache[sync_warmup_cluster] + logger.info("sync_warmup_cluster_ret {}", sync_warmup_cluster_ret) + // fe tablets not changed + assertFirstMapKeys(sync_warmup_cluster_ret, true) + + logger.info("success check after balance every cluster cache, cluster's balance type is worked") + } + + docker(options) { + cluster.addBackend(1, without_warmup_cluster) + cluster.addBackend(1, async_warmup_cluster) + cluster.addBackend(1, sync_warmup_cluster) + testCase("test_balance_warm_up_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy index 7a18f22bb31cf5..50476800d75dea 100644 --- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy @@ -29,7 +29,7 @@ suite('test_balance_warm_up', 'docker') { 
'sys_log_verbose_modules=org', 'heartbeat_interval_second=1', 'rehash_tablet_after_be_dead_seconds=3600', - 'enable_cloud_warm_up_for_rebalance=true' + 'cloud_warm_up_for_rebalance_type=async_warmup' ] options.beConfigs += [ 'report_tablet_interval_seconds=1', diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy new file mode 100644 index 00000000000000..19c252fba8598c --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_sync_global_config.groovy @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_sync_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=sync_warmup', + 'cloud_pre_heating_time_limit_sec=30' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm 
up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + setFeConfig('enable_cloud_multi_replica', false) + + sleep(5 * 1000) + sql """ + insert into $table values (50, '4'), (60, '6') + """ + // version 4, new rs after warm up task + def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("cache dir version 4 {}", beforeCacheDirVersion4) + def afterMerged23CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 4 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged23CacheDir) + + // after cloud_pre_heating_time_limit_sec = 30s + sleep(40 * 1000) + // check tablet still in old be + def beforeWarmUpTaskOkDis = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up dis result {}", beforeWarmUpTaskOkDis) + assert beforeWarmUpTaskOkDis.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 2 + } + + GetDebugPoint().disableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = sql_return_maparray('show backends').get(1) + // balance tablet + awaitUntil(500) { + def afterWarmUpTaskOkResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after warm up result {}", afterWarmUpTaskOkResult) + afterWarmUpTaskOkResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + // from be1 -> be2, warm up this tablet + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def 
afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("after cache dir version 2 {}", afterCacheDirVersion2) + // version 3 + def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("after cache dir version 3 {}", afterCacheDirVersion3) + sleep(5 * 1000) + // version 4 + def afterCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("after cache dir version 4 {}", afterCacheDirVersion4) + + def afterMergedCacheDir = [afterCacheDirVersion2, afterCacheDirVersion3, afterCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + + logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir) + def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + assert newAddBeCacheDir.size() != 0 + assert afterMerged23CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host]) + + def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + logger.info("Checking file_cache directory: {}", dataPath.absolutePath) + logger.info("Directory exists: {}", dataPath.exists()) + + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + logger.info("Found subdir: {}", subDir.name) + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + + newAddBeCacheDir.each { hashFile -> + assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} not found in BE ${newAddBe.Host}'s file_cache directory. 
" + + "Available subdirs: ${subDirs}") + } + } + + docker(options) { + testCase("test_balance_warm_up_sync_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy new file mode 100644 index 00000000000000..0938126d61cf70 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_task_abnormal.groovy @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_task_abnormal', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=sync_warmup', + 'cloud_pre_heating_time_limit_sec=10' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before 
warm up result {}", beforeWarmUpResult) + assert beforeWarmUpResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 2 + } + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + // sync warm up task always return false + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + setFeConfig('enable_cloud_multi_replica', false) + + // wait for some time to make sure warm up task is processed, but mapping is not changed + sleep(15 * 1000) + // check mapping is not changed + def afterAddBeResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after add be result {}", afterAddBeResult) + // two be, but 2 replica num is still mapping in old be + assert afterAddBeResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 2 + } + + // test recover from abnormal + sql """ALTER COMPUTE GROUP compute_cluster PROPERTIES ('balance_type'='without_warmup')""" + sleep(5 * 1000) + + def afterAlterResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after alter balance policy result {}", afterAlterResult) + // now mapping is changed to 1 replica in each be + assert afterAlterResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + docker(options) { + testCase("test_balance_warm_up_task_abnormal_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy new file mode 100644 index 00000000000000..8ce7160654264c --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + + +// Cloud-mode docker regression suite (returns immediately when isCloudMode() is false). +// Starts with 1 FE and 1 BE, adds one backend to "compute_cluster", then checks that a +// full-table query reports NumPeerIOTotal > 0 in its profile and that the backend the test +// treats as newly added shows cached_remote_reader_peer_read != 0 and cached_remote_reader_s3_read == 0. +suite('test_peer_read_async_warmup', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=peer_read_async_warmup', + // disable Auto Analysis Job Executor + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*', + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + // Helper closures used by testCase: + // - mergeDirs(base, add): combines two host-keyed maps; when base already has a truthy entry for a host + // the two values are joined with the plus operator, otherwise the value from add is used as-is. + // - getBrpcMetrics(ip, port, name): reads the brpc_metrics page of ip:port and returns the first integer + // that follows name as a long; throws RuntimeException when name is not found. + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? 
(base[host] + hashFiles) : hashFiles] + } + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + // enable the download_segment_file_sleep debug point on every BE with sleep_time 50; a later comment in this test relies on it to block the warm-up task + GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader::download_segment_file_sleep", [sleep_time: 50]) + setFeConfig('enable_cloud_multi_replica', false) + // poll via awaitUntil(500) until some row of the replica distribution reports ReplicaNum == 1 + awaitUntil(500) { + def afterRebalanceResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after rebalance result {}", afterRebalanceResult) + afterRebalanceResult.any { row -> + Integer.valueOf((String) 
row.ReplicaNum) == 1 + } + } + + sql """ + insert into $table values (50, '4'), (60, '6') + """ + // version 4, in new be, but not in old be + def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("cache dir version 4 {}", beforeCacheDirVersion4) + + sql """ + insert into $table values (70, '7'), (80, '8') + """ + // version 5, in new be, but not in old be + def beforeCacheDirVersion5 = getTabletFileCacheDirFromBe(msHttpPort, table, 5) + logger.info("cache dir version 5 {}", beforeCacheDirVersion5) + + // fold the per-version cache dir maps (versions 2 to 5) into one host-keyed map via mergeDirs + def afterMerged2345CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4, beforeCacheDirVersion5] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 2,3,4,5 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged2345CacheDir) + + // NOTE(review): assumes `show backends` lists the original BE first and the added BE second - TODO confirm the ordering is guaranteed + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = sql_return_maparray('show backends').get(1) + + def newAddBeCacheDir = afterMerged2345CacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + // version 4, 5 + assertTrue(newAddBeCacheDir.size() == 2, "new add be should have version 4,5 cache file") + // warm up task blocked by debug point, so old be should not have version 4,5 cache file + assertFalse(afterMerged2345CacheDir[oldBe.Host].containsAll(newAddBeCacheDir), "old be should not have version 4,5 cache file") + + // The query triggers reading the file cache from the peer + profile("test_peer_read_async_warmup_profile") { + sql """ set enable_profile = true;""" + sql """ set profile_level = 2;""" + run { + sql """/* test_peer_read_async_warmup_profile */ select * from $table""" + sleep(1000) + } + + check { profileString, exception -> + log.info(profileString) + // Match every "- NumPeerIOTotal: <digits>" counter in the profile text and sum the captured values + def matcher = (profileString =~ /- NumPeerIOTotal:\s+(\d+)/) + def total = 0 + while 
(matcher.find()) { + total += matcher.group(1).toInteger() + logger.info("NumPeerIOTotal: {}", matcher.group(1)) + } + assertTrue(total > 0) + } + } + + // peer read cache, so it should read version 2,3 cache file from old be, not s3 + assertTrue(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read"), "new add be should have peer read cache") + assertTrue(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read"), "new add be should not have s3 read cache") + } + + // run testCase inside the docker cluster configured by options + docker(options) { + testCase("test_peer_read_async_warmup_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy similarity index 98% rename from regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy rename to regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy index aa35a70e121957..c0dc2f9747c60c 100644 --- a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy +++ b/regression-test/suites/cloud_p0/balance/test_warmup_rebalance.groovy @@ -25,7 +25,7 @@ suite('test_warmup_rebalance_in_cloud', 'multi_cluster, docker') { def options = new ClusterOptions() options.feConfigs += [ 'cloud_cluster_check_interval_second=1', - 'enable_cloud_warm_up_for_rebalance=true', + 'cloud_warm_up_for_rebalance_type=async_warmup', 'cloud_tablet_rebalancer_interval_second=1', 'cloud_balance_tablet_percent_per_run=0.5', 'sys_log_verbose_modules=org',