From 179d145547328431bcc66cd2e99971241d7188ee Mon Sep 17 00:00:00 2001 From: yujun Date: Thu, 10 Oct 2024 20:47:31 +0800 Subject: [PATCH 1/5] fix auto buckets calc error --- .../doris/clone/DynamicPartitionScheduler.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index c0a65966fd95f3..1467e8f71e8b91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -170,22 +170,19 @@ private static long getNextPartitionSize(ArrayList historyPartitionsSize) return historyPartitionsSize.get(0); } - int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size(); - boolean isAscending = true; - for (int i = 1; i < size; i++) { - if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) { + List ascendingDeltaSize = List.newArrayList(); + for (int i = math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) { + long delta = historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1); + if (delta < 0) { isAscending = false; break; } + ascendingDeltaSize.add(delta); } if (isAscending) { - ArrayList historyDeltaSize = Lists.newArrayList(); - for (int i = 1; i < size; i++) { - historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1)); - } - return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7); + return historyPartitionsSize.get(historyPartitionsSize.size() - 1) + ema(ascendingDeltaSize, 7); } else { return ema(historyPartitionsSize, 7); } From 525c6d342d54cac3df7191d6f7b0195c7e698bda Mon Sep 17 00:00:00 2001 From: yujun Date: Thu, 10 Oct 2024 21:08:48 +0800 Subject: [PATCH 2/5] update --- .../org/apache/doris/clone/DynamicPartitionScheduler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 1467e8f71e8b91..37a4b922023bfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -171,8 +171,8 @@ private static long getNextPartitionSize(ArrayList historyPartitionsSize) } boolean isAscending = true; - List ascendingDeltaSize = List.newArrayList(); - for (int i = math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) { + ArrayList ascendingDeltaSize = new ArrayList(); + for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) { long delta = historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1); if (delta < 0) { isAscending = false; From f43094ac97c9f341986480c807c2493d9ff14658 Mon Sep 17 00:00:00 2001 From: yujun Date: Fri, 11 Oct 2024 17:03:28 +0800 Subject: [PATCH 3/5] add test --- .../catalog/DynamicPartitionTableTest.java | 43 ++++++++++++++++--- .../doris/utframe/TestWithFeService.java | 4 +- .../apache/doris/utframe/UtFrameUtils.java | 4 +- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 79093d6ed4b9ad..02be5bad35b276 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1736,6 +1736,8 @@ public void testAutoBuckets() throws Exception { + " PROPERTIES (\n" + " \"dynamic_partition.enable\" = \"true\",\n" + " \"dynamic_partition.time_unit\" = \"YEAR\",\n" + + " \"dynamic_partition.start\" = \"-50\",\n" + + " \"dynamic_partition.create_history_partition\" = \"true\",\n" + " \"dynamic_partition.end\" = \"1\",\n" + " \"dynamic_partition.prefix\" = \"p\",\n" + " \"replication_allocation\" = \"tag.location.default: 1\"\n" @@ -1744,22 +1746,53 @@ public void testAutoBuckets() throws Exception { Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("test"); OlapTable table = (OlapTable) db.getTableOrAnalysisException("test_autobucket_dynamic_partition"); List partitions = Lists.newArrayList(table.getAllPartitions()); - Assert.assertEquals(2, partitions.size()); + Assert.assertEquals(52, partitions.size()); for (Partition partition : partitions) { Assert.assertEquals(FeConstants.default_bucket_num, partition.getDistributionInfo().getBucketNum()); partition.setVisibleVersionAndTime(2L, System.currentTimeMillis()); } RebalancerTestUtil.updateReplicaDataSize(1, 1, 1); - String alterStmt = + String alterStmt1 = "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')"; - ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt)); + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1)); List> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(db.getId(), table.getId())); Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); partitions = Lists.newArrayList(table.getAllPartitions()); partitions.sort(Comparator.comparing(Partition::getId)); - Assert.assertEquals(3, partitions.size()); - Assert.assertEquals(1, partitions.get(2).getDistributionInfo().getBucketNum()); + Assert.assertEquals(53, partitions.size()); + Assert.assertEquals(1, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); + + table.readLock(); + try { + // first 40 partitions with size 0, then 10 partitions with size 20GB + for (int i = 0; i < 50; i++) { + Partition partition = partitions.get(i); + partition.updateVisibleVersion(2L); + for (MaterializedIndex idx : partition.getMaterializedIndices( + MaterializedIndex.IndexExtState.VISIBLE)) { + for (Tablet tablet : idx.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + replica.updateVersion(2L); + replica.setDataSize(i < 40 ? 0L : 20L << 30); + replica.setRowCount(1000L); + } + } + } + } + } finally { + table.readUnlock(); + } + + String alterStmt2 = + "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '3')"; + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2)); + Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); + + partitions = Lists.newArrayList(table.getAllPartitions()); + partitions.sort(Comparator.comparing(Partition::getId)); + Assert.assertEquals(54, partitions.size()); + Assert.assertTrue(partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum() > 40); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 407c1544a4b00b..8e25efdfada439 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -517,8 +517,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); diskInfo1.setPathHash(be.getId()); - diskInfo1.setTotalCapacityB(10L << 30); - diskInfo1.setAvailableCapacityB(5L << 30); + diskInfo1.setTotalCapacityB(10L << 40); + diskInfo1.setAvailableCapacityB(5L << 40); diskInfo1.setDataUsedCapacityB(480000); diskInfo1.setPathHash(be.getId()); Map disks = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index d09860351bf93b..22fa581391f738 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -317,8 +317,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); Map disks = Maps.newHashMap(); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); - diskInfo1.setTotalCapacityB(10L << 30); - diskInfo1.setAvailableCapacityB(5L << 30); + diskInfo1.setTotalCapacityB(10L << 40); + diskInfo1.setAvailableCapacityB(5L << 40); diskInfo1.setDataUsedCapacityB(480000); diskInfo1.setPathHash(be.getId()); disks.put(diskInfo1.getRootPath(), diskInfo1); From 1f1a9af884d7a26661362c22f232f935826662b9 Mon Sep 17 00:00:00 2001 From: yujun Date: Fri, 11 Oct 2024 17:22:35 +0800 Subject: [PATCH 4/5] add a test --- .../src/main/java/org/apache/doris/common/Config.java | 6 ++++++ .../org/apache/doris/clone/DynamicPartitionScheduler.java | 4 +++- .../org/apache/doris/catalog/DynamicPartitionTableTest.java | 6 +++--- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 861968ffb31392..979c49a51270d4 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2533,6 +2533,12 @@ public class Config extends ConfigBase { }) public static long analyze_record_limit = 20000; + @ConfField(mutable = true, masterOnly = true, description = { + "Auto Buckets中预估的压缩数据的倍率", + "the estimated compress factor of partition size in Auto Buckets" + }) + public static int autobucket_compress_size_factor = 5; + @ConfField(mutable = true, masterOnly = true, description = { "Auto Buckets中最小的buckets数目", "min buckets of auto bucket" diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 37a4b922023bfb..20688c57a46866 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -234,7 +234,9 @@ private static int getBucketsNum(DynamicPartitionProperty property, OlapTable ta .collect(Collectors.toCollection(ArrayList::new)); long estimatePartitionSize = getNextPartitionSize(partitionSizeArray); // plus 5 for uncompressed data - long uncompressedPartitionSize = estimatePartitionSize * 5; + // replica's actual disk usage is a litter bigger then its reported data size + // but 5 times maybe a little too big, i don't known why use so big. just add a config here. + long uncompressedPartitionSize = estimatePartitionSize * Config.autobucket_compress_size_factor; int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets); LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, " + " estimate partition size: {}, last partitions(partition name, local size, remote size): {}", diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 02be5bad35b276..538383e69b8bd5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1771,7 +1771,7 @@ public void testAutoBuckets() throws Exception { Partition partition = partitions.get(i); partition.updateVisibleVersion(2L); for (MaterializedIndex idx : partition.getMaterializedIndices( - MaterializedIndex.IndexExtState.VISIBLE)) { + MaterializedIndex.IndexExtState.VISIBLE)) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { replica.updateVersion(2L); @@ -1779,8 +1779,8 @@ public void testAutoBuckets() throws Exception { replica.setRowCount(1000L); } } - } - } + } + } } finally { table.readUnlock(); } From 92efa8475a4406d63b886872d2b60d88fd7258fd Mon Sep 17 00:00:00 2001 From: yujun Date: Fri, 11 Oct 2024 18:41:20 +0800 Subject: [PATCH 5/5] update --- .../main/java/org/apache/doris/common/Config.java | 6 ------ .../org/apache/doris/clone/BeLoadRebalancer.java | 10 +++++++++- .../doris/clone/DynamicPartitionScheduler.java | 4 +--- .../doris/catalog/DynamicPartitionTableTest.java | 14 ++++++++++---- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 979c49a51270d4..861968ffb31392 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2533,12 +2533,6 @@ public class Config extends ConfigBase { }) public static long analyze_record_limit = 20000; - @ConfField(mutable = true, masterOnly = true, description = { - "Auto Buckets中预估的压缩数据的倍率", - "the estimated compress factor of partition size in Auto Buckets" - }) - public static int autobucket_compress_size_factor = 5; - @ConfField(mutable = true, masterOnly = true, description = { "Auto Buckets中最小的buckets数目", "min buckets of auto bucket" diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 53e8ecf9119a1b..e1460c269c1fe4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -113,7 +113,8 @@ protected List selectAlternativeTabletsForCluster( numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum(); } } - LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); + LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, isUrgent {}", + numOfLowPaths, medium, clusterStat.getTag(), isUrgent); List alternativeTabletInfos = Lists.newArrayList(); int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); @@ -121,6 +122,8 @@ protected List selectAlternativeTabletsForCluster( .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId()))) .collect(Collectors.toList()); + boolean hasCandidateTablet = false; + // choose tablets from high load backends. // BackendLoadStatistic is sorted by load score in ascend order, // so we need to traverse it from last to first @@ -222,6 +225,8 @@ protected List selectAlternativeTabletsForCluster( continue; } + hasCandidateTablet = true; + // for urgent disk, pick tablets order by size, // then it may always pick tablets that was on the low backends. if (!lowBETablets.isEmpty() @@ -270,6 +275,9 @@ protected List selectAlternativeTabletsForCluster( if (!alternativeTablets.isEmpty()) { LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}", medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos); + } else if (isUrgent && !hasCandidateTablet) { + LOG.info("urgent balance cann't found candidate tablets. medium: {}, tag: {}", + medium, clusterStat.getTag()); } return alternativeTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 20688c57a46866..37a4b922023bfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -234,9 +234,7 @@ private static int getBucketsNum(DynamicPartitionProperty property, OlapTable ta .collect(Collectors.toCollection(ArrayList::new)); long estimatePartitionSize = getNextPartitionSize(partitionSizeArray); // plus 5 for uncompressed data - // replica's actual disk usage is a litter bigger then its reported data size - // but 5 times maybe a little too big, i don't known why use so big. just add a config here. - long uncompressedPartitionSize = estimatePartitionSize * Config.autobucket_compress_size_factor; + long uncompressedPartitionSize = estimatePartitionSize * 5; int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets); LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, " + " estimate partition size: {}, last partitions(partition name, local size, remote size): {}", diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 538383e69b8bd5..2ae051e4f2518e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1766,20 +1766,25 @@ public void testAutoBuckets() throws Exception { table.readLock(); try { - // first 40 partitions with size 0, then 10 partitions with size 20GB - for (int i = 0; i < 50; i++) { + // first 40 partitions with size 0, then 13 partitions with size 100GB(10GB * 10 buckets) + for (int i = 0; i < 52; i++) { Partition partition = partitions.get(i); partition.updateVisibleVersion(2L); for (MaterializedIndex idx : partition.getMaterializedIndices( MaterializedIndex.IndexExtState.VISIBLE)) { + Assert.assertEquals(10, idx.getTablets().size()); for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { replica.updateVersion(2L); - replica.setDataSize(i < 40 ? 0L : 20L << 30); + replica.setDataSize(i < 40 ? 0L : 10L << 30); replica.setRowCount(1000L); } } } + if (i >= 40) { + // first 52 partitions are 10 buckets(FeConstants.default_bucket_num) + Assert.assertEquals(10 * (10L << 30), partition.getAllDataSize(true)); + } } } finally { table.readUnlock(); @@ -1793,6 +1798,7 @@ public void testAutoBuckets() throws Exception { partitions = Lists.newArrayList(table.getAllPartitions()); partitions.sort(Comparator.comparing(Partition::getId)); Assert.assertEquals(54, partitions.size()); - Assert.assertTrue(partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum() > 40); + // 100GB total, 1GB per bucket, should 100 buckets. + Assert.assertEquals(100, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); } }