Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2757,6 +2757,12 @@ public class Config extends ConfigBase {
})
public static int autobucket_partition_size_per_bucket_gb = -1;

// Threshold for flagging a suspicious auto bucket result: how far the bucket number computed
// for a new partition may deviate from the previous partition's bucket number.
// Declared mutable and master-only; the default is 0.5.
// NOTE(review): despite "percent" in the name, the value looks like a fraction
// (0.5 == 50%) applied as previous * (1 +/- threshold) -- confirm against the consumer.
@ConfField(mutable = true, masterOnly = true, description = {"Auto bucket中计算出的新的分区bucket num超过前一个分区的"
+ "bucket num的百分比,被认为是异常case报警",
"The new partition bucket number calculated in the auto bucket exceeds the percentage "
+ "of the previous partition's bucket number, which is considered an abnormal case alert."})
public static double autobucket_out_of_bounds_percent_threshold = 0.5;

@ConfField(description = {"(已弃用,被 arrow_flight_max_connection 替代) Arrow Flight Server中所有用户token的缓存上限,"
+ "超过后LRU淘汰, arrow flight sql是无状态的协议,连接通常不会主动断开,"
+ "bearer token 从 cache 淘汰的同时会 unregister Connection.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ public void clearRollupIndexInfo() {
this.rollupFinishedVersion = -1L;
}

/**
 * Sums the data size of every tablet returned by {@code getTablets()}.
 *
 * @param singleReplica forwarded unchanged to {@code Tablet#getDataSize}
 * @param filterSizeZero forwarded unchanged to {@code Tablet#getDataSize}
 * @return the accumulated size; 0 when there are no tablets
 */
public long getDataSize(boolean singleReplica) {
public long getDataSize(boolean singleReplica, boolean filterSizeZero) {
long dataSize = 0;
// Accumulate the per-tablet size; both flags are interpreted by the tablet, not here.
for (Tablet tablet : getTablets()) {
dataSize += tablet.getDataSize(singleReplica);
dataSize += tablet.getDataSize(singleReplica, filterSizeZero);
}
return dataSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ public static List<List<String>> getDataSkew(String dbName, String tblName, Part
for (int i = 0; i < tabletIds.size(); i++) {
Tablet tablet = mIndex.getTablet(tabletIds.get(i));
long rowCount = tablet.getRowCount(true);
long dataSize = tablet.getDataSize(true);
long dataSize = tablet.getDataSize(true, false);
rowCountTabletInfos.set(i, rowCountTabletInfos.get(i) + rowCount);
dataSizeTabletInfos.set(i, dataSizeTabletInfos.get(i) + dataSize);
totalSize += dataSize;
Expand Down Expand Up @@ -633,7 +633,7 @@ public static List<List<String>> getDataSkew(String dbName, String tblName, Part
for (int i = 0; i < tabletIds.size(); i++) {
Tablet tablet = mIndex.getTablet(tabletIds.get(i));
long rowCount = tablet.getRowCount(true);
long dataSize = tablet.getDataSize(true);
long dataSize = tablet.getDataSize(true, false);
rowCountTabletInfos.set(i, rowCountTabletInfos.get(i) + rowCount);
dataSizeTabletInfos.set(i, dataSizeTabletInfos.get(i) + dataSize);
totalSize += dataSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1765,7 +1765,7 @@ public long getAvgRowLength() {
long dataSize = 0;
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
rowCount += entry.getValue().getBaseIndex().getRowCount();
dataSize += entry.getValue().getBaseIndex().getDataSize(false);
dataSize += entry.getValue().getBaseIndex().getDataSize(false, false);
}
if (rowCount > 0) {
return dataSize / rowCount;
Expand Down
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public long getAllDataSize(boolean singleReplica) {
/**
 * Sums the data size of all materialized indexes in the VISIBLE state.
 * Zero-size filtering is always disabled here (filterSizeZero is passed as {@code false}),
 * and the remote data size is not added to the result.
 *
 * @param singleReplica forwarded unchanged to {@code MaterializedIndex#getDataSize}
 * @return the accumulated size of the visible indexes
 */
public long getDataSize(boolean singleReplica) {
long dataSize = 0;
for (MaterializedIndex mIndex : getMaterializedIndices(IndexExtState.VISIBLE)) {
dataSize += mIndex.getDataSize(singleReplica);
dataSize += mIndex.getDataSize(singleReplica, false);
}
return dataSize;
}
Expand Down Expand Up @@ -409,7 +409,7 @@ public long getRowCount() {

public long getAvgRowLength() {
long rowCount = getBaseIndex().getRowCount();
long dataSize = getBaseIndex().getDataSize(false);
long dataSize = getBaseIndex().getDataSize(false, false);
if (rowCount > 0) {
return dataSize / rowCount;
} else {
Expand All @@ -418,6 +418,14 @@ public long getAvgRowLength() {
}

/**
 * Returns the data size of the base index only, requested with
 * singleReplica = false and filterSizeZero = false.
 */
public long getDataLength() {
return getBaseIndex().getDataSize(false);
return getBaseIndex().getDataSize(false, false);
}

/**
 * Returns the data size of this partition: the size of every VISIBLE materialized index,
 * requested with filterSizeZero = true, plus the partition's remote data size.
 *
 * @param singleReplica forwarded unchanged to {@code MaterializedIndex#getDataSize}
 * @return visible-index size plus remote data size
 */
public long getDataSizeExcludeEmptyReplica(boolean singleReplica) {
    long visibleIndexSize = 0L;
    for (MaterializedIndex visibleIndex : getMaterializedIndices(IndexExtState.VISIBLE)) {
        // "true" asks the index to skip zero-size entries.
        visibleIndexSize = visibleIndexSize + visibleIndex.getDataSize(singleReplica, true);
    }
    // The remote part is added once, after the local total is known.
    long totalSize = visibleIndexSize + getRemoteDataSize();
    return totalSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,11 @@ public boolean equals(Object obj) {
return id == tablet.id;
}

/**
 * Computes this tablet's data size from its replicas in the NORMAL state.
 *
 * @param singleReplica if true, returns the average size of the selected replicas
 *                      (truncated to a long, 0 when none are selected); otherwise their sum
 * @param filterSizeZero if true, replicas whose data size is not greater than 0 are
 *                       excluded before averaging or summing
 * @return the average or the sum of the selected replica sizes
 */
public long getDataSize(boolean singleReplica) {
// ATTN: Replica::getDataSize may return zero in both cloud and non-cloud mode,
// because dataSize is not written to the image.
public long getDataSize(boolean singleReplica, boolean filterSizeZero) {
LongStream s = replicas.stream().filter(r -> r.getState() == ReplicaState.NORMAL)
.filter(r -> !filterSizeZero || r.getDataSize() > 0)
.mapToLong(Replica::getDataSize);
// The stream is consumed exactly once: either by average() or by sum().
return singleReplica ? Double.valueOf(s.average().orElse(0)).longValue() : s.sum();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@ private GlobalColocateStatistic buildGlobalColocateStatistic() {
bucketsSeq.size() + " vs. " + replicationNum);
Tablet tablet = index.getTablet(tabletId);
totalReplicaDataSizes.set(tabletOrderIdx,
totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true));
totalReplicaDataSizes.get(tabletOrderIdx)
+ tablet.getDataSize(true, false));
tabletOrderIdx++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -159,7 +160,7 @@ private Map<String, String> createDefaultRuntimeInfo() {
}

// exponential moving average
private static long ema(ArrayList<Long> history, int period) {
private static long ema(List<Long> history, int period) {
double alpha = 2.0 / (period + 1);
double ema = history.get(0);
for (int i = 1; i < history.size(); i++) {
Expand All @@ -168,7 +169,7 @@ private static long ema(ArrayList<Long> history, int period) {
return (long) ema;
}

private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
private static long getNextPartitionSize(List<Long> historyPartitionsSize) {
if (historyPartitionsSize.size() < 2) {
return historyPartitionsSize.get(0);
}
Expand All @@ -191,65 +192,98 @@ private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize)
}
}

/**
 * Decides the bucket number for a dynamic partition that is about to be created.
 *
 * Falls back to the bucket number configured in {@code property} (paired with 0) when the
 * table is not auto bucket, when this is the first execution, when fetching the visible
 * versions fails, or when no historical partition holds data. Otherwise the result is
 * derived from the historical partitions that hold data.
 *
 * @return a pair: first is the bucket number to use, second is the previous partition's
 *         bucket number, or 0 when the configured default was used
 */
private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table,
private static Pair<Integer, Integer> getBucketsNum(DynamicPartitionProperty property, OlapTable table,
String partitionName, String nowPartitionName, boolean executeFirstTime) {
// On the first execution no partition contains data yet, so there is nothing to estimate from.
if (!table.isAutoBucket() || executeFirstTime) {
return property.getBuckets();
return Pair.of(property.getBuckets(), 0);
}

// Auto bucket path:
// collect all historical partitions, ordered by the upper bound of their range.
RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
List<Partition> partitions = idToItems.stream()
.map(entry -> table.getPartition(entry.getKey()))
.filter(partition -> partition != null && !partition.getName().equals(nowPartitionName))
.collect(Collectors.toList());
List<Long> visibleVersions = null;
List<Partition> partitions = getHistoricalPartitions(table, nowPartitionName);
List<Long> visibleVersions;
try {
visibleVersions = Partition.getVisibleVersions(partitions);
} catch (RpcException e) {
// Best effort: a failed version lookup must not block partition creation.
LOG.warn("autobucket use property's buckets get visible version fail, table: [{}-{}], "
LOG.warn("auto bucket use property's buckets get visible version fail, table: [{}-{}], "
+ "partition: {}, buckets num: {}, exception: ",
table.getName(), table.getId(), partitionName, property.getBuckets(), e);
return property.getBuckets();
return Pair.of(property.getBuckets(), 0);
}

// Keep only the partitions whose visible version shows that data has been loaded.
List<Partition> hasDataPartitions = filterDataPartitions(partitions, visibleVersions);
if (hasDataPartitions.isEmpty()) {
return handleNoDataPartitions(table, partitionName, property.getBuckets());
}

List<Partition> hasDataPartitions = Lists.newArrayList();
return calculateBuckets(hasDataPartitions);
}

/**
 * Lists the existing partitions of a range-partitioned table, ordered by the upper endpoint
 * of their range.
 *
 * Entries whose partition cannot be resolved from the table, and the partition named
 * {@code nowPartitionName}, are left out.
 *
 * @param table the table whose partition info is a {@code RangePartitionInfo}
 * @param nowPartitionName name of the partition to exclude
 * @return the remaining partitions in ascending upper-endpoint order
 */
private static List<Partition> getHistoricalPartitions(OlapTable table, String nowPartitionName) {
    RangePartitionInfo rangeInfo = (RangePartitionInfo) (table.getPartitionInfo());
    List<Map.Entry<Long, PartitionItem>> sortedEntries =
            new ArrayList<>(rangeInfo.getIdToItem(false).entrySet());
    sortedEntries.sort(Comparator.comparing(
            item -> ((RangePartitionItem) item.getValue()).getItems().upperEndpoint()));

    List<Partition> historical = new ArrayList<>();
    for (Map.Entry<Long, PartitionItem> sortedEntry : sortedEntries) {
        Partition candidate = table.getPartition(sortedEntry.getKey());
        if (candidate == null) {
            // The id is known to the partition info but the table has no such partition.
            continue;
        }
        if (candidate.getName().equals(nowPartitionName)) {
            continue;
        }
        historical.add(candidate);
    }
    return historical;
}

/**
 * Selects the partitions whose visible version is at least 2.
 *
 * The two lists are parallel: {@code visibleVersions.get(i)} belongs to
 * {@code partitions.get(i)}.
 *
 * @param partitions candidate partitions
 * @param visibleVersions visible version of each candidate, in the same order
 * @return the candidates with a visible version of 2 or more, in their original order
 * @throws IllegalStateException if the two lists differ in size
 */
private static List<Partition> filterDataPartitions(List<Partition> partitions, List<Long> visibleVersions) {
    int partitionCount = partitions.size();
    int versionCount = visibleVersions.size();
    Preconditions.checkState(partitionCount == versionCount,
            String.format("partitions size %d not eq visibleVersions size %d, impossible",
                    partitionCount, versionCount));

    List<Partition> withData = new ArrayList<>();
    for (int idx = 0; idx < partitionCount; idx++) {
        if (visibleVersions.get(idx) < 2) {
            continue;
        }
        withData.add(partitions.get(idx));
    }
    return withData;
}

// no exist history partition data
if (hasDataPartitions.isEmpty()) {
LOG.info("autobucket use property's buckets due to all partitions no data, table: [{}-{}], "
+ "partition: {}, buckets num: {}",
table.getName(), table.getId(), partitionName, property.getBuckets());
return property.getBuckets();
/**
 * Fallback used when no historical partition holds data: logs the decision and returns the
 * configured default bucket number.
 *
 * @param table table being processed (name and id are logged)
 * @param partitionName name of the partition being created (logged)
 * @param defaultBuckets bucket number to fall back to
 * @return a pair of {@code defaultBuckets} and 0
 */
private static Pair<Integer, Integer> handleNoDataPartitions(OlapTable table,
        String partitionName, int defaultBuckets) {
    // The second element is 0 because there is no previous bucket number to compare against.
    Pair<Integer, Integer> fallback = Pair.of(defaultBuckets, 0);
    LOG.info("auto bucket use property's buckets due to all partitions no data, table: [{}-{}], "
                    + "partition: {}, buckets num: {}",
            table.getName(), table.getId(), partitionName, defaultBuckets);
    return fallback;
}

/**
 * Estimates the bucket number for the next partition from the historical partitions that
 * hold data.
 *
 * @param hasDataPartitions historical partitions with data; must not be empty
 * @return a pair: first is the bucket number to use, second is the bucket number of the
 *         last partition in {@code hasDataPartitions}
 * @throws IllegalStateException if {@code hasDataPartitions} is empty
 */
private static Pair<Integer, Integer> calculateBuckets(List<Partition> hasDataPartitions) {
List<Long> partitionSizeArray = new ArrayList<>();
List<Long> sizeUnknownArray = new ArrayList<>();

// Split the partitions into those with a non-zero reported size and those reporting 0.
for (Partition hasDataPartition : hasDataPartitions) {
long partitionSize = hasDataPartition.getDataSizeExcludeEmptyReplica(true);
if (partitionSize == 0) {
sizeUnknownArray.add(partitionSize);
} else {
partitionSizeArray.add(partitionSize);
}
}

int size = hasDataPartitions.size();
Preconditions.checkState(size > 0, "hasDataPartitions size must be greater than 0");
// The last element of the list is treated as the "previous" partition.
int previousPartitionBucketsNum = hasDataPartitions.get(size - 1).getDistributionInfo().getBucketNum();

// Every partition reported size 0: there is nothing to estimate from, so reuse the
// previous partition's bucket number.
if (hasDataPartitions.size() == sizeUnknownArray.size()) {
LOG.info("TabletStatMgr not synchronized partitions size yet, so use previous partition bucket num");
return Pair.of(previousPartitionBucketsNum, previousPartitionBucketsNum);
}

ArrayList<Long> partitionSizeArray = hasDataPartitions.stream()
.map(partition -> partition.getAllDataSize(true))
.collect(Collectors.toCollection(ArrayList::new));
long estimatePartitionSize = getNextPartitionSize(partitionSizeArray);
// Multiply by 5 to approximate the uncompressed data size.
long uncompressedPartitionSize = estimatePartitionSize * 5;
int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets);
LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, "
+ " estimate partition size: {}, last partitions(partition name, local size, remote size): {}",
hasDataPartitions.size(), table.getName(), table.getId(), partitionName, bucketsNum,
estimatePartitionSize,

// The last argument lists at most the 7 most recent partitions as
// (name, local size, remote size).
LOG.info("auto bucket calc with {} history partitions, {} history partitions size not sync size yet,"
+ " buckets num: {}, estimate partition size: {}, last partitions: {}",
hasDataPartitions.size(), sizeUnknownArray.size(), bucketsNum, estimatePartitionSize,
hasDataPartitions.stream()
.skip(Math.max(0, hasDataPartitions.size() - 7))
.map(partition -> "(" + partition.getName() + ", " + partition.getDataSize(true)
+ ", " + partition.getRemoteDataSize() + ")")
.collect(Collectors.toList()));

return bucketsNum;
return Pair.of(bucketsNum, previousPartitionBucketsNum);
}

private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
Expand Down Expand Up @@ -356,8 +390,13 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab

DistributionDesc distributionDesc = null;
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName,
Pair<Integer, Integer> ret = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName,
nowPartitionName, executeFirstTime);
int bucketsNum = ret.first;
int previousPartitionBucketsNum = ret.second;
if (olapTable.isAutoBucket()) {
checkAutoBucketCalcNumIsValid(bucketsNum, previousPartitionBucketsNum);
}
if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
Expand All @@ -374,6 +413,17 @@ private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTab
return addPartitionClauses;
}

/**
 * Logs a warning when the calculated bucket number deviates too far from the previous
 * partition's bucket number.
 *
 * The allowed band is {@code previous * (1 - t)} to {@code previous * (1 + t)}, where
 * {@code t} is {@code Config.autobucket_out_of_bounds_percent_threshold}. Nothing is thrown;
 * the check only produces a log entry.
 *
 * @param calcNum bucket number calculated for the new partition
 * @param previousPartitionBucketsNum bucket number of the previous partition; 0 means it is
 *        unknown and the check is skipped entirely
 */
private void checkAutoBucketCalcNumIsValid(int calcNum, int previousPartitionBucketsNum) {
    // previousPartitionBucketsNum == 0, some abnormal case, ignore it.
    // The guard is a separate early return so that it covers BOTH bounds: "a && b || c"
    // parses as "(a && b) || c", which left the lower-bound test outside the guard.
    if (previousPartitionBucketsNum == 0) {
        return;
    }

    // Read the mutable config once so both bounds and the log line use the same value.
    double threshold = Config.autobucket_out_of_bounds_percent_threshold;
    double upperBound = previousPartitionBucketsNum * (1 + threshold);
    double lowerBound = previousPartitionBucketsNum * (1 - threshold);

    if (calcNum > upperBound || calcNum < lowerBound) {
        LOG.warn("auto bucket calc num may be err, plz check. "
                + "calc bucket num {}, previous partition bucket num {}, percent {}",
                calcNum, previousPartitionBucketsNum, threshold);
    }
}

/**
* If dynamic_partition.storage_medium is set to SSD,
* ignore hot_partition_num property and set to (SSD, 9999-12-31 23:59:59)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,13 @@ private Map<Long, List<List<Long>>> splitBatch(Map<Long, List<Tablet>> beToWarmU
List<Long> batch = new ArrayList<>();
Long curBatchSize = 0L;
for (Tablet tablet : entry.getValue()) {
if (curBatchSize + tablet.getDataSize(true) > maxSizePerBatch) {
if (curBatchSize + tablet.getDataSize(true, false) > maxSizePerBatch) {
batches.add(batch);
batch = new ArrayList<>();
curBatchSize = 0L;
}
batch.add(tablet.getId());
curBatchSize += tablet.getDataSize(true);
curBatchSize += tablet.getDataSize(true, false);
}
if (!batch.isEmpty()) {
batches.add(batch);
Expand Down Expand Up @@ -416,7 +416,7 @@ private Map<Long, List<Tablet>> warmUpNewClusterByCluster(String dstClusterName,
continue;
}
for (Tablet tablet : index.getTablets()) {
warmUpTabletsSize += tablet.getDataSize(true);
warmUpTabletsSize += tablet.getDataSize(true, false);
tablets.add(tablet);
if (warmUpTabletsSize >= dstTotalFileCache) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ static class DBTabletStatistic {
inconsistentNum++;
inconsistentTabletIds.add(tablet.getId());
}
if (tablet.getDataSize(true) > Config.min_bytes_indicate_replica_too_large) {
if (tablet.getDataSize(true, false) > Config.min_bytes_indicate_replica_too_large) {
oversizeNum++;
oversizeTabletIds.add(tablet.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private void collectTableStats(OlapTable table) {
long remoteSegmentSize = 0;
for (Partition partition : table.getAllPartitions()) {
MaterializedIndex mIndex = partition.getIndex(indexId);
indexSize += mIndex.getDataSize(false);
indexSize += mIndex.getDataSize(false, false);
indexReplicaCount += mIndex.getReplicaCount();
indexRowCount += mIndex.getRowCount() == -1 ? 0 : mIndex.getRowCount();
indexRemoteSize += mIndex.getRemoteDataSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public static long getTabletDataSize(long tabletId) {
if (tablet == null) {
return -1L;
}
return tablet.getDataSize(true);
return tablet.getDataSize(true, false);
} finally {
olapTable.readUnlock();
}
Expand Down
Loading
Loading