Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,8 @@ private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
boolean isRestoreTask = true;
// We don't care the visible version in restore job, the end version is used.
long visibleVersion = -1L;
SnapshotTask task = new SnapshotTask(null, replica.getBackendIdWithoutException(),
long beId = replica.getBackendIdWithoutException();
SnapshotTask task = new SnapshotTask(null, beId,
signature, jobId, db.getId(),
tbl.getId(), part.getId(), index.getId(), tablet.getId(), visibleVersion,
tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, isRestoreTask);
Expand All @@ -1263,7 +1264,7 @@ private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
}
batchTask.addTask(task);
unfinishedSignatureToId.put(signature, tablet.getId());
bePathsMap.put(replica.getBackendIdWithoutException(), replica.getPathHash());
bePathsMap.put(beId, replica.getPathHash());
} finally {
tbl.readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public static boolean isGlobalGroupName(String groupName) {
private Multimap<GroupId, Long> group2Tables = ArrayListMultimap.create();
// table_id -> group_id
@SerializedName(value = "table2Group")
private Map<Long, GroupId> table2Group = Maps.newHashMap();
private Map<Long, GroupId> table2Group = Maps.newConcurrentMap();
// group id -> group schema
@SerializedName(value = "group2Schema")
private Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
Expand Down Expand Up @@ -385,6 +385,13 @@ public boolean isGroupUnstable(GroupId groupId) {
}
}

// ATTN: in cloud, CloudReplica.getBackendIdImpl has some logic,
// If the FE concurrency is high, the CPU may be fully loaded, so try not to lock it here
// table2Group is ConcurrentHashMap
public boolean isColocateTableNoLock(long tableId) {
return table2Group.containsKey(tableId);
}

public boolean isColocateTable(long tableId) {
readLock();
try {
Expand Down Expand Up @@ -424,6 +431,14 @@ public Set<GroupId> getUnstableGroupIds() {
}
}

// ATTN: in cloud, CloudReplica.getBackendIdImpl has some logic,
// If the FE concurrency is high, the CPU may be fully loaded, so try not to lock it here
// table2Group is ConcurrentHashMap
public GroupId getGroupNoLock(long tableId) {
Preconditions.checkState(table2Group.containsKey(tableId));
return table2Group.get(tableId);
}

public GroupId getGroup(long tableId) {
readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ private static List<List<String>> getTabletStatus(String dbName, String tblName,
List<String> row = Lists.newArrayList();

ReplicaStatus status = ReplicaStatus.OK;
Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
long beId = replica.getBackendIdWithoutException();
Backend be = infoService.getBackend(beId);
if (be == null || !be.isAlive() || replica.isBad()) {
status = ReplicaStatus.DEAD;
} else if (replica.getVersion() < visibleVersion
Expand All @@ -109,7 +110,7 @@ private static List<List<String>> getTabletStatus(String dbName, String tblName,

row.add(String.valueOf(tabletId));
row.add(String.valueOf(replica.getId()));
row.add(String.valueOf(replica.getBackendIdWithoutException()));
row.add(String.valueOf(beId));
row.add(String.valueOf(replica.getVersion()));
row.add(String.valueOf(replica.getLastFailedVersion()));
row.add(String.valueOf(replica.getLastSuccessVersion()));
Expand Down Expand Up @@ -197,7 +198,8 @@ public static List<List<String>> getTabletStatus(String dbName, String tblName,
List<String> row = Lists.newArrayList();

ReplicaStatus status = ReplicaStatus.OK;
Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
long beId = replica.getBackendIdWithoutException();
Backend be = infoService.getBackend(beId);
if (be == null || !be.isAlive() || replica.isBad()) {
status = ReplicaStatus.DEAD;
} else if (replica.getVersion() < visibleVersion
Expand All @@ -216,7 +218,7 @@ public static List<List<String>> getTabletStatus(String dbName, String tblName,

row.add(String.valueOf(tabletId));
row.add(String.valueOf(replica.getId()));
row.add(String.valueOf(replica.getBackendIdWithoutException()));
row.add(String.valueOf(beId));
row.add(String.valueOf(replica.getVersion()));
row.add(String.valueOf(replica.getLastFailedVersion()));
row.add(String.valueOf(replica.getLastSuccessVersion()));
Expand Down Expand Up @@ -338,13 +340,14 @@ public static List<List<String>> getTabletDistribution(
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
if (!countMap.containsKey(replica.getBackendIdWithoutException())) {
long beId = replica.getBackendIdWithoutException();
if (!countMap.containsKey(beId)) {
continue;
}
countMap.put(replica.getBackendIdWithoutException(),
countMap.get(replica.getBackendIdWithoutException()) + 1);
sizeMap.put(replica.getBackendIdWithoutException(),
sizeMap.get(replica.getBackendIdWithoutException()) + replica.getDataSize());
countMap.put(beId,
countMap.get(beId) + 1);
sizeMap.put(beId,
sizeMap.get(beId) + replica.getDataSize());
totalReplicaNum++;
totalReplicaSize += replica.getDataSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
if (replica.getDataSize() == 0) {
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "size of src replica is zero");
}

long beId = replica.getBackendIdWithoutException();
// check src slot
PathSlot slot = backendsWorkingSlots.get(replica.getBackendIdWithoutException());
PathSlot slot = backendsWorkingSlots.get(beId);
if (slot == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("BE does not have slot: {}", replica.getBackendIdWithoutException());
LOG.debug("BE does not have slot: {}", beId);
}
throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot");
}
Expand All @@ -329,7 +329,7 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
// after take src slot, we can set src replica now
tabletCtx.setSrc(replica);

BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(replica.getBackendIdWithoutException());
BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(beId);
if (!beStat.isAvailable()) {
throw new SchedException(Status.UNRECOVERABLE, "the backend is not available");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx)
// Check src replica's validation
Replica srcReplica = tabletCtx.getTablet().getReplicaByBackendId(move.fromBe);
Preconditions.checkNotNull(srcReplica);
TabletScheduler.PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException());
long beId = srcReplica.getBackendIdWithoutException();
TabletScheduler.PathSlot slot = backendsWorkingSlots.get(beId);
Preconditions.checkNotNull(slot, "unable to get fromBe "
+ srcReplica.getBackendIdWithoutException() + " slot");
+ beId + " slot");
if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
tabletCtx.setSrc(srcReplica);
} else {
Expand Down
42 changes: 24 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,27 +511,28 @@ public boolean filterDestBE(long beId) {
}
String host = backend.getHost();
for (Replica replica : tablet.getReplicas()) {
Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
long replicaBeId = replica.getBackendIdWithoutException();
Backend be = infoService.getBackend(beId);
if (be == null) {
// BE has been dropped, skip it
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not exist, skip. tablet: {}",
replica.getBackendIdWithoutException(), tabletId);
replicaBeId, tabletId);
}
continue;
}
if (!Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && host.equals(be.getHost())) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} is on same host {}, skip. tablet: {}",
replica.getBackendIdWithoutException(), host, tabletId);
replicaBeId, host, tabletId);
}
return true;
}

