Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class DiskBalancerService extends BackgroundService {
LoggerFactory.getLogger(DiskBalancerService.class);

public static final String DISK_BALANCER_DIR = "diskBalancer";
private static long replicaDeletionDelayMills = 60 * 60 * 1000L; // 60 minutes

private OzoneContainer ozoneContainer;
private final ConfigurationSource conf;
Expand All @@ -104,6 +106,7 @@ public class DiskBalancerService extends BackgroundService {
private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());

private Set<ContainerID> inProgressContainers;
private ConcurrentSkipListMap<Long, Container> pendingDeletionContainers = new ConcurrentSkipListMap();
private static FaultInjector injector;

/**
Expand Down Expand Up @@ -339,12 +342,14 @@ public BackgroundTaskQueue getTasks() {

if (this.operationalState == DiskBalancerRunningStatus.STOPPED ||
this.operationalState == DiskBalancerRunningStatus.PAUSED) {
cleanupPendingDeletionContainers();
return queue;
}
metrics.incrRunningLoopCount();

if (shouldDelay()) {
metrics.incrIdleLoopExceedsBandwidthCount();
cleanupPendingDeletionContainers();
return queue;
}

Expand Down Expand Up @@ -391,6 +396,7 @@ public BackgroundTaskQueue getTasks() {
}
}
metrics.incrIdleLoopNoAvailableVolumePairCount();
cleanupPendingDeletionContainers();
}

return queue;
Expand Down Expand Up @@ -554,20 +560,15 @@ public BackgroundTaskResult call() {
container.readUnlock();
}
if (moveSucceeded) {
// Remove the old container from the KeyValueContainerUtil.
try {
KeyValueContainerUtil.removeContainer(
(KeyValueContainerData) container.getContainerData(), conf);
container.delete();
container.getContainerData().getVolume().decrementUsedSpace(containerSize);
} catch (IOException ex) {
LOG.warn("Failed to move or delete old container {} after it's marked as DELETED. " +
"It will be handled by background scanners.", containerId, ex);
}
// Add current old container to pendingDeletionContainers.
pendingDeletionContainers.put(System.currentTimeMillis() + replicaDeletionDelayMills, container);
ContainerLogger.logMoveSuccess(containerId, sourceVolume,
destVolume, containerSize, Time.monotonicNow() - startTime);
}
postCall(moveSucceeded, startTime);

// pick one expired container from pendingDeletionContainers to delete
tryCleanupOnePendingDeletionContainer();
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
Expand All @@ -593,6 +594,43 @@ private void postCall(boolean success, long startTime) {
}
}

private void deleteContainer(Container container) {
try {
KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
KeyValueContainerUtil.removeContainer(containerData, conf);
container.delete();
container.getContainerData().getVolume().decrementUsedSpace(containerData.getBytesUsed());
LOG.info("Deleted expired container {} after delay {} ms.",
containerData.getContainerID(), replicaDeletionDelayMills);
} catch (IOException ex) {
LOG.warn("Failed to delete old container {} after it's marked as DELETED. " +
"It will be handled by background scanners.", container.getContainerData().getContainerID(), ex);
}
}

private void cleanupPendingDeletionContainers() {
// delete all pending deletion containers before stop the service
boolean ret;
do {
ret = tryCleanupOnePendingDeletionContainer();
} while (ret);
}

private boolean tryCleanupOnePendingDeletionContainer() {
Map.Entry<Long, Container> entry = pendingDeletionContainers.pollFirstEntry();
if (entry != null) {
if (entry.getKey() <= System.currentTimeMillis()) {
// entry container is expired
deleteContainer(entry.getValue());
return true;
} else {
// put back the container
pendingDeletionContainers.put(entry.getKey(), entry.getValue());
}
}
return false;
}

public DiskBalancerInfo getDiskBalancerInfo() {
ImmutableList<HddsVolume> immutableVolumeSet = DiskBalancerVolumeCalculation.getImmutableVolumeSet(volumeSet);

Expand Down Expand Up @@ -737,4 +775,9 @@ public void shutdown() {
public static void setInjector(FaultInjector instance) {
injector = instance;
}

@VisibleForTesting
public static void setReplicaDeletionDelayMills(long durationMills) {
replicaDeletionDelayMills = durationMills;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DISK_BALANCER_DIR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -243,7 +244,7 @@ public void setup() throws Exception {
conf.setFromObject(diskBalancerConfiguration);
diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer,
100, conf, 1);

DiskBalancerService.setReplicaDeletionDelayMills(0);
KeyValueContainer.setInjector(kvFaultInjector);
}

Expand Down Expand Up @@ -320,7 +321,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo)

// verify temp container directory doesn't exist before task execution
Path tempContainerDir = destVolume.getTmpDir().toPath()
.resolve(DiskBalancerService.DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
.resolve(DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
File dir = new File(String.valueOf(tempContainerDir));
assertFalse(dir.exists(), "Temp container directory should not exist before task starts");

Expand Down Expand Up @@ -370,7 +371,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo)
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();
Path tempDir = destVolume.getTmpDir().toPath()
.resolve(DiskBalancerService.DISK_BALANCER_DIR)
.resolve(DISK_BALANCER_DIR)
.resolve(String.valueOf(CONTAINER_ID));
assertFalse(Files.exists(tempDir), "Temp container directory should not exist");
Path destDirPath = Paths.get(
Expand Down Expand Up @@ -573,6 +574,34 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
}

@ContainerTestVersionInfo.ContainerTest
public void testOldReplicaDelayedDeletion(ContainerTestVersionInfo versionInfo)
throws IOException, InterruptedException {
setLayoutAndSchemaForTest(versionInfo);
long delay = 2000L; // 2 second delay
DiskBalancerService.setReplicaDeletionDelayMills(delay);

Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED);
KeyValueContainerData keyValueContainerData = (KeyValueContainerData) container.getContainerData();
File oldContainerDir = new File(keyValueContainerData.getContainerPath());
assertTrue(oldContainerDir.exists());

DiskBalancerService.DiskBalancerTask task = getTask();
task.call();
assertEquals(State.DELETED, container.getContainerState());
// Verify that the old container is not deleted immediately
assertTrue(oldContainerDir.exists());

// create another container to trigger the deletion of old replicas
createContainer(CONTAINER_ID + 1, sourceVolume, State.CLOSED);
task = getTask();
// Wait for the delay to pass
Thread.sleep(delay);
task.call();
// Verify that the old container is deleted
assertFalse(oldContainerDir.exists());
}

private KeyValueContainer createContainer(long containerId, HddsVolume vol, State state)
throws IOException {
KeyValueContainerData containerData = new KeyValueContainerData(
Expand Down
Loading