From a4b8ef31566b9621916263d05f2c3a776420bb35 Mon Sep 17 00:00:00 2001 From: deardeng Date: Mon, 14 Jul 2025 18:03:24 +0800 Subject: [PATCH 1/3] [fix](auto bucket)Fix auto bucket calc bucketnum err when partition size is invalid (#52801) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …ize invalid 1. Fixed the problem that auto bucket will calculate wrong results when partition size is inaccurate - If `replica.size == 0`, filter out this replica. In the tablet.getDataSize function, the size is calculated by taking the average value of the replicas. When the size of a replica is 0, it will have a great impact on the average value. Therefore, the replicas with size=0 are filtered out. - If the partition size equals 0, do not include it in the estimation of the partition size. - If all versions with data partitions have sizes equal to 0, then the newly calculated bucket number for the partition will equal the bucket number of the previous version with a size greater than 0. Since we do not know the partition size of the data partitions (as stats thread have not been collected yet), we assume that the new partition's size equals the size of the previous version with a size greater than 0. Consequently, the bucket number will naturally equal that of the previous partition. 2. Added alarm log when the bucket num calculated by auto bucket exceeds the threshold --- .../java/org/apache/doris/common/Config.java | 6 + .../apache/doris/analysis/ShowDataStmt.java | 2 +- .../doris/catalog/MaterializedIndex.java | 4 +- .../apache/doris/catalog/MetadataViewer.java | 2 +- .../org/apache/doris/catalog/OlapTable.java | 2 +- .../org/apache/doris/catalog/Partition.java | 14 ++- .../java/org/apache/doris/catalog/Tablet.java | 5 +- .../ColocateTableCheckerAndBalancer.java | 3 +- .../clone/DynamicPartitionScheduler.java | 116 +++++++++++++----- .../doris/cloud/CacheHotspotManager.java | 6 +- .../common/proc/TabletHealthProcDir.java | 2 +- .../apache/doris/catalog/CatalogTestUtil.java | 2 +- .../catalog/DynamicPartitionTableTest.java | 43 +++++++ .../common/util/AutoBucketUtilsTest.java | 1 + 14 files changed, 160 insertions(+), 48 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 5b0fe49de3d75e..835bdbb66a8a6d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2690,6 +2690,12 @@ public class Config extends ConfigBase { }) public static int autobucket_partition_size_per_bucket_gb = -1; + @ConfField(mutable = true, masterOnly = true, description = {"Auto bucket中计算出的新的分区bucket num超过前一个分区的" + + "bucket num的百分比,被认为是异常case报警", + "The new partition bucket number calculated in the auto bucket exceeds the percentage " + + "of the previous partition's bucket number, which is considered an abnormal case alert."}) + public static double autobucket_out_of_bounds_percent_threshold = 0.5; + @ConfField(description = {"(已弃用,被 arrow_flight_max_connection 替代) Arrow Flight Server中所有用户token的缓存上限," + "超过后LRU淘汰, arrow flight sql是无状态的协议,连接通常不会主动断开," + "bearer token 从 cache 淘汰的同时会 unregister Connection.", diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java index c7bee06ffda485..bc2ac9d5d7c603 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java @@ -298,7 +298,7 @@ private void collectTableStats(OlapTable table) { long remoteSegmentSize = 0; for (Partition partition : table.getAllPartitions()) { MaterializedIndex mIndex = partition.getIndex(indexId); - indexSize += mIndex.getDataSize(false); + indexSize += mIndex.getDataSize(false, false); indexReplicaCount += mIndex.getReplicaCount(); indexRowCount += mIndex.getRowCount() == -1 ? 0 : mIndex.getRowCount(); indexRemoteSize += mIndex.getRemoteDataSize(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index 5e367b538cd74c..b5a31a32e1e961 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -173,10 +173,10 @@ public void clearRollupIndexInfo() { this.rollupFinishedVersion = -1L; } - public long getDataSize(boolean singleReplica) { + public long getDataSize(boolean singleReplica, boolean filterSizeZero) { long dataSize = 0; for (Tablet tablet : getTablets()) { - dataSize += tablet.getDataSize(singleReplica); + dataSize += tablet.getDataSize(singleReplica, filterSizeZero); } return dataSize; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 66cdfa5b42d1ae..e44b7ec52c8c7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -332,7 +332,7 @@ private static List> getDataSkew(String dbName, String tblName, Par for (int i = 0; i < tabletIds.size(); i++) { Tablet tablet = mIndex.getTablet(tabletIds.get(i)); long rowCount = tablet.getRowCount(true); - long dataSize = tablet.getDataSize(true); + long dataSize = tablet.getDataSize(true, false); rowCountTabletInfos.set(i, rowCountTabletInfos.get(i) + rowCount); dataSizeTabletInfos.set(i, dataSizeTabletInfos.get(i) + dataSize); totalSize += dataSize; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index b94ccbb42d2933..5248744dd98cf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1594,7 +1594,7 @@ public long getAvgRowLength() { long dataSize = 0; for (Map.Entry entry : idToPartition.entrySet()) { rowCount += entry.getValue().getBaseIndex().getRowCount(); - dataSize += entry.getValue().getBaseIndex().getDataSize(false); + dataSize += entry.getValue().getBaseIndex().getDataSize(false, false); } if (rowCount > 0) { return dataSize / rowCount; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 0468d3a75498f0..e00bf4b28ef1b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -291,7 +291,7 @@ public long getAllDataSize(boolean singleReplica) { public long getDataSize(boolean singleReplica) { long dataSize = 0; for (MaterializedIndex mIndex : getMaterializedIndices(IndexExtState.VISIBLE)) { - dataSize += mIndex.getDataSize(singleReplica); + dataSize += mIndex.getDataSize(singleReplica, false); } return dataSize; } @@ -458,7 +458,7 @@ public long getRowCount() { public long getAvgRowLength() { long rowCount = getBaseIndex().getRowCount(); - long dataSize = getBaseIndex().getDataSize(false); + long dataSize = getBaseIndex().getDataSize(false, false); if (rowCount > 0) { return dataSize / rowCount; } else { @@ -467,6 +467,14 @@ public long getAvgRowLength() { } public long getDataLength() { - return getBaseIndex().getDataSize(false); + return getBaseIndex().getDataSize(false, false); + } + + public long getDataSizeExcludeEmptyReplica(boolean singleReplica) { + long dataSize = 0; + for (MaterializedIndex mIndex : getMaterializedIndices(IndexExtState.VISIBLE)) { + dataSize += mIndex.getDataSize(singleReplica, true); + } + return dataSize + getRemoteDataSize(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index ea76273e6dfbfa..f18460a320dc0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -515,8 +515,11 @@ public boolean equals(Object obj) { return id == tablet.id; } - public long getDataSize(boolean singleReplica) { + // ATTN: Replica::getDataSize may zero in cloud and non-cloud + // due to dataSize not write to image + public long getDataSize(boolean singleReplica, boolean filterSizeZero) { LongStream s = replicas.stream().filter(r -> r.getState() == ReplicaState.NORMAL) + .filter(r -> !filterSizeZero || r.getDataSize() > 0) .mapToLong(Replica::getDataSize); return singleReplica ? Double.valueOf(s.average().orElse(0)).longValue() : s.sum(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 7727bc77e18667..7dc0a9125dddf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -655,7 +655,8 @@ private GlobalColocateStatistic buildGlobalColocateStatistic() { bucketsSeq.size() + " vs. " + replicationNum); Tablet tablet = index.getTablet(tabletId); totalReplicaDataSizes.set(tabletOrderIdx, - totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true)); + totalReplicaDataSizes.get(tabletOrderIdx) + + tablet.getDataSize(true, false)); tabletOrderIdx++; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 8e5f46573ee0ee..89099b2059c737 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -58,6 +58,7 @@ import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TStorageMedium; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -159,7 +160,7 @@ private Map createDefaultRuntimeInfo() { } // exponential moving average - private static long ema(ArrayList history, int period) { + private static long ema(List history, int period) { double alpha = 2.0 / (period + 1); double ema = history.get(0); for (int i = 1; i < history.size(); i++) { @@ -168,7 +169,7 @@ private static long ema(ArrayList history, int period) { return (long) ema; } - private static long getNextPartitionSize(ArrayList historyPartitionsSize) { + private static long getNextPartitionSize(List historyPartitionsSize) { if (historyPartitionsSize.size() < 2) { return historyPartitionsSize.get(0); } @@ -191,65 +192,98 @@ private static long getNextPartitionSize(ArrayList historyPartitionsSize) } } - private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table, + private static Pair getBucketsNum(DynamicPartitionProperty property, OlapTable table, String partitionName, String nowPartitionName, boolean executeFirstTime) { // if execute first time, all partitions no contain data if (!table.isAutoBucket() || executeFirstTime) { - return property.getBuckets(); + return Pair.of(property.getBuckets(), 0); } - // auto bucket - // get all history partitions - RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo()); - List> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet()); - idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint())); - List partitions = idToItems.stream() - .map(entry -> table.getPartition(entry.getKey())) - .filter(partition -> partition != null && !partition.getName().equals(nowPartitionName)) - .collect(Collectors.toList()); - List visibleVersions = null; + List partitions = getHistoricalPartitions(table, nowPartitionName); + List visibleVersions; try { visibleVersions = Partition.getVisibleVersions(partitions); } catch (RpcException e) { - LOG.warn("autobucket use property's buckets get visible version fail, table: [{}-{}], " + LOG.warn("auto bucket use property's buckets get visible version fail, table: [{}-{}], " + "partition: {}, buckets num: {}, exception: ", table.getName(), table.getId(), partitionName, property.getBuckets(), e); - return property.getBuckets(); + return Pair.of(property.getBuckets(), 0); + } + + List hasDataPartitions = filterDataPartitions(partitions, visibleVersions); + if (hasDataPartitions.isEmpty()) { + return handleNoDataPartitions(table, partitionName, property.getBuckets()); } - List hasDataPartitions = Lists.newArrayList(); + return calculateBuckets(hasDataPartitions); + } + + private static List getHistoricalPartitions(OlapTable table, String nowPartitionName) { + RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo()); + List> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet()); + idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint())); + return idToItems.stream() + .map(entry -> table.getPartition(entry.getKey())) + .filter(partition -> partition != null && !partition.getName().equals(nowPartitionName)) + .collect(Collectors.toList()); + } + + private static List filterDataPartitions(List partitions, List visibleVersions) { + Preconditions.checkState(partitions.size() == visibleVersions.size(), + String.format("partitions size %d not eq visibleVersions size %d, impossible", + partitions.size(), visibleVersions.size())); + List hasDataPartitions = new ArrayList<>(); for (int i = 0; i < partitions.size(); i++) { if (visibleVersions.get(i) >= 2) { hasDataPartitions.add(partitions.get(i)); } } + return hasDataPartitions; + } - // no exist history partition data - if (hasDataPartitions.isEmpty()) { - LOG.info("autobucket use property's buckets due to all partitions no data, table: [{}-{}], " - + "partition: {}, buckets num: {}", - table.getName(), table.getId(), partitionName, property.getBuckets()); - return property.getBuckets(); + private static Pair handleNoDataPartitions(OlapTable table, + String partitionName, int defaultBuckets) { + LOG.info("auto bucket use property's buckets due to all partitions no data, table: [{}-{}], " + + "partition: {}, buckets num: {}", table.getName(), table.getId(), partitionName, defaultBuckets); + return Pair.of(defaultBuckets, 0); + } + + private static Pair calculateBuckets(List hasDataPartitions) { + List partitionSizeArray = new ArrayList<>(); + List sizeUnknownArray = new ArrayList<>(); + + for (Partition hasDataPartition : hasDataPartitions) { + long partitionSize = hasDataPartition.getDataSizeExcludeEmptyReplica(true); + if (partitionSize == 0) { + sizeUnknownArray.add(partitionSize); + } else { + partitionSizeArray.add(partitionSize); + } + } + + int size = hasDataPartitions.size(); + Preconditions.checkState(size > 0, "hasDataPartitions size must be greater than 0"); + int previousPartitionBucketsNum = hasDataPartitions.get(size - 1).getDistributionInfo().getBucketNum(); + + if (hasDataPartitions.size() == sizeUnknownArray.size()) { + LOG.info("TabletStatMgr not synchronized partitions size yet, so use previous partition bucket num"); + return Pair.of(previousPartitionBucketsNum, previousPartitionBucketsNum); } - ArrayList partitionSizeArray = hasDataPartitions.stream() - .map(partition -> partition.getAllDataSize(true)) - .collect(Collectors.toCollection(ArrayList::new)); long estimatePartitionSize = getNextPartitionSize(partitionSizeArray); - // plus 5 for uncompressed data long uncompressedPartitionSize = estimatePartitionSize * 5; int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets); - LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, " - + " estimate partition size: {}, last partitions(partition name, local size, remote size): {}", - hasDataPartitions.size(), table.getName(), table.getId(), partitionName, bucketsNum, - estimatePartitionSize, + + LOG.info("auto bucket calc with {} history partitions, {} history partitions size not sync size yet," + + " buckets num: {}, estimate partition size: {}, last partitions: {}", + hasDataPartitions.size(), sizeUnknownArray.size(), bucketsNum, estimatePartitionSize, hasDataPartitions.stream() .skip(Math.max(0, hasDataPartitions.size() - 7)) .map(partition -> "(" + partition.getName() + ", " + partition.getDataSize(true) + ", " + partition.getRemoteDataSize() + ")") .collect(Collectors.toList())); - return bucketsNum; + return Pair.of(bucketsNum, previousPartitionBucketsNum); } private ArrayList getAddPartitionClause(Database db, OlapTable olapTable, @@ -356,8 +390,13 @@ private ArrayList getAddPartitionClause(Database db, OlapTab DistributionDesc distributionDesc = null; DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); - int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName, + Pair ret = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName, nowPartitionName, executeFirstTime); + int bucketsNum = ret.first; + int previousPartitionBucketsNum = ret.second; + if (olapTable.isAutoBucket()) { + checkAutoBucketCalcNumIsValid(bucketsNum, previousPartitionBucketsNum); + } if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List distColumnNames = new ArrayList<>(); @@ -374,6 +413,17 @@ private ArrayList getAddPartitionClause(Database db, OlapTab return addPartitionClauses; } + private void checkAutoBucketCalcNumIsValid(int calcNum, int previousPartitionBucketsNum) { + // previousPartitionBucketsNum == 0, some abnormal case, ignore it + if (previousPartitionBucketsNum != 0 + && (calcNum > previousPartitionBucketsNum * (1 + Config.autobucket_out_of_bounds_percent_threshold)) + || (calcNum < previousPartitionBucketsNum * (1 - Config.autobucket_out_of_bounds_percent_threshold))) { + LOG.warn("auto bucket calc num may be err, plz check. " + + "calc bucket num {}, previous partition bucket num {}, percent {}", + calcNum, previousPartitionBucketsNum, Config.autobucket_out_of_bounds_percent_threshold); + } + } + /** * If dynamic_partition.storage_medium is set to SSD, * ignore hot_partition_num property and set to (SSD, 9999-12-31 23:59:59) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 6b7b0e5282a3c0..6797ce45acb7de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -501,13 +501,13 @@ public Map>> splitBatch(Map> beToWarmUp List batch = new ArrayList<>(); long curBatchSize = 0L; for (Tablet tablet : entry.getValue()) { - if (curBatchSize + tablet.getDataSize(true) > maxSizePerBatch) { + if (curBatchSize + tablet.getDataSize(true, false) > maxSizePerBatch) { batches.add(batch); batch = new ArrayList<>(); curBatchSize = 0L; } batch.add(tablet.getId()); - curBatchSize += tablet.getDataSize(true); + curBatchSize += tablet.getDataSize(true, false); } if (!batch.isEmpty()) { batches.add(batch); @@ -545,7 +545,7 @@ private List getHotTablets(String srcClusterName, String dstClusterName) continue; } for (Tablet tablet : index.getTablets()) { - warmUpTabletsSize += tablet.getDataSize(true); + warmUpTabletsSize += tablet.getDataSize(true, false); tablets.add(tablet); if (warmUpTabletsSize >= dstTotalFileCache) { break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index 6c36a926d088b8..b1be9c79ab6252 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -262,7 +262,7 @@ static class DBTabletStatistic { inconsistentNum++; inconsistentTabletIds.add(tablet.getId()); } - if (tablet.getDataSize(true) > Config.min_bytes_indicate_replica_too_large) { + if (tablet.getDataSize(true, false) > Config.min_bytes_indicate_replica_too_large) { oversizeNum++; oversizeTabletIds.add(tablet.getId()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index b16d3d15cf7e52..8a19912d321c8c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -385,7 +385,7 @@ public static long getTabletDataSize(long tabletId) { if (tablet == null) { return -1L; } - return tablet.getDataSize(true); + return tablet.getDataSize(true, false); } finally { olapTable.readUnlock(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 9495c048f4b13f..5aa85af82d4e0e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1826,5 +1826,48 @@ public void testAutoBuckets() throws Exception { Assert.assertEquals(54, partitions.size()); // 100GB total, 5GB per bucket, should 20 buckets. Assert.assertEquals(20, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); + + // mock partition size eq 0, use back-to-back logic + table.readLock(); + try { + // when fe restart, when stat thread not get replica size from be/ms, replica size eq 0 + for (int i = 0; i < 54; i++) { + Partition partition = partitions.get(i); + partition.updateVisibleVersion(2L); + for (MaterializedIndex idx : partition.getMaterializedIndices( + MaterializedIndex.IndexExtState.VISIBLE)) { + if (i < 52) { + Assert.assertEquals(10, idx.getTablets().size()); + } else if (i == 52) { + Assert.assertEquals(1, idx.getTablets().size()); + } else if (i == 53) { + Assert.assertEquals(20, idx.getTablets().size()); + } + for (Tablet tablet : idx.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + replica.updateVersion(3L); + // mock replica size eq 0 + replica.setDataSize(0L); + replica.setRowCount(0L); + } + } + } + Assert.assertEquals(0, partition.getAllDataSize(true)); + } + } finally { + table.readUnlock(); + } + + String alterStmt3 = "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '4')"; + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt3)); + // 54th previous partition size set 53, check back to back logic work + partitions.get(53).getDistributionInfo().setBucketNum(53); + Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); + + partitions = Lists.newArrayList(table.getAllPartitions()); + partitions.sort(Comparator.comparing(Partition::getId)); + Assert.assertEquals(55, partitions.size()); + // due to partition size eq 0, use previous partition's(54th) bucket num + Assert.assertEquals(53, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java index bd3e7f9e2a5b32..f4f364b91ced1d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java @@ -138,6 +138,7 @@ public void setUp() throws Exception { Config.tablet_checker_interval_ms = 1000; Config.tablet_repair_delay_factor_second = 1; connectContext = UtFrameUtils.createDefaultCtx(); + Config.autobucket_partition_size_per_bucket_gb = 1; } @After From 15c8cd60705432eaeb25806e8d8cea0d320ebcbf Mon Sep 17 00:00:00 2001 From: deardeng Date: Tue, 15 Jul 2025 14:55:30 +0800 Subject: [PATCH 2/3] Fix --- .../clone/DynamicPartitionScheduler.java | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 89099b2059c737..572db637f9ed86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -254,7 +254,7 @@ private static Pair calculateBuckets(List hasDataPa for (Partition hasDataPartition : hasDataPartitions) { long partitionSize = hasDataPartition.getDataSizeExcludeEmptyReplica(true); - if (partitionSize == 0) { + if (partitionSize <= 0) { sizeUnknownArray.add(partitionSize); } else { partitionSizeArray.add(partitionSize); @@ -395,7 +395,11 @@ private ArrayList getAddPartitionClause(Database db, OlapTab int bucketsNum = ret.first; int previousPartitionBucketsNum = ret.second; if (olapTable.isAutoBucket()) { - checkAutoBucketCalcNumIsValid(bucketsNum, previousPartitionBucketsNum); + int afterCheckAndFixBucketNum = checkAndFixAutoBucketCalcNumIsValid(bucketsNum, + previousPartitionBucketsNum); + if (afterCheckAndFixBucketNum > 0) { + bucketsNum = afterCheckAndFixBucketNum; + } } if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; @@ -413,15 +417,31 @@ private ArrayList getAddPartitionClause(Database db, OlapTab return addPartitionClauses; } - private void checkAutoBucketCalcNumIsValid(int calcNum, int previousPartitionBucketsNum) { + private int checkAndFixAutoBucketCalcNumIsValid(int currentPartitionNumBuckets, int previousPartitionNumBuckets) { // previousPartitionBucketsNum == 0, some abnormal case, ignore it - if (previousPartitionBucketsNum != 0 - && (calcNum > previousPartitionBucketsNum * (1 + Config.autobucket_out_of_bounds_percent_threshold)) - || (calcNum < previousPartitionBucketsNum * (1 - Config.autobucket_out_of_bounds_percent_threshold))) { - LOG.warn("auto bucket calc num may be err, plz check. " - + "calc bucket num {}, previous partition bucket num {}, percent {}", - calcNum, previousPartitionBucketsNum, Config.autobucket_out_of_bounds_percent_threshold); + if (currentPartitionNumBuckets != 0) { + // currentPartitionNumBuckets can be too big + if (currentPartitionNumBuckets + > previousPartitionNumBuckets * (1 + Config.autobucket_out_of_bounds_percent_threshold)) { + LOG.warn("auto bucket calc num may be err, bigger than previous too much, plz check. " + + "calc bucket num {}, previous partition bucket num {}, percent {}", + currentPartitionNumBuckets, previousPartitionNumBuckets, + Config.autobucket_out_of_bounds_percent_threshold); + return currentPartitionNumBuckets; + } + // currentPartitionNumBuckets not too small. + // If it is too small, the program will intervene. use previousPartitionNumBuckets + if (currentPartitionNumBuckets + < previousPartitionNumBuckets * (1 - Config.autobucket_out_of_bounds_percent_threshold)) { + LOG.warn("auto bucket calc num may be err, smaller than previous too much, plz check. " + + "calc bucket num {}, previous partition bucket num {}, percent {}", + currentPartitionNumBuckets, previousPartitionNumBuckets, + Config.autobucket_out_of_bounds_percent_threshold); + return previousPartitionNumBuckets; + } } + LOG.info("previousPartitionBucketsNum eq 0, check before log"); + return -1; } /** From 4a0eda836a2bdb13de38316940212a23505bf21e Mon Sep 17 00:00:00 2001 From: deardeng Date: Tue, 15 Jul 2025 18:26:00 +0800 Subject: [PATCH 3/3] fix ut --- .../org/apache/doris/catalog/DynamicPartitionTableTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 5aa85af82d4e0e..ad5b1ce05558bf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1779,6 +1779,7 @@ public void testAutoBuckets() throws Exception { } RebalancerTestUtil.updateReplicaDataSize(1, 1, 1); + Config.autobucket_out_of_bounds_percent_threshold = 0.99; String alterStmt1 = "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')"; ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1)); @@ -1789,7 +1790,7 @@ public void testAutoBuckets() throws Exception { partitions.sort(Comparator.comparing(Partition::getId)); Assert.assertEquals(53, partitions.size()); Assert.assertEquals(1, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); - + Config.autobucket_out_of_bounds_percent_threshold = 0.5; table.readLock(); try { // first 40 partitions with size 0, then 13 partitions with size 100GB(10GB * 10 buckets)