From 5b37ad513aae299dfc46dfdfdd04383f97a9813b Mon Sep 17 00:00:00 2001 From: deardeng Date: Fri, 4 Jul 2025 16:32:45 +0800 Subject: [PATCH 1/4] [fix](auto bucket)Fix auto bucket calc bucketnum err when partition size invalid --- .../java/org/apache/doris/common/Config.java | 6 ++ .../doris/catalog/MaterializedIndex.java | 4 +- .../apache/doris/catalog/MetadataViewer.java | 4 +- .../org/apache/doris/catalog/OlapTable.java | 2 +- .../org/apache/doris/catalog/Partition.java | 14 +++- .../java/org/apache/doris/catalog/Tablet.java | 5 +- .../ColocateTableCheckerAndBalancer.java | 3 +- .../clone/DynamicPartitionScheduler.java | 65 +++++++++++++++---- .../doris/cloud/CacheHotspotManager.java | 6 +- .../common/proc/TabletHealthProcDir.java | 2 +- .../trees/plans/commands/ShowDataCommand.java | 2 +- .../apache/doris/catalog/CatalogTestUtil.java | 2 +- 12 files changed, 87 insertions(+), 28 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 39d25ea6cf9645..fc3de04f222461 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2757,6 +2757,12 @@ public class Config extends ConfigBase { }) public static int autobucket_partition_size_per_bucket_gb = -1; + @ConfField(mutable = true, masterOnly = true, description = {"Auto bucket中计算出的新的分区bucket num超过前一个分区的" + + "bucket num的百分比,被认为是异常case报警", + "The new partition bucket number calculated in the auto bucket exceeds the percentage " + + "of the previous partition's bucket number, which is considered an abnormal case alert."}) + public static double autobucket_out_of_bounds_percent_threshold = 0.5; + @ConfField(description = {"(已弃用,被 arrow_flight_max_connection 替代) Arrow Flight Server中所有用户token的缓存上限," + "超过后LRU淘汰, arrow flight sql是无状态的协议,连接通常不会主动断开," + "bearer token 从 cache 淘汰的同时会 unregister Connection.", diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index 919ef343c2b192..b1c843613137dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -168,10 +168,10 @@ public void clearRollupIndexInfo() { this.rollupFinishedVersion = -1L; } - public long getDataSize(boolean singleReplica) { + public long getDataSize(boolean singleReplica, boolean filterSizeZero) { long dataSize = 0; for (Tablet tablet : getTablets()) { - dataSize += tablet.getDataSize(singleReplica); + dataSize += tablet.getDataSize(singleReplica, filterSizeZero); } return dataSize; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index b3c6a73ffde97c..92ed8841962cf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -554,7 +554,7 @@ public static List> getDataSkew(String dbName, String tblName, Part for (int i = 0; i < tabletIds.size(); i++) { Tablet tablet = mIndex.getTablet(tabletIds.get(i)); long rowCount = tablet.getRowCount(true); - long dataSize = tablet.getDataSize(true); + long dataSize = tablet.getDataSize(true, false); rowCountTabletInfos.set(i, rowCountTabletInfos.get(i) + rowCount); dataSizeTabletInfos.set(i, dataSizeTabletInfos.get(i) + dataSize); totalSize += dataSize; @@ -633,7 +633,7 @@ public static List> getDataSkew(String dbName, String tblName, Part for (int i = 0; i < tabletIds.size(); i++) { Tablet tablet = mIndex.getTablet(tabletIds.get(i)); long rowCount = tablet.getRowCount(true); - long dataSize = tablet.getDataSize(true); + long dataSize = tablet.getDataSize(true, false); rowCountTabletInfos.set(i, rowCountTabletInfos.get(i) + rowCount); dataSizeTabletInfos.set(i, dataSizeTabletInfos.get(i) + dataSize); totalSize += dataSize; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index a0d72c484f48b2..b5b70b5d96351d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1765,7 +1765,7 @@ public long getAvgRowLength() { long dataSize = 0; for (Map.Entry entry : idToPartition.entrySet()) { rowCount += entry.getValue().getBaseIndex().getRowCount(); - dataSize += entry.getValue().getBaseIndex().getDataSize(false); + dataSize += entry.getValue().getBaseIndex().getDataSize(false, false); } if (rowCount > 0) { return dataSize / rowCount; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 72b3402e4be7ec..af6914b4fec9f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -288,7 +288,7 @@ public long getAllDataSize(boolean singleReplica) { public long getDataSize(boolean singleReplica) { long dataSize = 0; for (MaterializedIndex mIndex : getMaterializedIndices(IndexExtState.VISIBLE)) { - dataSize += mIndex.getDataSize(singleReplica); + dataSize += mIndex.getDataSize(singleReplica, false); } return dataSize; } @@ -409,7 +409,7 @@ public long getRowCount() { public long getAvgRowLength() { long rowCount = getBaseIndex().getRowCount(); - long dataSize = getBaseIndex().getDataSize(false); + long dataSize = getBaseIndex().getDataSize(false, false); if (rowCount > 0) { return dataSize / rowCount; } else { @@ -418,6 +418,14 @@ public long getAvgRowLength() { } public long getDataLength() { - return getBaseIndex().getDataSize(false); + return getBaseIndex().getDataSize(false, false); + } + + public long getDataSizeExcludeEmptyReplica(boolean singleReplica) { + long dataSize = 0; + for (MaterializedIndex mIndex : getMaterializedIndices(IndexExtState.VISIBLE)) { + dataSize += mIndex.getDataSize(singleReplica, true); + } + return dataSize + getRemoteDataSize(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 6a725762aea6e6..379cc0a17da716 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -488,8 +488,11 @@ public boolean equals(Object obj) { return id == tablet.id; } - public long getDataSize(boolean singleReplica) { + // ATTN: Replica::getDataSize may zero in cloud and non-cloud + // due to dataSize not write to image + public long getDataSize(boolean singleReplica, boolean filterSizeZero) { LongStream s = replicas.stream().filter(r -> r.getState() == ReplicaState.NORMAL) + .filter(r -> !filterSizeZero || r.getDataSize() > 0) .mapToLong(Replica::getDataSize); return singleReplica ? Double.valueOf(s.average().orElse(0)).longValue() : s.sum(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 7727bc77e18667..7dc0a9125dddf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -655,7 +655,8 @@ private GlobalColocateStatistic buildGlobalColocateStatistic() { bucketsSeq.size() + " vs. " + replicationNum); Tablet tablet = index.getTablet(tabletId); totalReplicaDataSizes.set(tabletOrderIdx, - totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true)); + totalReplicaDataSizes.get(tabletOrderIdx) + + tablet.getDataSize(true, false)); tabletOrderIdx++; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index d40cc702eb3946..8c62197eacef94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -58,6 +58,7 @@ import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TStorageMedium; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -191,11 +192,11 @@ private static long getNextPartitionSize(ArrayList historyPartitionsSize) } } - private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table, + private static Pair getBucketsNum(DynamicPartitionProperty property, OlapTable table, String partitionName, String nowPartitionName, boolean executeFirstTime) { // if execute first time, all partitions no contain data if (!table.isAutoBucket() || executeFirstTime) { - return property.getBuckets(); + return Pair.of(property.getBuckets(), 0); } // auto bucket @@ -214,7 +215,7 @@ private static int getBucketsNum(DynamicPartitionProperty property, OlapTable ta LOG.warn("autobucket use property's buckets get visible version fail, table: [{}-{}], " + "partition: {}, buckets num: {}, exception: ", table.getName(), table.getId(), partitionName, property.getBuckets(), e); - return property.getBuckets(); + return Pair.of(property.getBuckets(), 0); } List hasDataPartitions = Lists.newArrayList(); @@ -226,22 +227,46 @@ private static int getBucketsNum(DynamicPartitionProperty property, OlapTable ta // no exist history partition data if (hasDataPartitions.isEmpty()) { - LOG.info("autobucket use property's buckets due to all partitions no data, table: [{}-{}], " + LOG.info("auto bucket use property's buckets due to all partitions no data, table: [{}-{}], " + "partition: {}, buckets num: {}", table.getName(), table.getId(), partitionName, property.getBuckets()); - return property.getBuckets(); + return Pair.of(property.getBuckets(), 0); } - ArrayList partitionSizeArray = hasDataPartitions.stream() - .map(partition -> partition.getAllDataSize(true)) - .collect(Collectors.toCollection(ArrayList::new)); + ArrayList partitionSizeArray = new ArrayList<>(); + ArrayList sizeUnknownArray = new ArrayList<>(); + for (Partition hasDataPartition : hasDataPartitions) { + long partitionSize = hasDataPartition.getDataSizeExcludeEmptyReplica(true); + if (partitionSize == 0) { + // fe may restart, TabletStatMgr thread not get replica size + sizeUnknownArray.add(partitionSize); + } else { + partitionSizeArray.add(partitionSize); + } + } + + int size = hasDataPartitions.size(); + Preconditions.checkState(size > 0, "hasDataPartitions size must be greater than 0"); + int previousPartitionBucketsNum = hasDataPartitions.get(size - 1).getDistributionInfo().getBucketNum(); + if (hasDataPartitions.size() == sizeUnknownArray.size()) { + // this table all partitions size Not synchronized yet + // bucket num is previous partition + LOG.info("TabletStatMgr not synchronized partitions size yet, so use previous partition bucket num"); + return Pair.of(previousPartitionBucketsNum, previousPartitionBucketsNum); + } + // ATTN: The following two scenarios both use auto bucket to calculate the bucket logic: + // 1. hasDataPartitions.size == partitionSizeArray.size, TabletStatMgr has get all partition size + // 2. partitionSizeArray.size < hasDataPartitions.size, TabletStatMgr thread synchronizes some size data + long estimatePartitionSize = getNextPartitionSize(partitionSizeArray); // plus 5 for uncompressed data long uncompressedPartitionSize = estimatePartitionSize * 5; int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets); - LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, " + LOG.info("autobucket calc with {} history partitions, {} history partitions size not sync size yet," + + " table: [{}-{}], partition: {}, buckets num: {}, " + " estimate partition size: {}, last partitions(partition name, local size, remote size): {}", - hasDataPartitions.size(), table.getName(), table.getId(), partitionName, bucketsNum, + sizeUnknownArray.size(), hasDataPartitions.size(), + table.getName(), table.getId(), partitionName, bucketsNum, estimatePartitionSize, hasDataPartitions.stream() .skip(Math.max(0, hasDataPartitions.size() - 7)) @@ -249,7 +274,7 @@ private static int getBucketsNum(DynamicPartitionProperty property, OlapTable ta + ", " + partition.getRemoteDataSize() + ")") .collect(Collectors.toList())); - return bucketsNum; + return Pair.of(bucketsNum, previousPartitionBucketsNum); } private ArrayList getAddPartitionClause(Database db, OlapTable olapTable, @@ -356,8 +381,13 @@ private ArrayList getAddPartitionClause(Database db, OlapTab DistributionDesc distributionDesc = null; DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); - int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName, + Pair ret = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName, nowPartitionName, executeFirstTime); + int bucketsNum = ret.first; + int previousPartitionBucketsNum = ret.second; + if (olapTable.isAutoBucket()) { + checkAutoBucketCalcNumIsValid(bucketsNum, previousPartitionBucketsNum); + } if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List distColumnNames = new ArrayList<>(); @@ -374,6 +404,17 @@ private ArrayList getAddPartitionClause(Database db, OlapTab return addPartitionClauses; } + private void checkAutoBucketCalcNumIsValid(int calcNum, int previousPartitionBucketsNum) { + // previousPartitionBucketsNum == 0, some abnormal case, ignore it + if (previousPartitionBucketsNum != 0 + && (calcNum > previousPartitionBucketsNum * (1 + Config.autobucket_out_of_bounds_percent_threshold)) + || (calcNum < previousPartitionBucketsNum * (1 - Config.autobucket_out_of_bounds_percent_threshold))) { + LOG.warn("auto bucket calc num may be err, plz check. " + + "calc bucket num {}, previous partition bucket num {}, percent {}", + calcNum, previousPartitionBucketsNum, Config.autobucket_out_of_bounds_percent_threshold); + } + } + /** * If dynamic_partition.storage_medium is set to SSD, * ignore hot_partition_num property and set to (SSD, 9999-12-31 23:59:59) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 48771d36240693..1451125686ffbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -372,13 +372,13 @@ private Map>> splitBatch(Map> beToWarmU List batch = new ArrayList<>(); Long curBatchSize = 0L; for (Tablet tablet : entry.getValue()) { - if (curBatchSize + tablet.getDataSize(true) > maxSizePerBatch) { + if (curBatchSize + tablet.getDataSize(true, false) > maxSizePerBatch) { batches.add(batch); batch = new ArrayList<>(); curBatchSize = 0L; } batch.add(tablet.getId()); - curBatchSize += tablet.getDataSize(true); + curBatchSize += tablet.getDataSize(true, false); } if (!batch.isEmpty()) { batches.add(batch); @@ -416,7 +416,7 @@ private Map> warmUpNewClusterByCluster(String dstClusterName, continue; } for (Tablet tablet : index.getTablets()) { - warmUpTabletsSize += tablet.getDataSize(true); + warmUpTabletsSize += tablet.getDataSize(true, false); tablets.add(tablet); if (warmUpTabletsSize >= dstTotalFileCache) { break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index 6c36a926d088b8..b1be9c79ab6252 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -262,7 +262,7 @@ static class DBTabletStatistic { inconsistentNum++; inconsistentTabletIds.add(tablet.getId()); } - if (tablet.getDataSize(true) > Config.min_bytes_indicate_replica_too_large) { + if (tablet.getDataSize(true, false) > Config.min_bytes_indicate_replica_too_large) { oversizeNum++; oversizeTabletIds.add(tablet.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowDataCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowDataCommand.java index 067836dd58381d..ca00645d2e469d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowDataCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowDataCommand.java @@ -310,7 +310,7 @@ private void collectTableStats(OlapTable table) { long remoteSegmentSize = 0; for (Partition partition : table.getAllPartitions()) { MaterializedIndex mIndex = partition.getIndex(indexId); - indexSize += mIndex.getDataSize(false); + indexSize += mIndex.getDataSize(false, false); indexReplicaCount += mIndex.getReplicaCount(); indexRowCount += mIndex.getRowCount() == -1 ? 0 : mIndex.getRowCount(); indexRemoteSize += mIndex.getRemoteDataSize(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index b16d3d15cf7e52..8a19912d321c8c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -385,7 +385,7 @@ public static long getTabletDataSize(long tabletId) { if (tablet == null) { return -1L; } - return tablet.getDataSize(true); + return tablet.getDataSize(true, false); } finally { olapTable.readUnlock(); } From b9fa5caa5b3524be411eccc04cfdd153477ca759 Mon Sep 17 00:00:00 2001 From: deardeng Date: Fri, 4 Jul 2025 19:44:19 +0800 Subject: [PATCH 2/4] refactor --- .../clone/DynamicPartitionScheduler.java | 76 +++++++++++-------- .../common/util/AutoBucketUtilsTest.java | 1 + 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 8c62197eacef94..4d2cd58c9c0eb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -160,7 +160,7 @@ private Map createDefaultRuntimeInfo() { } // exponential moving average - private static long ema(ArrayList history, int period) { + private static long ema(List history, int period) { double alpha = 2.0 / (period + 1); double ema = history.get(0); for (int i = 1; i < history.size(); i++) { @@ -169,7 +169,7 @@ private static long ema(ArrayList history, int period) { return (long) ema; } - private static long getNextPartitionSize(ArrayList historyPartitionsSize) { + private static long getNextPartitionSize(List historyPartitionsSize) { if (historyPartitionsSize.size() < 2) { return historyPartitionsSize.get(0); } @@ -199,46 +199,63 @@ private static Pair getBucketsNum(DynamicPartitionProperty pro return Pair.of(property.getBuckets(), 0); } - // auto bucket - // get all history partitions + List partitions = getHistoricalPartitions(table, nowPartitionName); + List visibleVersions = getVisibleVersions(partitions, table, partitionName, property.getBuckets()); + + List hasDataPartitions = filterDataPartitions(partitions, visibleVersions); + if (hasDataPartitions.isEmpty()) { + return handleNoDataPartitions(table, partitionName, property.getBuckets()); + } + + return calculateBuckets(hasDataPartitions); + } + + private static List getHistoricalPartitions(OlapTable table, String nowPartitionName) { RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo()); List> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet()); idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint())); - List partitions = idToItems.stream() + return idToItems.stream() .map(entry -> table.getPartition(entry.getKey())) .filter(partition -> partition != null && !partition.getName().equals(nowPartitionName)) .collect(Collectors.toList()); - List visibleVersions = null; + } + + private static List getVisibleVersions(List partitions, OlapTable table, + String partitionName, int defaultBuckets) { try { - visibleVersions = Partition.getVisibleVersions(partitions); + return Partition.getVisibleVersions(partitions); } catch (RpcException e) { - LOG.warn("autobucket use property's buckets get visible version fail, table: [{}-{}], " + LOG.warn("auto bucket use property's buckets get visible version fail, table: [{}-{}], " + "partition: {}, buckets num: {}, exception: ", - table.getName(), table.getId(), partitionName, property.getBuckets(), e); - return Pair.of(property.getBuckets(), 0); + table.getName(), table.getId(), partitionName, defaultBuckets, e); + return Collections.emptyList(); // Return empty list to indicate failure } + } - List hasDataPartitions = Lists.newArrayList(); + private static List filterDataPartitions(List partitions, List visibleVersions) { + List hasDataPartitions = new ArrayList<>(); for (int i = 0; i < partitions.size(); i++) { if (visibleVersions.get(i) >= 2) { hasDataPartitions.add(partitions.get(i)); } } + return hasDataPartitions; + } - // no exist history partition data - if (hasDataPartitions.isEmpty()) { - LOG.info("auto bucket use property's buckets due to all partitions no data, table: [{}-{}], " - + "partition: {}, buckets num: {}", - table.getName(), table.getId(), partitionName, property.getBuckets()); - return Pair.of(property.getBuckets(), 0); - } + private static Pair handleNoDataPartitions(OlapTable table, + String partitionName, int defaultBuckets) { + LOG.info("auto bucket use property's buckets due to all partitions no data, table: [{}-{}], " + + "partition: {}, buckets num: {}", table.getName(), table.getId(), partitionName, defaultBuckets); + return Pair.of(defaultBuckets, 0); + } + + private static Pair calculateBuckets(List hasDataPartitions) { + List partitionSizeArray = new ArrayList<>(); + List sizeUnknownArray = new ArrayList<>(); - ArrayList partitionSizeArray = new ArrayList<>(); - ArrayList sizeUnknownArray = new ArrayList<>(); for (Partition hasDataPartition : hasDataPartitions) { long partitionSize = hasDataPartition.getDataSizeExcludeEmptyReplica(true); if (partitionSize == 0) { - // fe may restart, TabletStatMgr thread not get replica size sizeUnknownArray.add(partitionSize); } else { partitionSizeArray.add(partitionSize); @@ -248,26 +265,19 @@ private static Pair getBucketsNum(DynamicPartitionProperty pro int size = hasDataPartitions.size(); Preconditions.checkState(size > 0, "hasDataPartitions size must be greater than 0"); int previousPartitionBucketsNum = hasDataPartitions.get(size - 1).getDistributionInfo().getBucketNum(); + if (hasDataPartitions.size() == sizeUnknownArray.size()) { - // this table all partitions size Not synchronized yet - // bucket num is previous partition LOG.info("TabletStatMgr not synchronized partitions size yet, so use previous partition bucket num"); return Pair.of(previousPartitionBucketsNum, previousPartitionBucketsNum); } - // ATTN: The following two scenarios both use auto bucket to calculate the bucket logic: - // 1. hasDataPartitions.size == partitionSizeArray.size, TabletStatMgr has get all partition size - // 2. partitionSizeArray.size < hasDataPartitions.size, TabletStatMgr thread synchronizes some size data long estimatePartitionSize = getNextPartitionSize(partitionSizeArray); - // plus 5 for uncompressed data long uncompressedPartitionSize = estimatePartitionSize * 5; int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets); - LOG.info("autobucket calc with {} history partitions, {} history partitions size not sync size yet," - + " table: [{}-{}], partition: {}, buckets num: {}, " - + " estimate partition size: {}, last partitions(partition name, local size, remote size): {}", - sizeUnknownArray.size(), hasDataPartitions.size(), - table.getName(), table.getId(), partitionName, bucketsNum, - estimatePartitionSize, + + LOG.info("auto bucket calc with {} history partitions, {} history partitions size not sync size yet," + + " buckets num: {}, estimate partition size: {}, last partitions: {}", + hasDataPartitions.size(), sizeUnknownArray.size(), bucketsNum, estimatePartitionSize, hasDataPartitions.stream() .skip(Math.max(0, hasDataPartitions.size() - 7)) .map(partition -> "(" + partition.getName() + ", " + partition.getDataSize(true) diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java index e76982e746f59f..8887e136e1c6a2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java @@ -134,6 +134,7 @@ public void setUp() throws Exception { Config.tablet_checker_interval_ms = 1000; Config.tablet_repair_delay_factor_second = 1; connectContext = UtFrameUtils.createDefaultCtx(); + Config.autobucket_partition_size_per_bucket_gb = 1; } @After From c5cba17a11cdc302c693fd7ffc9e7cc3aac85e62 Mon Sep 17 00:00:00 2001 From: deardeng Date: Mon, 7 Jul 2025 14:34:02 +0800 Subject: [PATCH 3/4] add ut --- .../clone/DynamicPartitionScheduler.java | 25 +++++------ .../catalog/DynamicPartitionTableTest.java | 44 +++++++++++++++++++ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 4d2cd58c9c0eb8..3e95ebac173dd5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -200,7 +200,15 @@ private static Pair getBucketsNum(DynamicPartitionProperty pro } List partitions = getHistoricalPartitions(table, nowPartitionName); - List visibleVersions = getVisibleVersions(partitions, table, partitionName, property.getBuckets()); + List visibleVersions; + try { + visibleVersions = Partition.getVisibleVersions(partitions); + } catch (RpcException e) { + LOG.warn("auto bucket use property's buckets get visible version fail, table: [{}-{}], " + + "partition: {}, buckets num: {}, exception: ", + table.getName(), table.getId(), partitionName, property.getBuckets(), e); + return Pair.of(property.getBuckets(), 0); + } List hasDataPartitions = filterDataPartitions(partitions, visibleVersions); if (hasDataPartitions.isEmpty()) { @@ -220,19 +228,10 @@ private static List getHistoricalPartitions(OlapTable table, String n .collect(Collectors.toList()); } - private static List getVisibleVersions(List partitions, OlapTable table, - String partitionName, int defaultBuckets) { - try { - return Partition.getVisibleVersions(partitions); - } catch (RpcException e) { - LOG.warn("auto bucket use property's buckets get visible version fail, table: [{}-{}], " - + "partition: {}, buckets num: {}, exception: ", - table.getName(), table.getId(), partitionName, defaultBuckets, e); - return Collections.emptyList(); // Return empty list to indicate failure - } - } - private static List filterDataPartitions(List partitions, List visibleVersions) { + Preconditions.checkState(partitions.size() == visibleVersions.size(), + String.format("partitions size %d not eq visibleVersions size %d, impossible", + partitions.size(), visibleVersions.size())); List hasDataPartitions = new ArrayList<>(); for (int i = 0; i < partitions.size(); i++) { if (visibleVersions.get(i) >= 2) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 9495c048f4b13f..c4658b8563e281 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1826,5 +1826,49 @@ public void testAutoBuckets() throws Exception { Assert.assertEquals(54, partitions.size()); // 100GB total, 5GB per bucket, should 20 buckets. Assert.assertEquals(20, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); + + // mock partition size eq 0, use back-to-back logic + table.readLock(); + try { + // when fe restart, when stat thread not get replica size from be/ms, replica size eq 0 + for (int i = 0; i < 54; i++) { + Partition partition = partitions.get(i); + partition.updateVisibleVersion(2L); + for (MaterializedIndex idx : partition.getMaterializedIndices( + MaterializedIndex.IndexExtState.VISIBLE)) { + if (i < 52) { + Assert.assertEquals(10, idx.getTablets().size()); + } else if (i == 52) { + Assert.assertEquals(1, idx.getTablets().size()); + } else if (i == 53) { + Assert.assertEquals(20, idx.getTablets().size()); + } + for (Tablet tablet : idx.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + replica.updateVersion(3L); + // mock replica size eq 0 + replica.setDataSize(0L); + replica.setRowCount(0L); + } + } + } + Assert.assertEquals(0, partition.getAllDataSize(true)); + } + } finally { + table.readUnlock(); + } + + String alterStmt3 = + "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '4')"; + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt3)); + // 54th previous partition size set 53, check back to back logic work + partitions.get(53).getDistributionInfo().setBucketNum(53); + Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); + + partitions = Lists.newArrayList(table.getAllPartitions()); + partitions.sort(Comparator.comparing(Partition::getId)); + Assert.assertEquals(55, partitions.size()); + // due to partition size eq 0, use previous partition's(54th) bucket num + Assert.assertEquals(53, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); } } From 600ed133ab25ee1d980548f0cc923bdd3e5d78b1 Mon Sep 17 00:00:00 2001 From: deardeng Date: Mon, 7 Jul 2025 15:00:48 +0800 Subject: [PATCH 4/4] fix compile --- .../org/apache/doris/catalog/DynamicPartitionTableTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index c4658b8563e281..5aa85af82d4e0e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1835,7 +1835,7 @@ public void testAutoBuckets() throws Exception { Partition partition = partitions.get(i); partition.updateVisibleVersion(2L); for (MaterializedIndex idx : partition.getMaterializedIndices( - MaterializedIndex.IndexExtState.VISIBLE)) { + MaterializedIndex.IndexExtState.VISIBLE)) { if (i < 52) { Assert.assertEquals(10, idx.getTablets().size()); } else if (i == 52) { @@ -1858,8 +1858,7 @@ public void testAutoBuckets() throws Exception { table.readUnlock(); } - String alterStmt3 = - "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '4')"; + String alterStmt3 = "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '4')"; ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt3)); // 54th previous partition size set 53, check back to back logic work partitions.get(53).getDistributionInfo().setBucketNum(53);