diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index ae8aa282679479..eb876f55394a89 100755 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -953,7 +953,7 @@ OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) { } _build_tablet_info(tablet, tablet_info); - LOG(INFO) << "success to process report tablet info."; + VLOG(10) << "success to process report tablet info."; return res; } // report_tablet_info @@ -987,13 +987,6 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map* tablet_ptr->schema_hash(), tablet_ptr->tablet_uid(), &transaction_ids); tablet_info.__set_transaction_ids(transaction_ids); - if (_available_storage_medium_type_count > 1) { - tablet_info.__set_storage_medium(tablet_ptr->data_dir()->storage_medium()); - } - - tablet_info.__set_version_count(tablet_ptr->version_count()); - tablet_info.__set_path_hash(tablet_ptr->data_dir()->path_hash()); - tablet.tablet_infos.push_back(tablet_info); } @@ -1175,6 +1168,11 @@ void TabletManager::_build_tablet_info(TabletSharedPtr tablet, TTabletInfo* tabl tablet_info->version = version.second; tablet_info->version_hash = v_hash; tablet_info->__set_partition_id(tablet->partition_id()); + if (_available_storage_medium_type_count > 1) { + tablet_info->__set_storage_medium(tablet->data_dir()->storage_medium()); + } + tablet_info->__set_version_count(tablet->version_count()); + tablet_info->__set_path_hash(tablet->data_dir()->path_hash()); } void TabletManager::_build_tablet_stat() { diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJob.java b/fe/src/main/java/org/apache/doris/alter/RollupJob.java index bd39a219e739ce..b135eea2ebe226 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJob.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJob.java @@ -595,6 +595,10 @@ public synchronized void handleFinishedReplica(AgentTask task, TTabletInfo finis // the version is not set now rollupReplica.updateVersionInfo(version, versionHash, dataSize, rowCount); + if (finishTabletInfo.isSetPath_hash()) { + rollupReplica.setPathHash(finishTabletInfo.getPath_hash()); + } + setReplicaFinished(partitionId, rollupReplicaId); rollupReplica.setState(ReplicaState.NORMAL); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index adcb2f0780312a..b7b7152e89216f 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -621,6 +621,9 @@ public void handleFinishedReplica(AgentTask task, TTabletInfo finishTabletInfo, long rowCount = finishTabletInfo.getRow_count(); // do not need check version > replica.getVersion, because the new replica's version is first set by sc replica.updateVersionInfo(version, versionHash, dataSize, rowCount); + if (finishTabletInfo.isSetPath_hash()) { + replica.setPathHash(finishTabletInfo.getPath_hash()); + } } finally { db.writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java index 4f78e53ff9f4c7..4c545a7e08699b 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -67,8 +67,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Range; import com.google.common.collect.Table.Cell; @@ -755,7 +757,7 @@ private void checkAndPrepareMeta() { unfinishedSignatureToId.clear(); taskProgress.clear(); taskErrMsg.clear(); - Map pathBeMap = Maps.newHashMap(); + Multimap bePathsMap = HashMultimap.create(); batchTask = new AgentBatchTask(); db.readLock(); try { @@ -774,14 +776,14 @@ private void checkAndPrepareMeta() { true /* is restore task*/); batchTask.addTask(task); unfinishedSignatureToId.put(signature, tablet.getId()); - pathBeMap.put(replica.getPathHash(), replica.getBackendId()); + bePathsMap.put(replica.getBackendId(), replica.getPathHash()); } } finally { db.readUnlock(); } // check disk capacity - org.apache.doris.common.Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(pathBeMap, true); + org.apache.doris.common.Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(bePathsMap, true); if (!st.ok()) { status = new Status(ErrCode.COMMON_ERROR, st.getErrorMsg()); return; diff --git a/fe/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/src/main/java/org/apache/doris/catalog/Tablet.java index fa4fa8c497c6ee..2753e7d539e94a 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/src/main/java/org/apache/doris/catalog/Tablet.java @@ -26,8 +26,9 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -40,7 +41,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; /** @@ -186,9 +186,9 @@ public List getNormalReplicaBackendIds() { return beIds; } - // return map of (path hash -> BE id) of normal replicas - public Map getNormalReplicaBackendPathMap() { - Map map = Maps.newHashMap(); + // return map of (BE id -> path hash) of normal replicas + public Multimap getNormalReplicaBackendPathMap() { + Multimap map = HashMultimap.create(); SystemInfoService infoService = Catalog.getCurrentSystemInfo(); for (Replica replica : replicas) { if (replica.isBad()) { @@ -198,7 +198,7 @@ public Map getNormalReplicaBackendPathMap() { ReplicaState state = replica.getState(); if (infoService.checkBackendAlive(replica.getBackendId()) && (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE)) { - map.put(replica.getPathHash(), replica.getBackendId()); + map.put(replica.getBackendId(), replica.getPathHash()); } } return map; diff --git a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index e2fac5d6946d00..77166eb51d22b5 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -876,6 +876,9 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getVersion_hash(), reportedTablet.getData_size(), reportedTablet.getRow_count()); + if (reportedTablet.isSetPath_hash()) { + replica.setPathHash(reportedTablet.getPath_hash()); + } if (this.type == Type.BALANCE) { long partitionVisibleVersion = partition.getVisibleVersion(); diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java index 465da2ac62412b..c1afb3170919aa 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -55,8 +55,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -295,26 +296,27 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) thr private TOlapTableLocationParam createLocation(OlapTable table) throws UserException { TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); - Map allPathBeMap = Maps.newHashMap(); + // BE id -> path hash + Multimap allBePathsMap = HashMultimap.create(); for (Partition partition : table.getPartitions()) { int quorum = table.getPartitionInfo().getReplicationNum(partition.getId()) / 2 + 1; for (MaterializedIndex index : partition.getMaterializedIndices()) { // we should ensure the replica backend is alive // otherwise, there will be a 'unknown node id, id=xxx' error for stream load for (Tablet tablet : index.getTablets()) { - Map pathBeMap = tablet.getNormalReplicaBackendPathMap(); - if (pathBeMap.size() < quorum) { - throw new UserException("tablet " + tablet.getId() + " has few replicas: " + pathBeMap.size()); + Multimap bePathsMap = tablet.getNormalReplicaBackendPathMap(); + if (bePathsMap.keySet().size() < quorum) { + throw new UserException("tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size()); } - locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(pathBeMap.values()))); - allPathBeMap.putAll(pathBeMap); + locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet()))); + allBePathsMap.putAll(bePathsMap); } } } // check if disk capacity reach limit // this is for load process, so use high water mark to check - Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(allPathBeMap, true); + Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(allBePathsMap, true); if (!st.ok()) { throw new DdlException(st.getErrorMsg()); } diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index 2202bb76d12214..ca09bf7c382bab 100644 --- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -36,6 +36,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import org.apache.commons.validator.routines.InetAddressValidator; @@ -1116,19 +1117,21 @@ public Set getClusterNames() { /* * Check if the specified disks' capacity has reached the limit. - * pathBeMap is (path hash -> BE id) + * bePathsMap is (BE id -> list of path hash) * If floodStage is true, it will check with the floodStage threshold. * * return Status.OK if not reach the limit */ - public Status checkExceedDiskCapacityLimit(Map pathBeMap, boolean floodStage) { - LOG.debug("pathBeMap: {}", pathBeMap); + public Status checkExceedDiskCapacityLimit(Multimap bePathsMap, boolean floodStage) { + LOG.debug("pathBeMap: {}", bePathsMap); ImmutableMap pathHashToDiskInfo = pathHashToDishInfoRef.get(); - for (Long pathHash : pathBeMap.keySet()) { - DiskInfo diskInfo = pathHashToDiskInfo.get(pathHash); - if (diskInfo != null && diskInfo.exceedLimit(floodStage)) { - return new Status(TStatusCode.CANCELLED, - "disk " + pathHash + " on backend " + pathBeMap.get(pathHash) + " exceed limit usage"); + for (Long beId : bePathsMap.keySet()) { + for (Long pathHash : bePathsMap.get(beId)) { + DiskInfo diskInfo = pathHashToDiskInfo.get(pathHash); + if (diskInfo != null && diskInfo.exceedLimit(floodStage)) { + return new Status(TStatusCode.CANCELLED, + "disk " + pathHash + " on backend " + beId + " exceed limit usage"); + } } } return Status.OK;