diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index 04bd959eb4d9..37984848e9ff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -32,6 +32,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -85,6 +86,7 @@ public class DiskBalancerService extends BackgroundService { LoggerFactory.getLogger(DiskBalancerService.class); public static final String DISK_BALANCER_DIR = "diskBalancer"; + private static long replicaDeletionDelayMills = 60 * 60 * 1000L; // 60 minutes private OzoneContainer ozoneContainer; private final ConfigurationSource conf; @@ -104,6 +106,7 @@ public class DiskBalancerService extends BackgroundService { private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow()); private Set inProgressContainers; + private ConcurrentSkipListMap pendingDeletionContainers = new ConcurrentSkipListMap(); private static FaultInjector injector; /** @@ -339,12 +342,14 @@ public BackgroundTaskQueue getTasks() { if (this.operationalState == DiskBalancerRunningStatus.STOPPED || this.operationalState == DiskBalancerRunningStatus.PAUSED) { + cleanupPendingDeletionContainers(); return queue; } metrics.incrRunningLoopCount(); if (shouldDelay()) { metrics.incrIdleLoopExceedsBandwidthCount(); + cleanupPendingDeletionContainers(); return queue; } @@ -391,6 +396,7 @@ public BackgroundTaskQueue getTasks() { } } metrics.incrIdleLoopNoAvailableVolumePairCount(); + cleanupPendingDeletionContainers(); } return queue; @@ -554,20 +560,15 @@ public BackgroundTaskResult call() { container.readUnlock(); } if (moveSucceeded) { - // Remove the old container from the KeyValueContainerUtil. - try { - KeyValueContainerUtil.removeContainer( - (KeyValueContainerData) container.getContainerData(), conf); - container.delete(); - container.getContainerData().getVolume().decrementUsedSpace(containerSize); - } catch (IOException ex) { - LOG.warn("Failed to move or delete old container {} after it's marked as DELETED. " + - "It will be handled by background scanners.", containerId, ex); - } + // Add current old container to pendingDeletionContainers. + pendingDeletionContainers.put(System.currentTimeMillis() + replicaDeletionDelayMills, container); ContainerLogger.logMoveSuccess(containerId, sourceVolume, destVolume, containerSize, Time.monotonicNow() - startTime); } postCall(moveSucceeded, startTime); + + // pick one expired container from pendingDeletionContainers to delete + tryCleanupOnePendingDeletionContainer(); } return BackgroundTaskResult.EmptyTaskResult.newResult(); } @@ -593,6 +594,43 @@ private void postCall(boolean success, long startTime) { } } + private void deleteContainer(Container container) { + try { + KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); + KeyValueContainerUtil.removeContainer(containerData, conf); + container.delete(); + container.getContainerData().getVolume().decrementUsedSpace(containerData.getBytesUsed()); + LOG.info("Deleted expired container {} after delay {} ms.", + containerData.getContainerID(), replicaDeletionDelayMills); + } catch (IOException ex) { + LOG.warn("Failed to delete old container {} after it's marked as DELETED. " + + "It will be handled by background scanners.", container.getContainerData().getContainerID(), ex); + } + } + + private void cleanupPendingDeletionContainers() { + // delete all pending deletion containers before stop the service + boolean ret; + do { + ret = tryCleanupOnePendingDeletionContainer(); + } while (ret); + } + + private boolean tryCleanupOnePendingDeletionContainer() { + Map.Entry entry = pendingDeletionContainers.pollFirstEntry(); + if (entry != null) { + if (entry.getKey() <= System.currentTimeMillis()) { + // entry container is expired + deleteContainer(entry.getValue()); + return true; + } else { + // put back the container + pendingDeletionContainers.put(entry.getKey(), entry.getValue()); + } + } + return false; + } + public DiskBalancerInfo getDiskBalancerInfo() { ImmutableList immutableVolumeSet = DiskBalancerVolumeCalculation.getImmutableVolumeSet(volumeSet); @@ -737,4 +775,9 @@ public void shutdown() { public static void setInjector(FaultInjector instance) { injector = instance; } + + @VisibleForTesting + public static void setReplicaDeletionDelayMills(long durationMills) { + replicaDeletionDelayMills = durationMills; + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java index 60fb52eba10b..76a3992658b1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded; +import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DISK_BALANCER_DIR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -243,7 +244,7 @@ public void setup() throws Exception { conf.setFromObject(diskBalancerConfiguration); diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer, 100, conf, 1); - + DiskBalancerService.setReplicaDeletionDelayMills(0); KeyValueContainer.setInjector(kvFaultInjector); } @@ -320,7 +321,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo) // verify temp container directory doesn't exist before task execution Path tempContainerDir = destVolume.getTmpDir().toPath() - .resolve(DiskBalancerService.DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID)); + .resolve(DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID)); File dir = new File(String.valueOf(tempContainerDir)); assertFalse(dir.exists(), "Temp container directory should not exist before task starts"); @@ -370,7 +371,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo) 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); String oldContainerPath = container.getContainerData().getContainerPath(); Path tempDir = destVolume.getTmpDir().toPath() - .resolve(DiskBalancerService.DISK_BALANCER_DIR) + .resolve(DISK_BALANCER_DIR) .resolve(String.valueOf(CONTAINER_ID)); assertFalse(Files.exists(tempDir), "Temp container directory should not exist"); Path destDirPath = Paths.get( @@ -573,6 +574,34 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } + @ContainerTestVersionInfo.ContainerTest + public void testOldReplicaDelayedDeletion(ContainerTestVersionInfo versionInfo) + throws IOException, InterruptedException { + setLayoutAndSchemaForTest(versionInfo); + long delay = 2000L; // 2 second delay + DiskBalancerService.setReplicaDeletionDelayMills(delay); + + Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED); + KeyValueContainerData keyValueContainerData = (KeyValueContainerData) container.getContainerData(); + File oldContainerDir = new File(keyValueContainerData.getContainerPath()); + assertTrue(oldContainerDir.exists()); + + DiskBalancerService.DiskBalancerTask task = getTask(); + task.call(); + assertEquals(State.DELETED, container.getContainerState()); + // Verify that the old container is not deleted immediately + assertTrue(oldContainerDir.exists()); + + // create another container to trigger the deletion of old replicas + createContainer(CONTAINER_ID + 1, sourceVolume, State.CLOSED); + task = getTask(); + // Wait for the delay to pass + Thread.sleep(delay); + task.call(); + // Verify that the old container is deleted + assertFalse(oldContainerDir.exists()); + } + private KeyValueContainer createContainer(long containerId, HddsVolume vol, State state) throws IOException { KeyValueContainerData containerData = new KeyValueContainerData(