From 4750923ce62a48b4b056e5691ff58b8395c9b967 Mon Sep 17 00:00:00 2001 From: deardeng Date: Mon, 27 Oct 2025 13:43:52 +0800 Subject: [PATCH] [opt](cloud) Exposes cloud balance metrics (#57200) Exposes cloud balance related metrics to show whether the compute group is performing balance scheduling. When `*_balance_num` metrics are all 0, the current compute group is considered to be in a balanced state. Note: These metrics are valid only when requesting the fe master (balance scheduling is performed on the fe master) ``` curl "http://175.42.1.1:8030/metrics" |rg '_balance_num' # HELP doris_fe_cloud_table_balance_num current cluster cloud table balance sync edit log number # TYPE doris_fe_cloud_table_balance_num counter doris_fe_cloud_table_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 5 doris_fe_cloud_table_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0 # HELP doris_fe_cloud_partition_balance_num current cluster cloud partition balance sync edit log number # TYPE doris_fe_cloud_partition_balance_num counter doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0 doris_fe_cloud_partition_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0 # HELP doris_fe_cloud_smooth_upgrade_balance_num current cluster cloud smooth upgrade sync edit log number # TYPE doris_fe_cloud_smooth_upgrade_balance_num counter doris_fe_cloud_smooth_upgrade_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0 doris_fe_cloud_smooth_upgrade_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0 # HELP doris_fe_cloud_global_balance_num current cluster cloud be balance sync edit log number # TYPE doris_fe_cloud_global_balance_num counter doris_fe_cloud_global_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0 doris_fe_cloud_global_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0 # HELP doris_fe_cloud_warm_up_balance_num current cluster cloud warm up cache sync edit log number # TYPE doris_fe_cloud_warm_up_balance_num counter doris_fe_cloud_warm_up_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"} 0 doris_fe_cloud_warm_up_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"} 0 ``` --- .../cloud/catalog/CloudTabletRebalancer.java | 39 ++++++- .../org/apache/doris/metric/CloudMetrics.java | 26 +++++ .../org/apache/doris/metric/MetricRepo.java | 61 ++++++++++ .../balance/test_balance_metrics.groovy | 110 ++++++++++++++++++ 4 files changed, 230 insertions(+), 6 deletions(-) create mode 100644 regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 1b4a9d6e042ab0..f203148a1b1c78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -37,6 +37,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; @@ -48,6 +49,7 @@ import org.apache.doris.thrift.TWarmUpCacheAsyncResponse; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Sets; import lombok.Getter; import org.apache.logging.log4j.LogManager; @@ -125,6 +127,14 @@ public enum BalanceType { PARTITION } + public enum StatType { + GLOBAL, + TABLE, + PARTITION, + SMOOTH_UPGRADE, + WARM_UP_CACHE + } + @Getter private class InfightTablet { private final Long tabletId; @@ -320,7 +330,7 @@ public void balanceAllPartitions() { balanceInPartition(entry.getValue(), entry.getKey(), infos); } long oldSize = infos.size(); - infos = batchUpdateCloudReplicaInfoEditlogs(infos); + infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.PARTITION); LOG.info("collect to editlog partitions before size={} after size={} infos", oldSize, infos.size()); try { Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos); @@ -356,7 +366,7 @@ public void balanceAllTables() { balanceInTable(entry.getValue(), entry.getKey(), infos); } long oldSize = infos.size(); - infos = batchUpdateCloudReplicaInfoEditlogs(infos); + infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.TABLE); LOG.info("collect to editlog table before size={} after size={} infos", oldSize, infos.size()); try { Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos); @@ -391,7 +401,7 @@ public void globalBalance() { balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos); } long oldSize = infos.size(); - infos = batchUpdateCloudReplicaInfoEditlogs(infos); + infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.GLOBAL); LOG.info("collect to editlog global before size={} after size={} infos", oldSize, infos.size()); try { Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos); @@ -472,7 +482,7 @@ public void checkInflightWarmUpCacheAsync() { } } long oldSize = infos.size(); - infos = batchUpdateCloudReplicaInfoEditlogs(infos); + infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.WARM_UP_CACHE); LOG.info("collect to editlog warmup before size={} after size={} infos", oldSize, infos.size()); try { Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos); @@ -1221,7 +1231,7 @@ private void migrateTablets(Long srcBe, Long dstBe) { } } long oldSize = infos.size(); - infos = batchUpdateCloudReplicaInfoEditlogs(infos); + infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.SMOOTH_UPGRADE); LOG.info("collect to editlog migrate before size={} after size={} infos", oldSize, infos.size()); try { Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos); @@ -1239,16 +1249,24 @@ private void migrateTablets(Long srcBe, Long dstBe) { } } - private List batchUpdateCloudReplicaInfoEditlogs(List infos) { + private List batchUpdateCloudReplicaInfoEditlogs(List infos, + StatType type) { long start = System.currentTimeMillis(); List rets = new ArrayList<>(); // clusterId, infos Map> clusterIdToInfos = infos.stream() .collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId)); + Set notBalancedClusterIds = new HashSet<>(this.clusterToBes.keySet()); for (Map.Entry> entry : clusterIdToInfos.entrySet()) { // same cluster String clusterId = entry.getKey(); + notBalancedClusterIds.remove(clusterId); List infoList = entry.getValue(); + String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getClusterNameByClusterId(clusterId); + if (!Strings.isNullOrEmpty(clusterName)) { + MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, infoList.size()); + } Map> sameLocationInfos = infoList.stream() .collect(Collectors.groupingBy( info -> info.getDbId() @@ -1291,6 +1309,15 @@ private List batchUpdateCloudReplicaInfoEditlogs(List CLUSTER_WARM_UP_JOB_LATEST_START_TIME; protected static AutoMappedMetric CLUSTER_WARM_UP_JOB_LAST_FINISH_TIME; + protected static AutoMappedMetric CLUSTER_CLOUD_PARTITION_BALANCE_NUM; + protected static AutoMappedMetric CLUSTER_CLOUD_TABLE_BALANCE_NUM; + protected static AutoMappedMetric CLUSTER_CLOUD_GLOBAL_BALANCE_NUM; + protected static AutoMappedMetric CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM; + protected static AutoMappedMetric CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM; + protected static void init() { if (Config.isNotCloudMode()) { return; @@ -98,5 +104,25 @@ protected static void init() { CLUSTER_WARM_UP_JOB_FINISHED_TABLETS = new AutoMappedMetric<>( name -> new LongCounterMetric("file_cache_warm_up_job_finished_tablets", MetricUnit.NOUNIT, "warm up job finished tablets")); + + CLUSTER_CLOUD_PARTITION_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric( + "cloud_partition_balance_num", MetricUnit.NOUNIT, + "current cluster cloud partition balance sync edit log number")); + + CLUSTER_CLOUD_TABLE_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric( + "cloud_table_balance_num", MetricUnit.NOUNIT, + "current cluster cloud table balance sync edit log number")); + + CLUSTER_CLOUD_GLOBAL_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric( + "cloud_global_balance_num", MetricUnit.NOUNIT, + "current cluster cloud be balance sync edit log number")); + + CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric( + "cloud_smooth_upgrade_balance_num", MetricUnit.NOUNIT, + "current cluster cloud smooth upgrade sync edit log number")); + + CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM = new AutoMappedMetric<>(name -> new LongCounterMetric( + "cloud_warm_up_balance_num", MetricUnit.NOUNIT, + "current cluster cloud warm up cache sync edit log number")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index c77dbf540e6298..6b62874eab8c1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -21,6 +21,7 @@ import org.apache.doris.alter.AlterJobV2.JobType; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.CloudTabletRebalancer; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.InternalErrorCode; @@ -1036,6 +1037,32 @@ public static void registerCloudMetrics(String clusterId, String clusterName) { String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName; CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key); + + LongCounterMetric clusterCloudPartitionBalanceNum = + CloudMetrics.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.getOrAdd(clusterId); + clusterCloudPartitionBalanceNum.setLabels(labels); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudPartitionBalanceNum); + + LongCounterMetric clusterCloudTableBalanceNum = + CloudMetrics.CLUSTER_CLOUD_TABLE_BALANCE_NUM.getOrAdd(clusterId); + clusterCloudTableBalanceNum.setLabels(labels); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudTableBalanceNum); + + LongCounterMetric clusterCloudGlobalBalanceNum = + CloudMetrics.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.getOrAdd(clusterId); + clusterCloudGlobalBalanceNum.setLabels(labels); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudGlobalBalanceNum); + + LongCounterMetric clusterCloudSmoothUpgradeBalanceNum = + CloudMetrics.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.getOrAdd(clusterId); + clusterCloudSmoothUpgradeBalanceNum.setLabels(labels); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudSmoothUpgradeBalanceNum); + + LongCounterMetric clusterCloudWarmUpBalanceNum = + CloudMetrics.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.getOrAdd(clusterId); + clusterCloudWarmUpBalanceNum.setLabels(labels); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(clusterCloudWarmUpBalanceNum); + } public static void increaseClusterRequestAll(String clusterName) { @@ -1249,4 +1276,38 @@ public static void updateClusterQueryLatency(String clusterName, long elapseMs) String key = clusterId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + clusterName; CloudMetrics.CLUSTER_QUERY_LATENCY_HISTO.getOrAdd(key).update(elapseMs); } + + public static void updateClusterCloudBalanceNum(String clusterName, String clusterId, + CloudTabletRebalancer.StatType type, long num) { + if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName) + || Strings.isNullOrEmpty(clusterId)) { + return; + } + LongCounterMetric counter = null; + switch (type) { + case PARTITION: + counter = CloudMetrics.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.getOrAdd(clusterId); + break; + case TABLE: + counter = CloudMetrics.CLUSTER_CLOUD_TABLE_BALANCE_NUM.getOrAdd(clusterId); + break; + case GLOBAL: + counter = CloudMetrics.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.getOrAdd(clusterId); + break; + case SMOOTH_UPGRADE: + counter = CloudMetrics.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.getOrAdd(clusterId); + break; + case WARM_UP_CACHE: + counter = CloudMetrics.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.getOrAdd(clusterId); + break; + default: + return; + } + List labels = new ArrayList<>(); + counter.update(num); + labels.add(new MetricLabel("cluster_id", clusterId)); + labels.add(new MetricLabel("cluster_name", clusterName)); + counter.setLabels(labels); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter); + } } diff --git a/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy new file mode 100644 index 00000000000000..3a31cd08dc15a8 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_metrics', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=2', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'enable_cloud_warm_up_for_rebalance=false' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def getFEMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/metrics" + logger.info("getFEMetrics1, url: ${url}, name: ${name}") + def metrics = new URL(url).text + def pattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name) + "\\s+(\\d+)") + def matcher = pattern.matcher(metrics) + if (matcher.find()) { + def ret = matcher[0][1] as long + logger.info("getFEMetrics2, ${url}, name:${name}, value:${ret}") + return ret + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def testCase = { table -> + def master = cluster.getMasterFe() + def allEditlogNum = 0; + def future = thread { + awaitUntil(300) { + def name = """doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"}""" + def value = getFEMetrics(master.host, master.httpPort, name) + allEditlogNum += value + logger.info("balance metrics value: ${value}, allEditlogNum: ${allEditlogNum}") + return value == 0 && allEditlogNum > 0 + } + } + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 200 + PROPERTIES ( + "replication_num"="1" + ); + """ + // generate some balance tasks + cluster.addBackend(1) + future.get() + // wait for rebalancer to do its job + assertTrue(allEditlogNum > 0, "balance metrics not increased") + + allEditlogNum = 0 + for (i in 0..30) { + sleep(1000) + def name = """doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"}""" + def value = getFEMetrics(master.host, master.httpPort, name) + allEditlogNum += value + logger.info("Final balance metrics value: ${value}, allEditlogNum: ${allEditlogNum}") + } + // after all balance tasks done, the metric should not increase + assertTrue(allEditlogNum == 0, "final balance metrics not increased") + + cluster.addBackend(1, "other_cluster") + sleep(5000) + def name = """doris_fe_cloud_partition_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"}""" + def value = getFEMetrics(master.host, master.httpPort, name) + logger.info("other cluster balance metrics value: ${value}") + } + + docker(options) { + testCase("test_balance_metrics_tbl") + } +}