if (replica.getBackendIdWithoutException() == beId) {
if (replicaBeId == beId) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} is same as dest backend {}, skip. tablet: {}",
replica.getBackendIdWithoutException(), beId, tabletId);
replicaBeId, beId, tabletId);
}
return true;
}
Expand Down Expand Up @@ -587,10 +588,11 @@ public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots, long exce
*/
List<Replica> candidates = Lists.newArrayList();
for (Replica replica : tablet.getReplicas()) {
if (exceptBeId != -1 && replica.getBackendIdWithoutException() == exceptBeId) {
long replicaBeId = replica.getBackendIdWithoutException();
if (exceptBeId != -1 && replicaBeId == exceptBeId) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} is same as except backend {}, skip. tablet: {}",
replica.getBackendIdWithoutException(), exceptBeId, tabletId);
replicaBeId, exceptBeId, tabletId);
}
continue;
}
Expand All @@ -603,12 +605,12 @@ public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots, long exce
continue;
}

Backend be = infoService.getBackend(replica.getBackendIdWithoutException());
Backend be = infoService.getBackend(replicaBeId);
if (be == null || !be.isAlive()) {
// backend which is in decommission can still be the source backend
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not exist or is not alive, skip. tablet: {}",
replica.getBackendIdWithoutException(), tabletId);
replicaBeId, tabletId);
}
continue;
}
Expand Down Expand Up @@ -640,11 +642,12 @@ public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots, long exce
// sort replica by version count asc, so that we prefer to choose replicas with fewer versions
Collections.sort(candidates, VERSION_COUNTER_COMPARATOR);
for (Replica srcReplica : candidates) {
PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException());
long replicaBeId = srcReplica.getBackendIdWithoutException();
PathSlot slot = backendsWorkingSlots.get(replicaBeId);
if (slot == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not have working slot, skip. tablet: {}",
srcReplica.getBackendIdWithoutException(), tabletId);
replicaBeId, tabletId);
}
continue;
}
Expand All @@ -653,7 +656,7 @@ public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots, long exce
if (srcPathHash == -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not have available slot, skip. tablet: {}",
srcReplica.getBackendIdWithoutException(), tabletId);
replicaBeId, tabletId);
}
continue;
}
Expand Down Expand Up @@ -701,10 +704,11 @@ public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWo
}

if (!replica.isScheduleAvailable()) {
if (Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replica.getBackendIdWithoutException())) {
long replicaBeId = replica.getBackendIdWithoutException();
if (Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replicaBeId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not exist or is not scheduler available, skip. tablet: {}",
replica.getBackendIdWithoutException(), tabletId);
replicaBeId, tabletId);
}
} else {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -816,6 +820,7 @@ public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWo
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"unable to take slot of dest path");
}
long chosenReplicaBeId = chosenReplica.getBackendIdWithoutException();

if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
// Since this replica is selected as the repair object of VERSION_INCOMPLETE,
Expand All @@ -838,9 +843,9 @@ public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWo
setDecommissionTime(-1);
LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete,"
+ " and change state from DECOMMISSION to NORMAL",
chosenReplica.getId(), chosenReplica.getBackendIdWithoutException(), tabletId);
chosenReplica.getId(), chosenReplicaBeId, tabletId);
}
setDest(chosenReplica.getBackendIdWithoutException(), chosenReplica.getPathHash());
setDest(chosenReplicaBeId, chosenReplica.getPathHash());
}

private boolean checkFurtherRepairFinish(Replica replica, long version) {
Expand Down Expand Up @@ -974,10 +979,11 @@ public StorageMediaMigrationTask createStorageMediaMigrationTask() throws SchedE

// database lock should be held.
public CloneTask createCloneReplicaAndTask() throws SchedException {
Backend srcBe = infoService.getBackend(srcReplica.getBackendIdWithoutException());
long beId = srcReplica.getBackendIdWithoutException();
Backend srcBe = infoService.getBackend(beId);
if (srcBe == null) {
throw new SchedException(Status.SCHEDULE_FAILED,
"src backend " + srcReplica.getBackendIdWithoutException() + " does not exist");
"src backend " + beId + " does not exist");
}

Backend destBe = infoService.getBackend(destBackendId);
Expand Down
Loading
Loading