diff --git a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java index 331c79d20fce7b..a201bca59ddf56 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java @@ -56,6 +56,7 @@ import org.apache.doris.task.DropReplicaTask; import org.apache.doris.thrift.TKeysType; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import com.google.common.base.Preconditions; @@ -327,13 +328,14 @@ private void processAddRollup(AddRollupClause alterClause, Database db, OlapTabl for (Partition partition : olapTable.getPartitions()) { long partitionId = partition.getId(); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium(); MaterializedIndex rollupIndex = new MaterializedIndex(rollupIndexId, IndexState.ROLLUP); if (isRestore) { rollupIndex.setState(IndexState.NORMAL); } MaterializedIndex baseIndex = partition.getIndex(baseIndexId); TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId, partitionId, rollupIndexId, - rollupSchemaHash); + rollupSchemaHash, medium); short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId()); for (Tablet baseTablet : baseIndex.getTablets()) { long baseTabletId = baseTablet.getId(); diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJob.java b/fe/src/main/java/org/apache/doris/alter/RollupJob.java index 2d50ab314cc477..04ca2ee44c4a94 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJob.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJob.java @@ -44,6 +44,7 @@ import org.apache.doris.task.CreateRollupTask; import org.apache.doris.thrift.TKeysType; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TTaskType; @@ -782,11 +783,13 @@ public void replayInitJob(Database db) { for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { Partition partition = olapTable.getPartition(entry.getKey()); partition.setState(PartitionState.ROLLUP); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty( + partition.getId()).getStorageMedium(); if (!Catalog.isCheckpointThread()) { MaterializedIndex rollupIndex = entry.getValue(); TabletMeta tabletMeta = new TabletMeta(dbId, tableId, entry.getKey(), rollupIndexId, - rollupSchemaHash); + rollupSchemaHash, medium); for (Tablet tablet : rollupIndex.getTablets()) { long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java index fc158249f607e5..36e6a660b00308 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -616,7 +616,7 @@ private void checkAndPrepareMeta() { KeysType keysType = localTbl.getKeysType(); List columns = localTbl.getSchemaByIndexId(restoredIdx.getId()); TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(), - restoredIdx.getId(), schemaHash); + restoredIdx.getId(), schemaHash, TStorageMedium.HDD); for (Tablet restoreTablet : restoredIdx.getTablets()) { Catalog.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { @@ -640,9 +640,7 @@ private void checkAndPrepareMeta() { // generate create replica task for all restored tables for (OlapTable restoreTbl : restoredTbls) { - PartitionInfo partInfo = restoreTbl.getPartitionInfo(); for (Partition restorePart : restoreTbl.getPartitions()) { - TStorageMedium storageMedium = partInfo.getDataProperty(restorePart.getId()).getStorageMedium(); Set bfColumns = restoreTbl.getCopiedBfColumns(); double bfFpp = restoreTbl.getBfFpp(); for (MaterializedIndex index : restorePart.getMaterializedIndices()) { @@ -651,7 +649,7 @@ private void checkAndPrepareMeta() { KeysType keysType = restoreTbl.getKeysType(); List columns = restoreTbl.getSchemaByIndexId(index.getId()); TabletMeta tabletMeta = new TabletMeta(db.getId(), restoreTbl.getId(), restorePart.getId(), - index.getId(), schemaHash); + index.getId(), schemaHash, TStorageMedium.HDD); for (Tablet tablet : index.getTablets()) { Catalog.getCurrentInvertedIndex().addTablet(tablet.getId(), tabletMeta); for (Replica replica : tablet.getReplicas()) { @@ -659,7 +657,7 @@ private void checkAndPrepareMeta() { CreateReplicaTask task = new CreateReplicaTask(replica.getBackendId(), dbId, restoreTbl.getId(), restorePart.getId(), index.getId(), tablet.getId(), shortKeyColumnCount, schemaHash, replica.getVersion(), replica.getVersionHash(), - keysType, TStorageType.COLUMN, storageMedium, columns, + keysType, TStorageType.COLUMN, TStorageMedium.HDD, columns, bfColumns, bfFpp, null); task.setInRestoreMode(true); batchTask.addTask(task); @@ -921,14 +919,14 @@ private void replayCheckAndPrepareMeta() { Range remoteRange = remotePartitionInfo.getRange(remotePartId); DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId); localPartitionInfo.addPartition(restorePart.getId(), remoteRange, - remoteDataProperty, (short) restoreReplicationNum); + remoteDataProperty, (short) restoreReplicationNum); localTbl.addPartition(restorePart); // modify tablet inverted index for (MaterializedIndex restoreIdx : restorePart.getMaterializedIndices()) { int schemaHash = localTbl.getSchemaHashByIndexId(restoreIdx.getId()); TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(), - restoreIdx.getId(), schemaHash); + restoreIdx.getId(), schemaHash, TStorageMedium.HDD); for (Tablet restoreTablet : restoreIdx.getTablets()) { Catalog.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { @@ -946,7 +944,7 @@ private void replayCheckAndPrepareMeta() { for (MaterializedIndex restoreIdx : restorePart.getMaterializedIndices()) { int schemaHash = restoreTbl.getSchemaHashByIndexId(restoreIdx.getId()); TabletMeta tabletMeta = new TabletMeta(db.getId(), restoreTbl.getId(), restorePart.getId(), - restoreIdx.getId(), schemaHash); + restoreIdx.getId(), schemaHash, TStorageMedium.HDD); for (Tablet restoreTablet : restoreIdx.getTablets()) { Catalog.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob_D.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob_D.java index 17a309026f2a96..2d419f55544869 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob_D.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob_D.java @@ -49,6 +49,7 @@ import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -448,7 +449,7 @@ public void finishing(Catalog catalog, boolean isReplay) throws DdlException { for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, - schemaHash); + schemaHash, TStorageMedium.HDD); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { invertedIndex.addReplica(tabletId, replica); @@ -499,7 +500,7 @@ public void finishing(Catalog catalog, boolean isReplay) throws DdlException { invertedIndex.addReplica(tabletId, replica); } TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, - schemaHash); + schemaHash, TStorageMedium.HDD); invertedIndex.addTablet(tabletId, tabletMeta); } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 84ab3f4e919bcb..a3a523dd1ff35d 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1332,10 +1332,12 @@ private void recreateTabletInvertIndex() { long tableId = olapTable.getId(); for (Partition partition : olapTable.getPartitions()) { long partitionId = partition.getId(); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty( + partitionId).getStorageMedium(); for (MaterializedIndex index : partition.getMaterializedIndices()) { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); @@ -1695,7 +1697,6 @@ public long loadTransactionState(DataInputStream dis, long checksum) throws IOEx public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_10) { Catalog.getCurrentRecycleBin().readFields(dis); - if (!isCheckpointThread()) { // add tablet in Recycle bin to TabletInvertedIndex Catalog.getCurrentRecycleBin().addTabletToInvertedIndex(); @@ -2999,7 +3000,7 @@ public void replayAddPartition(PartitionPersistInfo info) throws DdlException { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(), - index.getId(), schemaHash); + index.getId(), schemaHash, info.getDataProperty().getStorageMedium()); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); @@ -3220,7 +3221,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long // create tablets int schemaHash = indexIdToSchemaHash.get(indexId); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium); createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, versionHash, replicationNum, tabletMeta, tabletIdSet); @@ -3990,10 +3991,12 @@ public void replayCreateTable(String dbName, Table table) { long tableId = table.getId(); for (Partition partition : olapTable.getPartitions()) { long partitionId = partition.getId(); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty( + partitionId).getStorageMedium(); for (MaterializedIndex mIndex : partition.getMaterializedIndices()) { long indexId = mIndex.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); for (Tablet tablet : mIndex.getTablets()) { long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); @@ -5982,11 +5985,13 @@ public void replayTruncateTable(TruncateTableInfo info) { TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); for (Partition partition : info.getPartitions()) { long partitionId = partition.getId(); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty( + partitionId).getStorageMedium(); for (MaterializedIndex mIndex : partition.getMaterializedIndices()) { long indexId = mIndex.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), - partitionId, indexId, schemaHash); + partitionId, indexId, schemaHash, medium); for (Tablet tablet : mIndex.getTablets()) { long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); diff --git a/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 6f00557deb00e1..7cc7d4820e6b43 100644 --- a/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -29,6 +29,7 @@ import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.DropReplicaTask; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -573,10 +574,11 @@ public void addTabletToInvertedIndex() { long tableId = olapTable.getId(); for (Partition partition : olapTable.getPartitions()) { long partitionId = partition.getId(); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium(); for (MaterializedIndex index : partition.getMaterializedIndices()) { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); @@ -622,11 +624,13 @@ public void addTabletToInvertedIndex() { olapTable = (OlapTable) tableInfo.getTable(); } Preconditions.checkNotNull(olapTable); - + // storage medium should be got from RecyclePartitionInfo, not from olap table. because olap table + // does not have this partition any more + TStorageMedium medium = partitionInfo.getDataProperty().getStorageMedium(); for (MaterializedIndex index : partition.getMaterializedIndices()) { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java index 4529e8427e4715..5088edf412d327 100644 --- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -44,6 +44,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TOlapTable; +import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -906,6 +907,7 @@ public OlapTable selectiveCopy(Collection reservedPartNames, boolean res copied.setState(OlapTableState.NORMAL); for (Partition partition : copied.getPartitions()) { partition.setState(PartitionState.NORMAL); + copied.getPartitionInfo().setDataProperty(partition.getId(), new DataProperty(TStorageMedium.HDD)); for (MaterializedIndex idx : partition.getMaterializedIndices()) { idx.setState(IndexState.NORMAL); for (Tablet tablet : idx.getTablets()) { diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 453a2c9a0bba02..f3d74fe21459d8 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /* * this class stores a inverted index @@ -159,6 +160,9 @@ public void tabletReport(long backendId, Map backendTablets, if (storageMedium != backendTabletInfo.getStorage_medium()) { tabletMigrationMap.put(storageMedium, tabletId); } + if (storageMedium != tabletMeta.getStorageMedium()) { + tabletMeta.setStorageMedium(storageMedium); + } } // check if should clear transactions if (backendTabletInfo.isSetTransaction_ids()) { @@ -440,7 +444,7 @@ public void deleteReplica(long tabletId, long backendId) { writeLock(); try { Preconditions.checkState(tabletMetaMap.containsKey(tabletId)); - // Preconditions.checkState(replicaMetaTable.containsRow(tabletId)); + TabletMeta tabletMeta = tabletMetaMap.get(tabletId); if (replicaMetaTable.containsRow(tabletId)) { Replica replica = replicaMetaTable.remove(tabletId, backendId); replicaToTabletMap.remove(replica.getId()); @@ -535,6 +539,21 @@ public List getTabletIdsByBackendId(long backendId) { return tabletIds; } + public List getTabletIdsByBackendIdAndStorageMedium(long backendId, TStorageMedium storageMedium) { + List tabletIds = Lists.newArrayList(); + readLock(); + try { + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + tabletIds = replicaMetaWithBackend.keySet().stream().filter( + id -> tabletMetaMap.get(id).getStorageMedium() == storageMedium).collect(Collectors.toList()); + } + } finally { + readUnlock(); + } + return tabletIds; + } + public int getTabletNumByBackendId(long backendId) { readLock(); try { @@ -548,6 +567,30 @@ public int getTabletNumByBackendId(long backendId) { return 0; } + public Map getReplicaNumByBeIdAndStorageMedium(long backendId) { + Map replicaNumMap = Maps.newHashMap(); + long hddNum = 0; + long ssdNum = 0; + readLock(); + try { + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); + if (replicaMetaWithBackend != null) { + for (long tabletId : replicaMetaWithBackend.keySet()) { + if (tabletMetaMap.get(tabletId).getStorageMedium() == TStorageMedium.HDD) { + hddNum++; + } else { + ssdNum++; + } + } + } + } finally { + readUnlock(); + } + replicaNumMap.put(TStorageMedium.HDD, hddNum); + replicaNumMap.put(TStorageMedium.SSD, ssdNum); + return replicaNumMap; + } + // just for test public void clear() { writeLock(); diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletMeta.java b/fe/src/main/java/org/apache/doris/catalog/TabletMeta.java index 2dac4f1cc3e54e..4cb2b44b97cb6a 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletMeta.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletMeta.java @@ -17,6 +17,8 @@ package org.apache.doris.catalog; +import org.apache.doris.thrift.TStorageMedium; + import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; @@ -35,9 +37,12 @@ public class TabletMeta { private int oldSchemaHash; private int newSchemaHash; + private TStorageMedium storageMedium; + private ReentrantReadWriteLock lock; - public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int schemaHash) { + public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int schemaHash, + TStorageMedium storageMedium) { this.dbId = dbId; this.tableId = tableId; this.partitionId = partitionId; @@ -46,6 +51,8 @@ public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int s this.oldSchemaHash = schemaHash; this.newSchemaHash = -1; + this.storageMedium = storageMedium; + lock = new ReentrantReadWriteLock(); } @@ -65,6 +72,14 @@ public long getIndexId() { return indexId; } + public TStorageMedium getStorageMedium() { + return storageMedium; + } + + public void setStorageMedium(TStorageMedium storageMedium) { + this.storageMedium = storageMedium; + } + public void setNewSchemaHash(int newSchemaHash) { lock.writeLock().lock(); try { diff --git a/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index a95b8124c93d60..4322c5a3e57603 100644 --- a/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -29,17 +29,62 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Set; -public class BackendLoadStatistic implements Comparable { +public class BackendLoadStatistic { private static final Logger LOG = LogManager.getLogger(BackendLoadStatistic.class); + // comparator based on load score and storage medium, smaller load score first + public static class BeStatComparator implements Comparator { + private TStorageMedium medium; + + public BeStatComparator(TStorageMedium medium) { + this.medium = medium; + } + + @Override + public int compare(BackendLoadStatistic o1, BackendLoadStatistic o2) { + double score1 = o1.getLoadScore(medium); + double score2 = o2.getLoadScore(medium); + if (score1 > score2) { + return 1; + } else if (score1 == score2) { + return 0; + } else { + return -1; + } + } + } + + public static class BeStatMixComparator implements Comparator { + @Override + public int compare(BackendLoadStatistic o1, BackendLoadStatistic o2) { + Double score1 = o1.getMixLoadScore(); + Double score2 = o2.getMixLoadScore(); + + if (score1 > score2) { + return 1; + } else if (score1 == score2) { + return 0; + } else { + return -1; + } + } + } + + public static final BeStatComparator HDD_COMPARATOR = new BeStatComparator(TStorageMedium.HDD); + public static final BeStatComparator SSD_COMPARATOR = new BeStatComparator(TStorageMedium.SSD); + public static final BeStatMixComparator MIX_COMPARATOR = new BeStatMixComparator(); + public enum Classification { INIT, LOW, // load score is Config.balance_load_score_threshold lower than average load score of cluster @@ -55,20 +100,19 @@ public enum Classification { private boolean isAvailable; - private long totalCapacityB = 1; // init as 1 to avoid dividing zero error - private long totalUsedCapacityB = 0; - private long totalReplicaNum = 0; - public static class LoadScore { public double replicaNumCoefficient = 0.5; public double capacityCoefficient = 0.5; public double score = 0.0; - } - - private LoadScore loadScore; - private Classification clazz = Classification.INIT; + public static final LoadScore DUMMY = new LoadScore(); + } + private Map totalCapacityMap = Maps.newHashMap(); + private Map totalUsedCapacityMap = Maps.newHashMap(); + private Map totalReplicaNumMap = Maps.newHashMap(); + private Map loadScoreMap = Maps.newHashMap(); + private Map clazzMap = Maps.newHashMap(); private List pathStatistics = Lists.newArrayList(); public BackendLoadStatistic(long beId, String clusterName, SystemInfoService infoService, @@ -91,28 +135,43 @@ public boolean isAvailable() { return isAvailable; } - public long getTotalCapacityB() { - return totalCapacityB; + public long getTotalCapacityB(TStorageMedium medium) { + return totalCapacityMap.getOrDefault(medium, 0L); } - public long getTotalUsedCapacityB() { - return totalUsedCapacityB; + public long getTotalUsedCapacityB(TStorageMedium medium) { + return totalUsedCapacityMap.getOrDefault(medium, 0L); } - public long getReplicaNum() { - return totalReplicaNum; + public long getReplicaNum(TStorageMedium medium) { + return totalReplicaNumMap.getOrDefault(medium, 0L); } - public double getLoadScore() { - return loadScore.score; + public double getLoadScore(TStorageMedium medium) { + if (loadScoreMap.containsKey(medium)) { + return loadScoreMap.get(medium).score; + } + return 0.0; + } + + public double getMixLoadScore() { + int mediumCount = 0; + double totalLoadScore = 0.0; + for (TStorageMedium medium : TStorageMedium.values()) { + if (hasMedium(medium)) { + mediumCount++; + totalLoadScore += getLoadScore(medium); + } + } + return totalLoadScore / mediumCount == 0 ? 1 : mediumCount; } - public void setClazz(Classification clazz) { - this.clazz = clazz; + public void setClazz(TStorageMedium medium, Classification clazz) { + this.clazzMap.put(medium, clazz); } - public Classification getClazz() { - return clazz; + public Classification getClazz(TStorageMedium medium) { + return clazzMap.getOrDefault(medium, Classification.INIT); } public void init() throws LoadBalanceException { @@ -129,10 +188,11 @@ public void init() throws LoadBalanceException { ImmutableMap disks = be.getDisks(); for (DiskInfo diskInfo : disks.values()) { + TStorageMedium medium = diskInfo.getStorageMedium(); if (diskInfo.getState() == DiskState.ONLINE) { // we only collect online disk's capacity - totalCapacityB += diskInfo.getTotalCapacityB(); - totalUsedCapacityB += diskInfo.getDataUsedCapacityB(); + totalCapacityMap.put(medium, totalCapacityMap.getOrDefault(medium, 0L) + diskInfo.getTotalCapacityB()); + totalUsedCapacityMap.put(medium, totalUsedCapacityMap.getOrDefault(medium, 0L) + diskInfo.getDataUsedCapacityB()); } RootPathLoadStatistic pathStatistic = new RootPathLoadStatistic(beId, diskInfo.getRootPath(), @@ -141,20 +201,34 @@ public void init() throws LoadBalanceException { pathStatistics.add(pathStatistic); } - totalReplicaNum = invertedIndex.getTabletNumByBackendId(beId); + totalReplicaNumMap = invertedIndex.getReplicaNumByBeIdAndStorageMedium(beId); + // This is very tricky. because the number of replica on specified medium we get + // from getReplicaNumByBeIdAndStorageMedium() is counted based on meta data. + // but in fact there may not has SSD disk on this backend. So if we found that no SSD disk on this + // backend, set the replica number to 0, otherwise, the average replica number on specified medium + // will be incorrect. + for (TStorageMedium medium : TStorageMedium.values()) { + if (!hasMedium(medium)) { + totalReplicaNumMap.put(medium, 0L); + } + } - classifyPathByLoad(); + for (TStorageMedium storageMedium : TStorageMedium.values()) { + classifyPathByLoad(storageMedium); + } // sort the list Collections.sort(pathStatistics); } - private void classifyPathByLoad() { + private void classifyPathByLoad(TStorageMedium medium) { long totalCapacity = 0; long totalUsedCapacity = 0; for (RootPathLoadStatistic pathStat : pathStatistics) { - totalCapacity += pathStat.getCapacityB(); - totalUsedCapacity += pathStat.getUsedCapacityB(); + if (pathStat.getStorageMedium() == medium) { + totalCapacity += pathStat.getCapacityB(); + totalUsedCapacity += pathStat.getUsedCapacityB(); + } } double avgUsedPercent = totalCapacity == 0 ? 0.0 : totalUsedCapacity / (double) totalCapacity; @@ -162,6 +236,10 @@ private void classifyPathByLoad() { int midCounter = 0; int highCounter = 0; for (RootPathLoadStatistic pathStat : pathStatistics) { + if (pathStat.getStorageMedium() != medium) { + continue; + } + if (Math.abs(pathStat.getUsedPercent() - avgUsedPercent) / avgUsedPercent > Config.balance_load_score_threshold) { if (pathStat.getUsedPercent() > avgUsedPercent) { @@ -177,16 +255,25 @@ private void classifyPathByLoad() { } } - LOG.info("classify path by load. avg used percent: {}. low/mid/high: {}/{}/{}", - avgUsedPercent, lowCounter, midCounter, highCounter); + LOG.debug("classify path by load. storage: {} avg used percent: {}. low/mid/high: {}/{}/{}", + avgUsedPercent, medium, lowCounter, midCounter, highCounter); } - public void calcScore(double avgClusterUsedCapacityPercent, double avgClusterReplicaNumPerBackend) { - loadScore = calcSore(totalUsedCapacityB, totalCapacityB, totalReplicaNum, avgClusterUsedCapacityPercent, - avgClusterReplicaNumPerBackend); + public void calcScore(Map avgClusterUsedCapacityPercentMap, + Map avgClusterReplicaNumPerBackendMap) { - LOG.debug("backend {}, capacity coefficient: {}, replica coefficient: {}, load score: {}", - beId, loadScore.capacityCoefficient, loadScore.replicaNumCoefficient, loadScore.score); + for (TStorageMedium medium : TStorageMedium.values()) { + LoadScore loadScore = calcSore(totalUsedCapacityMap.getOrDefault(medium, 0L), + totalCapacityMap.getOrDefault(medium, 1L), + totalReplicaNumMap.getOrDefault(medium, 0L), + avgClusterUsedCapacityPercentMap.getOrDefault(medium, 0.0), + avgClusterReplicaNumPerBackendMap.getOrDefault(medium, 0.0)); + + loadScoreMap.put(medium, loadScore); + + LOG.debug("backend {}, medium: {}, capacity coefficient: {}, replica coefficient: {}, load score: {}", + beId, medium, loadScore.capacityCoefficient, loadScore.replicaNumCoefficient, loadScore.score); + } } public static LoadScore calcSore(long beUsedCapacityB, long beTotalCapacity, long beTotalReplicaNum, @@ -214,11 +301,17 @@ public static LoadScore calcSore(long beUsedCapacityB, long beTotalCapacity, lon return loadScore; } - public BalanceStatus isFit(long tabletSize, List result, boolean isSupplement) { + public BalanceStatus isFit(long tabletSize, TStorageMedium medium, + List result, boolean isSupplement) { BalanceStatus status = new BalanceStatus(ErrCode.COMMON_ERROR); // try choosing path from first to end for (int i = 0; i < pathStatistics.size(); i++) { RootPathLoadStatistic pathStatistic = pathStatistics.get(i); + // if this is a supplement task, ignore the storage medium + if (!isSupplement && pathStatistic.getStorageMedium() != medium) { + continue; + } + BalanceStatus bStatus = pathStatistic.isFit(tabletSize, isSupplement); if (!bStatus.ok()) { status.addErrMsgs(bStatus.getErrMsgs()); @@ -270,45 +363,49 @@ public List getPathStatistics() { return pathStatistics; } - public long getAvailPathNum() { - return pathStatistics.stream().filter(p -> p.getDiskState() == DiskState.ONLINE).count(); + public long getAvailPathNum(TStorageMedium medium) { + return pathStatistics.stream().filter( + p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium).count(); + } + + public boolean hasMedium(TStorageMedium medium) { + for (RootPathLoadStatistic rootPathLoadStatistic : pathStatistics) { + if (rootPathLoadStatistic.getStorageMedium() == medium) { + return true; + } + } + return false; } public String getBrief() { StringBuilder sb = new StringBuilder(); - sb.append(beId).append(": replica: ").append(totalReplicaNum); - sb.append(" used: ").append(totalUsedCapacityB); - sb.append(" total: ").append(totalCapacityB); - sb.append(" score: ").append(loadScore); + sb.append(beId); + for (TStorageMedium medium : TStorageMedium.values()) { + sb.append(", ").append(medium).append(": replica: ").append(totalReplicaNumMap.get(medium)); + sb.append(" used: ").append(totalUsedCapacityMap.getOrDefault(medium, 0L)); + sb.append(" total: ").append(totalCapacityMap.getOrDefault(medium, 0L)); + sb.append(" score: ").append(loadScoreMap.getOrDefault(medium, LoadScore.DUMMY).score); + } return sb.toString(); } - public List getInfo() { + public List getInfo(TStorageMedium medium) { List info = Lists.newArrayList(); info.add(String.valueOf(beId)); info.add(clusterName); info.add(String.valueOf(isAvailable)); - info.add(String.valueOf(totalUsedCapacityB)); - info.add(String.valueOf(totalCapacityB)); - info.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(totalUsedCapacityB * 100 - / (double) totalCapacityB))); - info.add(String.valueOf(totalReplicaNum)); + long used = totalUsedCapacityMap.getOrDefault(medium, 0L); + long total = totalCapacityMap.getOrDefault(medium, 0L); + info.add(String.valueOf(used)); + info.add(String.valueOf(total)); + info.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(used * 100 + / (double) total))); + info.add(String.valueOf(totalReplicaNumMap.getOrDefault(medium, 0L))); + LoadScore loadScore = loadScoreMap.getOrDefault(medium, new LoadScore()); info.add(String.valueOf(loadScore.capacityCoefficient)); info.add(String.valueOf(loadScore.replicaNumCoefficient)); info.add(String.valueOf(loadScore.score)); - info.add(clazz.name()); + info.add(clazzMap.getOrDefault(medium, Classification.INIT).name()); return info; } - - // ascend order by load score - @Override - public int compareTo(BackendLoadStatistic o) { - if (getLoadScore() > o.getLoadScore()) { - return 1; - } else if (getLoadScore() == o.getLoadScore()) { - return 0; - } else { - return -1; - } - } } diff --git a/fe/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java b/fe/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java index 5459344d4bbc40..3d279ef06db63a 100644 --- a/fe/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java +++ b/fe/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java @@ -17,7 +17,6 @@ package org.apache.doris.clone; -import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.clone.BackendLoadStatistic.LoadScore; @@ -25,16 +24,20 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; /* * Load statistics of a cluster @@ -47,19 +50,17 @@ public class ClusterLoadStatistic { private String clusterName; - private long totalCapacityB = 1; - private long totalUsedCapacityB = 0; - private long totalReplicaNum = 0; - private long backendNum = 0; - - private double avgUsedCapacityPercent = 0.0; - private double avgReplicaNumPercent = 0.0; - - private double avgLoadScore = 0.0; - + private Map totalCapacityMap = Maps.newHashMap(); + private Map totalUsedCapacityMap = Maps.newHashMap(); + private Map totalReplicaNumMap = Maps.newHashMap(); + private Map avgUsedCapacityPercentMap = Maps.newHashMap(); + private Map avgReplicaNumPercentMap = Maps.newHashMap(); + private Map avgLoadScoreMap = Maps.newHashMap(); + // storage medium -> number of backend which has this kind of medium + private Map backendNumMap = Maps.newHashMap(); private List beLoadStatistics = Lists.newArrayList(); - public ClusterLoadStatistic(String clusterName, Catalog catalog, SystemInfoService infoService, + public ClusterLoadStatistic(String clusterName, SystemInfoService infoService, TabletInvertedIndex invertedIndex) { this.clusterName = clusterName; this.infoService = infoService; @@ -70,8 +71,7 @@ public void init() { ImmutableMap backends = infoService.getBackendsInCluster(clusterName); for (Backend backend : backends.values()) { BackendLoadStatistic beStatistic = new BackendLoadStatistic(backend.getId(), - backend.getOwnerClusterName(), - infoService, invertedIndex); + backend.getOwnerClusterName(), infoService, invertedIndex); try { beStatistic.init(); } catch (LoadBalanceException e) { @@ -79,57 +79,84 @@ public void init() { continue; } - totalCapacityB += beStatistic.getTotalCapacityB(); - totalUsedCapacityB += beStatistic.getTotalUsedCapacityB(); - totalReplicaNum += beStatistic.getReplicaNum(); - backendNum++; + for (TStorageMedium medium : TStorageMedium.values()) { + totalCapacityMap.put(medium, totalCapacityMap.getOrDefault(medium, 0L) + beStatistic.getTotalCapacityB(medium)); + totalUsedCapacityMap.put(medium, totalUsedCapacityMap.getOrDefault(medium, 0L) + beStatistic.getTotalUsedCapacityB(medium)); + totalReplicaNumMap.put(medium, totalReplicaNumMap.getOrDefault(medium, 0L) + beStatistic.getReplicaNum(medium)); + if (beStatistic.hasMedium(medium)) { + backendNumMap.put(medium, backendNumMap.getOrDefault(medium, 0) + 1); + } + } + beLoadStatistics.add(beStatistic); } - avgUsedCapacityPercent = totalUsedCapacityB / (double) totalCapacityB; - avgReplicaNumPercent = totalReplicaNum / (double) backendNum; + for (TStorageMedium medium : TStorageMedium.values()) { + avgUsedCapacityPercentMap.put(medium, totalUsedCapacityMap.getOrDefault(medium, 0L) / (double) totalCapacityMap.getOrDefault(medium, 1L)); + avgReplicaNumPercentMap.put(medium, totalReplicaNumMap.getOrDefault(medium, 0L) / (double) backendNumMap.getOrDefault(medium, 1)); + } for (BackendLoadStatistic beStatistic : beLoadStatistics) { - beStatistic.calcScore(avgUsedCapacityPercent, avgReplicaNumPercent); + beStatistic.calcScore(avgUsedCapacityPercentMap, avgReplicaNumPercentMap); } // classify all backends - classifyBackendByLoad(); + for (TStorageMedium medium : TStorageMedium.values()) { + classifyBackendByLoad(medium); + } - // sort the list - Collections.sort(beLoadStatistics); + // sort be stats by mix load score + Collections.sort(beLoadStatistics, BackendLoadStatistic.MIX_COMPARATOR); } /* * classify backends into 'low', 'mid' and 'high', by load */ - private void classifyBackendByLoad() { + private void classifyBackendByLoad(TStorageMedium medium) { + if (backendNumMap.getOrDefault(medium, 0) == 0) { + return; + } double totalLoadScore = 0.0; for (BackendLoadStatistic beStat : beLoadStatistics) { - totalLoadScore += beStat.getLoadScore(); + totalLoadScore += beStat.getLoadScore(medium); } - avgLoadScore = totalLoadScore / beLoadStatistics.size(); + double avgLoadScore = totalLoadScore / backendNumMap.get(medium); + avgLoadScoreMap.put(medium, avgLoadScore); int lowCounter = 0; int midCounter = 0; int highCounter = 0; for (BackendLoadStatistic beStat : beLoadStatistics) { - if (Math.abs(beStat.getLoadScore() - avgLoadScore) / avgLoadScore > Config.balance_load_score_threshold) { - if (beStat.getLoadScore() > avgLoadScore) { - beStat.setClazz(Classification.HIGH); + if (!beStat.hasMedium(medium)) { + continue; + } + + if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / avgLoadScore > Config.balance_load_score_threshold) { + if (beStat.getLoadScore(medium) > avgLoadScore) { + beStat.setClazz(medium, Classification.HIGH); highCounter++; - } else if (beStat.getLoadScore() < avgLoadScore) { - beStat.setClazz(Classification.LOW); + } else if (beStat.getLoadScore(medium) < avgLoadScore) { + beStat.setClazz(medium, Classification.LOW); lowCounter++; } } else { - beStat.setClazz(Classification.MID); + beStat.setClazz(medium, Classification.MID); midCounter++; } } - LOG.info("classify backend by load. avg load score: {}. low/mid/high: {}/{}/{}", - avgLoadScore, lowCounter, midCounter, highCounter); + LOG.info("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}", + avgLoadScore, medium, lowCounter, midCounter, highCounter); + } + + private static void sortBeStats(List beStats, TStorageMedium medium) { + if (medium == null) { + Collections.sort(beStats, BackendLoadStatistic.MIX_COMPARATOR); + } else if (medium == TStorageMedium.HDD) { + Collections.sort(beStats, BackendLoadStatistic.HDD_COMPARATOR); + } else { + Collections.sort(beStats, BackendLoadStatistic.SSD_COMPARATOR); + } } /* @@ -139,7 +166,8 @@ private void classifyBackendByLoad() { * 2. if the summary of the diff between the new score and average score becomes smaller, we consider it * as more balance. */ - public boolean isMoreBalanced(long srcBeId, long destBeId, long tabletId, long tabletSize) { + public boolean isMoreBalanced(long srcBeId, long destBeId, long tabletId, long tabletSize, + TStorageMedium medium) { double currentSrcBeScore; double currentDestBeScore; @@ -161,33 +189,42 @@ public boolean isMoreBalanced(long srcBeId, long destBeId, long tabletId, long t return false; } - currentSrcBeScore = srcBeStat.getLoadScore(); - currentDestBeScore = destBeStat.getLoadScore(); + if (!srcBeStat.hasMedium(medium) || !destBeStat.hasMedium(medium)) { + return false; + } + + currentSrcBeScore = srcBeStat.getLoadScore(medium); + currentDestBeScore = destBeStat.getLoadScore(medium); - LoadScore newSrcBeScore = BackendLoadStatistic.calcSore(srcBeStat.getTotalUsedCapacityB() - tabletSize, - srcBeStat.getTotalCapacityB(), srcBeStat.getReplicaNum() - 1, - avgUsedCapacityPercent, avgReplicaNumPercent); + LoadScore newSrcBeScore = BackendLoadStatistic.calcSore(srcBeStat.getTotalUsedCapacityB(medium) - tabletSize, + srcBeStat.getTotalCapacityB(medium), srcBeStat.getReplicaNum(medium) - 1, + avgUsedCapacityPercentMap.get(medium), avgReplicaNumPercentMap.get(medium)); - LoadScore newDestBeScore = BackendLoadStatistic.calcSore(destBeStat.getTotalUsedCapacityB() + tabletSize, - destBeStat.getTotalCapacityB(), destBeStat.getReplicaNum() + 1, - avgUsedCapacityPercent, avgReplicaNumPercent); + LoadScore newDestBeScore = BackendLoadStatistic.calcSore(destBeStat.getTotalUsedCapacityB(medium) + tabletSize, + destBeStat.getTotalCapacityB(medium), destBeStat.getReplicaNum(medium) + 1, + avgUsedCapacityPercentMap.get(medium), avgReplicaNumPercentMap.get(medium)); - double currentDiff = Math.abs(currentSrcBeScore - avgLoadScore) + Math.abs(currentDestBeScore - avgLoadScore); - double newDiff = Math.abs(newSrcBeScore.score - avgLoadScore) + Math.abs(newDestBeScore.score - avgLoadScore); + double currentDiff = Math.abs(currentSrcBeScore - avgLoadScoreMap.get(medium)) + Math.abs(currentDestBeScore - avgLoadScoreMap.get(medium)); + double newDiff = Math.abs(newSrcBeScore.score - avgLoadScoreMap.get(medium)) + Math.abs(newDestBeScore.score - avgLoadScoreMap.get(medium)); - LOG.debug("after migrate {}(size: {}) from {} to {}, the load score changed." - + "src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {}", - tabletId, tabletSize, srcBeId, destBeId, currentSrcBeScore, newSrcBeScore.score, - currentDestBeScore, newDestBeScore.score, avgLoadScore, currentDiff, newDiff); + LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the load score changed." + + " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {}," + + " more balanced: {}", + tabletId, tabletSize, srcBeId, destBeId, medium, currentSrcBeScore, newSrcBeScore.score, + currentDestBeScore, newDestBeScore.score, avgLoadScoreMap.get(medium), currentDiff, newDiff, + (newDiff < currentDiff)); return newDiff < currentDiff; } - public List> getClusterStatistic() { + public List> getClusterStatistic(TStorageMedium medium) { List> statistics = Lists.newArrayList(); for (BackendLoadStatistic beStatistic : beLoadStatistics) { - List beStat = beStatistic.getInfo(); + if (!beStatistic.hasMedium(medium)) { + continue; + } + List beStat = beStatistic.getInfo(medium); statistics.add(beStat); } @@ -235,15 +272,23 @@ public BackendLoadStatistic getBackendLoadStatistic(long beId) { public void getBackendStatisticByClass( List low, List mid, - List high) { + List high, + TStorageMedium medium) { for (BackendLoadStatistic beStat : beLoadStatistics) { - if (beStat.getClazz() == Classification.LOW) { - low.add(beStat); - } else if (beStat.getClazz() == Classification.HIGH) { - high.add(beStat); - } else { - mid.add(beStat); + Classification clazz = beStat.getClazz(medium); + switch (clazz) { + case LOW: + low.add(beStat); + break; + case MID: + mid.add(beStat); + break; + case HIGH: + high.add(beStat); + break; + default: + break; } } @@ -260,16 +305,24 @@ public void getBackendStatisticByClass( mid.clear(); } - Collections.sort(low); - Collections.sort(mid); - Collections.sort(high); + sortBeStats(low, medium); + sortBeStats(mid, medium); + sortBeStats(high, medium); - LOG.debug("after adjust, cluster {} backend classification low/mid/high: {}/{}/{}", - clusterName, low.size(), mid.size(), high.size()); + LOG.debug("after adjust, cluster {} backend classification low/mid/high: {}/{}/{}, medium: {}", + clusterName, low.size(), mid.size(), high.size(), medium); } - public List getBeLoadStatistics() { - return beLoadStatistics; + public List getSortedBeLoadStats(TStorageMedium medium) { + if (medium != null) { + List beStatsWithMedium = beLoadStatistics.stream().filter( + b -> b.hasMedium(medium)).collect(Collectors.toList()); + sortBeStats(beStatsWithMedium, medium); + return beStatsWithMedium; + } else { + // be stats are already sorted by mix load score in init() + return beLoadStatistics; + } } public String getBrief() { diff --git a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java index c597e436836f7f..9c0819238d3ad7 100644 --- a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java +++ b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java @@ -25,6 +25,7 @@ import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -57,7 +58,10 @@ public LoadBalancer(Map statisticMap) { public List selectAlternativeTablets() { List alternativeTablets = Lists.newArrayList(); for (Map.Entry entry : statisticMap.entrySet()) { - alternativeTablets.addAll(selectAlternativeTabletsForCluster(entry.getKey(), entry.getValue())); + for (TStorageMedium medium : TStorageMedium.values()) { + alternativeTablets.addAll(selectAlternativeTabletsForCluster(entry.getKey(), + entry.getValue(), medium)); + } } return alternativeTablets; } @@ -75,39 +79,38 @@ public List selectAlternativeTablets() { * when this tablet is being scheduled in tablet scheduler. */ private List selectAlternativeTabletsForCluster( - String clusterName, ClusterLoadStatistic clusterStat) { - // tablet id -> backend id -> path hash + String clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium) { List alternativeTablets = Lists.newArrayList(); // get classification of backends List lowBEs = Lists.newArrayList(); List midBEs = Lists.newArrayList(); List highBEs = Lists.newArrayList(); - clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs); + clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); if (lowBEs.isEmpty() && highBEs.isEmpty()) { - LOG.info("cluster is balance: {}. skip", clusterName); + LOG.info("cluster is balance: {} with medium: {}. skip", clusterName, medium); return alternativeTablets; } // first we should check if low backends is available. // if all low backends is not available, we should not start balance if (lowBEs.stream().allMatch(b -> !b.isAvailable())) { - LOG.info("all low load backends is dead: {}. skip", - lowBEs.stream().mapToLong(b -> b.getBeId()).toArray()); + LOG.info("all low load backends is dead: {} with medium: {}. skip", + lowBEs.stream().mapToLong(b -> b.getBeId()).toArray(), medium); return alternativeTablets; } if (lowBEs.stream().allMatch(b -> !b.hasAvailDisk())) { - LOG.info("all low load backends have no available disk. skip", - lowBEs.stream().mapToLong(b -> b.getBeId()).toArray()); + LOG.info("all low load backends have no available disk with medium: {}. skip", + lowBEs.stream().mapToLong(b -> b.getBeId()).toArray(), medium); return alternativeTablets; } // get the number of low load paths. and we should at most select this number of tablets long numOfLowPaths = lowBEs.stream().filter(b -> b.isAvailable() && b.hasAvailDisk()).mapToLong( - b -> b.getAvailPathNum()).sum(); - LOG.info("get number of low load paths: {}", numOfLowPaths); + b -> b.getAvailPathNum(medium)).sum(); + LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); // choose tablets from high load backends. // BackendLoadStatistic is sorted by load score in ascend order, @@ -119,12 +122,12 @@ private List selectAlternativeTabletsForCluster( Set pathLow = Sets.newHashSet(); Set pathMid = Sets.newHashSet(); Set pathHigh = Sets.newHashSet(); - beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, null); + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium); // we only select tablets from available mid and high load path pathHigh.addAll(pathMid); // get all tablets on this backend, and shuffle them for random selection - List tabletIds = invertedIndex.getTabletIdsByBackendId(beStat.getBeId()); + List tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium); Collections.shuffle(tabletIds); // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets @@ -177,8 +180,8 @@ private List selectAlternativeTabletsForCluster( } } // end for high backends - LOG.info("select alternative tablets for cluster: {}, num: {}, detail: {}", - clusterName, alternativeTablets.size(), + LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}", + clusterName, medium, alternativeTablets.size(), alternativeTablets.stream().mapToLong(t -> t.getTabletId()).toArray()); return alternativeTablets; } @@ -201,7 +204,7 @@ public void createBalanceTask(TabletSchedCtx tabletCtx, Map back List lowBe = Lists.newArrayList(); List midBe = Lists.newArrayList(); List highBe = Lists.newArrayList(); - clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe); + clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe, tabletCtx.getStorageMedium()); if (lowBe.isEmpty() && highBe.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, "cluster is balance"); @@ -253,18 +256,21 @@ public void createBalanceTask(TabletSchedCtx tabletCtx, Map back // no replica on this low load backend // 1. check if this clone task can make the cluster more balance. List availPaths = Lists.newArrayList(); - if (beStat.isFit(tabletCtx.getTabletSize(), availPaths, - false /* not supplement */) != BalanceStatus.OK) { + BalanceStatus bs; + if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths, + false /* not supplement */)) != BalanceStatus.OK) { + LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs()); continue; } if (!clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(), - tabletCtx.getTabletId(), tabletCtx.getTabletSize())) { + tabletCtx.getTabletId(), tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) { continue; } PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); if (slot == null) { + LOG.debug("BE does not have slot: {}", beStat.getBeId()); continue; } @@ -278,6 +284,7 @@ public void createBalanceTask(TabletSchedCtx tabletCtx, Map back long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow); if (pathHash == -1) { + LOG.debug("paths has no available balance slot: {}", pathLow); continue; } else { tabletCtx.setDest(beStat.getBeId(), pathHash); diff --git a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index ca4204ebb1cd24..ab635844593ed8 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -241,6 +241,10 @@ public void increaseFailedSchedCounter() { ++failedSchedCounter; } + public int getFailedSchedCounter() { + return failedSchedCounter; + } + public void increaseFailedRunningCounter() { ++failedRunningCounter; } @@ -936,6 +940,7 @@ public List getBrief() { List result = Lists.newArrayList(); result.add(String.valueOf(tabletId)); result.add(type.name()); + result.add(storageMedium == null ? "N/A" : storageMedium.name()); result.add(tabletStatus == null ? "N/A" : tabletStatus.name()); result.add(state.name()); result.add(origPriority.name()); diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 08b50d3e032031..c274a016c60aff 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -97,7 +97,7 @@ public class TabletScheduler extends Daemon { public static final int MAX_SCHEDULING_TABLETS = 5000; // if the number of balancing tablets in TabletScheduler exceed this threshold, // no more balance check - public static final int MAX_BALANCING_TABLETS = 500; + public static final int MAX_BALANCING_TABLETS = 100; /* * Tablet is added to pendingTablets as well it's id in allTabletIds. @@ -294,15 +294,17 @@ private void updateClusterLoadStatisticsAndPriorityIfNecessary() { * because we already limit the total number of running clone jobs in cluster by 'backend slots' */ private void updateClusterLoadStatistic() { - statisticMap.clear(); - List clusterNames = infoService.getClusterNames(); + Map newStatisticMap = Maps.newConcurrentMap(); + Set clusterNames = infoService.getClusterNames(); for (String clusterName : clusterNames) { - ClusterLoadStatistic clusterLoadStatistic = new ClusterLoadStatistic(clusterName, catalog, + ClusterLoadStatistic clusterLoadStatistic = new ClusterLoadStatistic(clusterName, infoService, invertedIndex); clusterLoadStatistic.init(); - statisticMap.put(clusterName, clusterLoadStatistic); + newStatisticMap.put(clusterName, clusterLoadStatistic); LOG.info("update cluster {} load statistic:\n{}", clusterName, clusterLoadStatistic.getBrief()); } + + this.statisticMap = newStatisticMap; } public Map getStatisticMap() { @@ -354,10 +356,18 @@ private void schedulePendingTablets() { tabletCtx.setErrMsg(e.getMessage()); if (e.getStatus() == Status.SCHEDULE_FAILED) { - // if balance is disabled, remove this tablet - if (tabletCtx.getType() == Type.BALANCE && Config.disable_balance) { - finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, - "disable balance and " + e.getMessage()); + if (tabletCtx.getType() == Type.BALANCE) { + // if balance is disabled, remove this tablet + if (Config.disable_balance) { + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, + "disable balance and " + e.getMessage()); + } else { + // remove the balance task if it fails to be scheduled many times + if (tabletCtx.getFailedSchedCounter() > 10) { + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, + "schedule failed too many times and " + e.getMessage()); + } + } } else { // we must release resource it current hold, and be scheduled again tabletCtx.releaseResource(this); @@ -702,8 +712,8 @@ private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx) { if (beStatistic == null) { continue; } - if (beStatistic.getLoadScore() > maxScore) { - maxScore = beStatistic.getLoadScore(); + if (beStatistic.getLoadScore(tabletCtx.getStorageMedium()) > maxScore) { + maxScore = beStatistic.getLoadScore(tabletCtx.getStorageMedium()); chosenReplica = replica; } } @@ -786,10 +796,10 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx) if (statistic == null) { throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist"); } - List beStatistics = statistic.getBeLoadStatistics(); + List beStatistics = statistic.getSortedBeLoadStats(null /* sorted ignore medium */); // get all available paths which this tablet can fit in. - // beStatistics is sorted by load score in ascend order, so select from first to last. + // beStatistics is sorted by mix load score in ascend order, so select from first to last. List allFitPaths = Lists.newArrayList(); for (int i = 0; i < beStatistics.size(); i++) { BackendLoadStatistic bes = beStatistics.get(i); @@ -799,7 +809,8 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx) } List resultPaths = Lists.newArrayList(); - BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), resultPaths, true /* is supplement */); + BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), + resultPaths, true /* is supplement */); if (!st.ok()) { LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletCtx, st); continue; diff --git a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java index 52c75e2f0399a6..8331b226f52e06 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java @@ -47,7 +47,6 @@ public ProcResult fetchResult() throws AnalysisException { Preconditions.checkNotNull(backend); BaseProcResult result = new BaseProcResult(); - result.setNames(TITLE_NAMES); for (Map.Entry entry : backend.getDisks().entrySet()) { diff --git a/fe/src/main/java/org/apache/doris/common/proc/ClusterBalanceProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/ClusterBalanceProcDir.java index 2cb7c26e1a9389..6af20d43e8df39 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/ClusterBalanceProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/ClusterBalanceProcDir.java @@ -50,7 +50,7 @@ public boolean register(String name, ProcNodeInterface node) { @Override public ProcNodeInterface lookup(String name) throws AnalysisException { if (name.equals(CLUSTER_LOAD)) { - return new ClusterLoadStatisticProcDir(); + return new ClusterLoadStatByMedium(); } else if (name.equals(WORKING_SLOTS)) { return new SchedulerWorkingSlotsProcDir(); } else if (name.equals(SCHED_STAT)) { diff --git a/fe/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByMedium.java b/fe/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByMedium.java new file mode 100644 index 00000000000000..d8577705215fe5 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByMedium.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.thrift.TStorageMedium; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/* + * Author: Chenmingyu + * Date: Mar 7, 2019 + */ + +public class ClusterLoadStatByMedium implements ProcDirInterface { + public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add( + "StorageMedium").build(); + + @Override + public ProcResult fetchResult() throws AnalysisException { + BaseProcResult result = new BaseProcResult(); + result.setNames(TITLE_NAMES); + for (TStorageMedium medium : TStorageMedium.values()) { + result.addRow(Lists.newArrayList(medium.name())); + } + return result; + } + + @Override + public boolean register(String name, ProcNodeInterface node) { + return false; + } + + @Override + public ProcNodeInterface lookup(String name) throws AnalysisException { + for (TStorageMedium medium : TStorageMedium.values()) { + if (name.equalsIgnoreCase(medium.name())) { + return new ClusterLoadStatisticProcDir(medium); + } + } + throw new AnalysisException("no such storage medium: " + name); + } + +} diff --git a/fe/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java index 41193b0dc7fc92..47121af60833ba 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java @@ -21,6 +21,7 @@ import org.apache.doris.clone.ClusterLoadStatistic; import org.apache.doris.common.AnalysisException; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.ImmutableList; @@ -36,6 +37,11 @@ public class ClusterLoadStatisticProcDir implements ProcDirInterface { .build(); private Map statMap; + private TStorageMedium medium; + + public ClusterLoadStatisticProcDir(TStorageMedium medium) { + this.medium = medium; + } @Override public ProcResult fetchResult() throws AnalysisException { @@ -45,7 +51,7 @@ public ProcResult fetchResult() throws AnalysisException { statMap = Catalog.getCurrentCatalog().getTabletScheduler().getStatisticMap(); statMap.values().stream().forEach(t -> { - List> statistics = t.getClusterStatistic(); + List> statistics = t.getClusterStatistic(medium); statistics.stream().forEach(v -> { result.addRow(v); }); diff --git a/fe/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java index 1dcf5b99bc88f7..242af4edc2dbef 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java @@ -35,7 +35,7 @@ */ public class TabletSchedulerDetailProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("TabletId").add("Type").add("Status").add("State").add("OrigPrio").add("DynmPrio") + .add("TabletId").add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio") .add("SrcBe").add("SrcPath").add("DestBe").add("DestPath").add("Timeout") .add("Create").add("LstSched").add("LstVisit").add("Finished").add("Rate").add("FailedSched") .add("FailedRunning").add("LstAdjPrio").add("VisibleVer").add("VisibleVerHash") diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index f7c6f3780a3beb..34fe15ca5a50fb 100644 --- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -1087,9 +1087,9 @@ public long getBackendIdByHost(String host) { return selectedBackends.get(0).getId(); } - public List getClusterNames() { + public Set getClusterNames() { ImmutableMap idToBackend = idToBackendRef.get(); - List clusterNames = Lists.newArrayList(); + Set clusterNames = Sets.newHashSet(); for (Backend backend : idToBackend.values()) { if (!Strings.isNullOrEmpty(backend.getOwnerClusterName())) { clusterNames.add(backend.getOwnerClusterName()); diff --git a/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java b/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java index 87dbb03ff9c357..7de0f64d729f78 100644 --- a/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java +++ b/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java @@ -237,7 +237,7 @@ public static Database mockDb() throws AnalysisException { Tablet tablet0 = new Tablet(TEST_TABLET0_ID); TabletMeta tabletMeta = new TabletMeta(TEST_DB_ID, TEST_TBL_ID, TEST_SINGLE_PARTITION_ID, - TEST_TBL_ID, SCHEMA_HASH); + TEST_TBL_ID, SCHEMA_HASH, TStorageMedium.HDD); baseIndex.addTablet(tablet0, tabletMeta); Replica replica0 = new Replica(TEST_REPLICA0_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica1 = new Replica(TEST_REPLICA1_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -308,7 +308,7 @@ public static Database mockDb() throws AnalysisException { Tablet baseTabletP1 = new Tablet(TEST_BASE_TABLET_P1_ID); TabletMeta tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION1_ID, - TEST_TBL2_ID, SCHEMA_HASH); + TEST_TBL2_ID, SCHEMA_HASH, TStorageMedium.HDD); baseIndexP1.addTablet(baseTabletP1, tabletMetaBaseTabletP1); Replica replica3 = new Replica(TEST_REPLICA3_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica4 = new Replica(TEST_REPLICA4_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -320,7 +320,7 @@ public static Database mockDb() throws AnalysisException { Tablet baseTabletP2 = new Tablet(TEST_BASE_TABLET_P2_ID); TabletMeta tabletMetaBaseTabletP2 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION2_ID, - TEST_TBL2_ID, SCHEMA_HASH); + TEST_TBL2_ID, SCHEMA_HASH, TStorageMedium.HDD); baseIndexP2.addTablet(baseTabletP2, tabletMetaBaseTabletP2); Replica replica6 = new Replica(TEST_REPLICA6_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica7 = new Replica(TEST_REPLICA7_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -340,7 +340,8 @@ public static Database mockDb() throws AnalysisException { MaterializedIndex rollupIndexP1 = new MaterializedIndex(TEST_ROLLUP_ID, IndexState.NORMAL); Tablet rollupTabletP1 = new Tablet(TEST_ROLLUP_TABLET_P1_ID); TabletMeta tabletMetaRollupTabletP1 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION1_ID, - TEST_ROLLUP_TABLET_P1_ID, ROLLUP_SCHEMA_HASH); + TEST_ROLLUP_TABLET_P1_ID, ROLLUP_SCHEMA_HASH, + TStorageMedium.HDD); rollupIndexP1.addTablet(rollupTabletP1, tabletMetaRollupTabletP1); Replica replica9 = new Replica(TEST_REPLICA9_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica10 = new Replica(TEST_REPLICA10_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -356,7 +357,8 @@ public static Database mockDb() throws AnalysisException { MaterializedIndex rollupIndexP2 = new MaterializedIndex(TEST_ROLLUP_ID, IndexState.NORMAL); Tablet rollupTabletP2 = new Tablet(TEST_ROLLUP_TABLET_P2_ID); TabletMeta tabletMetaRollupTabletP2 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION1_ID, - TEST_ROLLUP_TABLET_P2_ID, ROLLUP_SCHEMA_HASH); + TEST_ROLLUP_TABLET_P2_ID, ROLLUP_SCHEMA_HASH, + TStorageMedium.HDD); rollupIndexP2.addTablet(rollupTabletP2, tabletMetaRollupTabletP2); Replica replica12 = new Replica(TEST_REPLICA12_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica13 = new Replica(TEST_REPLICA13_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 61b8f06c78e4c1..e078a6ace1f8f9 100644 --- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -26,6 +26,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TDisk; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -171,7 +172,7 @@ public static Database createSimpleDb(long dbId, long tableId, long partitionId, // index MaterializedIndex index = new MaterializedIndex(indexId, IndexState.NORMAL); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0, TStorageMedium.HDD); index.addTablet(tablet, tabletMeta); tablet.addReplica(replica1); diff --git a/fe/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/src/test/java/org/apache/doris/catalog/TabletTest.java index 576c995dc797cb..3eedcee4ecc901 100644 --- a/fe/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.common.FeConstants; +import org.apache.doris.thrift.TStorageMedium; import org.easymock.EasyMock; import org.junit.Assert; @@ -59,7 +60,7 @@ public void makeTablet() { PowerMock.replay(Catalog.class); tablet = new Tablet(1); - TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1); + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); invertedIndex.addTablet(1, tabletMeta); replica1 = new Replica(1L, 1L, 100L, 0L, 0, 200000L, 3000L, ReplicaState.NORMAL, 0, 0, 0, 0); replica2 = new Replica(2L, 2L, 100L, 0L, 0, 200000L, 3000L, ReplicaState.NORMAL, 0, 0, 0, 0); diff --git a/fe/src/test/java/org/apache/doris/clone/CloneTest.java b/fe/src/test/java/org/apache/doris/clone/CloneTest.java index 1996ab0939bad5..29562806d33d80 100644 --- a/fe/src/test/java/org/apache/doris/clone/CloneTest.java +++ b/fe/src/test/java/org/apache/doris/clone/CloneTest.java @@ -36,6 +36,7 @@ import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTabletInfo; + import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -138,7 +139,7 @@ public void testCheckTimeout() { type, priority, timeoutSecond)); Assert.assertTrue(clone.getCloneTabletIds().contains(tabletId)); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1, TStorageMedium.HDD); Catalog.getCurrentInvertedIndex().addTablet(tabletId, tabletMeta); Replica replica = new Replica(); Catalog.getCurrentInvertedIndex().addReplica(tabletId, replica); @@ -183,7 +184,7 @@ public void testCancelCloneJob() { Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId, type, priority, timeoutSecond)); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1, TStorageMedium.HDD); Catalog.getCurrentInvertedIndex().addTablet(tabletId, tabletMeta); Replica replica = new Replica(); Catalog.getCurrentInvertedIndex().addReplica(tabletId, replica); @@ -219,7 +220,7 @@ public void testFinishCloneJob() { type, priority, timeoutSecond)); Assert.assertEquals(1, clone.getJobNum()); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1, TStorageMedium.HDD); Catalog.getCurrentInvertedIndex().addTablet(tabletId, tabletMeta); Replica replica = new Replica(); Catalog.getCurrentInvertedIndex().addReplica(tabletId, replica); diff --git a/fe/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java b/fe/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java index b9b655a55937ff..b1b25672813493 100644 --- a/fe/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java +++ b/fe/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.TabletMeta; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -126,16 +127,16 @@ public void setUp() { // tablet invertedIndex = new TabletInvertedIndex(); - invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5)); + invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD)); invertedIndex.addReplica(50000, new Replica(50001, be1.getId(), 0, ReplicaState.NORMAL)); invertedIndex.addReplica(50000, new Replica(50002, be2.getId(), 0, ReplicaState.NORMAL)); invertedIndex.addReplica(50000, new Replica(50003, be3.getId(), 0, ReplicaState.NORMAL)); - invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5)); + invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD)); invertedIndex.addReplica(60000, new Replica(60002, be2.getId(), 0, ReplicaState.NORMAL)); invertedIndex.addReplica(60000, new Replica(60003, be3.getId(), 0, ReplicaState.NORMAL)); - invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5)); + invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD)); invertedIndex.addReplica(70000, new Replica(70002, be2.getId(), 0, ReplicaState.NORMAL)); invertedIndex.addReplica(70000, new Replica(70003, be3.getId(), 0, ReplicaState.NORMAL)); } @@ -143,9 +144,9 @@ public void setUp() { @Test public void test() { ClusterLoadStatistic loadStatistic = new ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER, - catalog, systemInfoService, invertedIndex); + systemInfoService, invertedIndex); loadStatistic.init(); - List> infos = loadStatistic.getClusterStatistic(); + List> infos = loadStatistic.getClusterStatistic(TStorageMedium.HDD); System.out.println(infos); Assert.assertEquals(3, infos.size()); } diff --git a/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java b/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java index e006a2aa9e103e..47b1eaad01d508 100644 --- a/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java +++ b/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java @@ -41,6 +41,7 @@ import org.apache.doris.load.Load; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDisk; +import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import com.google.common.collect.Maps; @@ -75,7 +76,7 @@ public static Database createDb(long dbId, long tableId, long partitionId, long // index MaterializedIndex index = new MaterializedIndex(indexId, IndexState.NORMAL); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0, TStorageMedium.HDD); index.addTablet(tablet, tabletMeta); tablet.addReplica(replica1);