Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,10 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac
EngineStorageMigrationTask engine_task(tablet, dest_store);
status = _env->storage_engine()->execute_task(&engine_task);
}
// fe should ignore this err
if (status.is<FILE_ALREADY_EXIST>()) {
status = Status::OK();
}
if (!status.ok()) {
LOG_WARNING("failed to migrate storage medium")
.tag("signature", agent_task_req.signature)
Expand Down Expand Up @@ -1986,8 +1990,9 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
*dest_store = stores[0];
}
if (tablet->data_dir()->path() == (*dest_store)->path()) {
return Status::InternalError("tablet is already on specified path {}",
tablet->data_dir()->path());
LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path());
return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}",
tablet->data_dir()->path());
}

// check local disk capacity
Expand All @@ -1996,7 +2001,6 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
}

return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
Set<Long> pathHigh = Sets.newHashSet();
// we only select tablets from available high load path
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
// check if BE has low and high paths for balance after reclassification
if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
continue;
}
Expand Down Expand Up @@ -382,5 +381,6 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
if (!setDest) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find low load path");
}
LOG.info("dx test out completeSchedCtx");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -608,6 +609,15 @@ private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws Sched
}
}

public void updateDestPathHash(TabletSchedCtx tabletCtx) {
// find dest replica
Optional<Replica> destReplica = tabletCtx.getReplicas()
.stream().filter(replica -> replica.getBackendId() == tabletCtx.getDestBackendId()).findAny();
if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) {
destReplica.get().setPathHash(tabletCtx.getDestPathHash());
}
}

public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
PathSlot pathSlot = backendsWorkingSlots.get(beId);
if (pathSlot == null) {
Expand Down Expand Up @@ -1630,6 +1640,7 @@ public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask migrati
// if we have a success task, then stat must be refreshed before schedule a new task
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
updateDestPathHash(tabletCtx);
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
} else {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
Expand Down