From 22b5d417f429f225208c87b9a91bdc63506032ce Mon Sep 17 00:00:00 2001 From: ashishk Date: Thu, 27 Feb 2025 18:43:02 +0530 Subject: [PATCH 01/12] HDDS-12235. Reserve space for on DN for container import. --- .../common/volume/AvailableSpaceFilter.java | 3 +- .../container/common/volume/HddsVolume.java | 10 +++ .../replication/ContainerImporter.java | 3 + .../DownloadAndImportReplicator.java | 19 +++++- .../SendContainerRequestHandler.java | 65 ++++++++++++------- 5 files changed, 76 insertions(+), 24 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java index 330cc1d9f39a..857c8d5acef6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java @@ -43,7 +43,8 @@ public boolean test(HddsVolume vol) { long volumeCapacity = usage.getCapacity(); long free = usage.getAvailable(); long committed = vol.getCommittedBytes(); - long available = free - committed; + long importContainerCommitBytes = vol.getContainerImportCommittedBytes(); + long available = free - committed - importContainerCommitBytes; long volumeFreeSpaceToSpare = new VolumeUsage.MinFreeSpaceCalculator(vol.getConf()).get(volumeCapacity); boolean hasEnoughSpace = VolumeUsage.hasVolumeEnoughSpace(free, committed, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 70f25fa138dc..265996f6a6c6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -84,6 +84,8 @@ public class HddsVolume extends StorageVolume { private final AtomicLong committedBytes = new AtomicLong(); // till Open containers become full + private final AtomicLong containerImportCommittedBytes = new AtomicLong(); + // Mentions the type of volume private final VolumeType type = VolumeType.DATA_VOLUME; // The dedicated DbVolume that the db instance of this HddsVolume resides. @@ -305,6 +307,14 @@ public long getCommittedBytes() { return committedBytes.get(); } + public long incImportContainerCommitBytes(long delta) { + return containerImportCommittedBytes.addAndGet(delta); + } + + public long getContainerImportCommittedBytes() { + return containerImportCommittedBytes.get(); + } + public void setDbVolume(DbVolume dbVolume) { this.dbVolume = dbVolume; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index 8e0d301d8578..cf3cfd404bff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -172,4 +172,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) { return new TarContainerPacker(compression); } + public long getDefaultContainerSize() { + return containerSize; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 69f375b06c13..480c541782cd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -22,7 +22,9 @@ import java.nio.file.Path; import java.util.List; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; @@ -44,6 +46,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator { private final ContainerDownloader downloader; private final ContainerImporter containerImporter; private final ContainerSet containerSet; + private final long containerSize; public DownloadAndImportReplicator( ConfigurationSource conf, ContainerSet containerSet, @@ -53,6 +56,9 @@ public DownloadAndImportReplicator( this.containerSet = containerSet; this.downloader = downloader; this.containerImporter = containerImporter; + containerSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } @Override @@ -70,9 +76,16 @@ public void replicate(ReplicationTask task) { LOG.info("Starting replication of container {} from {} using {}", containerID, sourceDatanodes, compression); + HddsVolume targetVolume = null; try { - HddsVolume targetVolume = containerImporter.chooseNextVolume(); + targetVolume = containerImporter.chooseNextVolume(); + targetVolume.incImportContainerCommitBytes(containerSize * 2); + if (targetVolume.getCurrentUsage().getAvailable() - targetVolume.getCommittedBytes() + - targetVolume.getContainerImportCommittedBytes() <= 0) { + task.setStatus(Status.FAILED); + return; + } // Wait for the download. This thread pool is limiting the parallel // downloads, so it's ok to block here and wait for the full download. Path tarFilePath = @@ -95,6 +108,10 @@ public void replicate(ReplicationTask task) { } catch (IOException e) { LOG.error("Container {} replication was unsuccessful.", containerID, e); task.setStatus(Status.FAILED); + } finally { + if (targetVolume != null) { + targetVolume.incImportContainerCommitBytes(-containerSize * 2); + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java index a89e274f9c68..aff33870dd18 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.util.DiskChecker; import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; @@ -50,7 +51,7 @@ class SendContainerRequestHandler private long containerId = -1; private long nextOffset; private OutputStream output; - private HddsVolume volume; + private HddsVolume volume = null; private Path path; private CopyContainerCompression compression; private final ZeroCopyMessageMarshaller marshaller; @@ -85,6 +86,14 @@ public void onNext(SendContainerRequest req) { if (containerId == -1) { containerId = req.getContainerID(); volume = importer.chooseNextVolume(); + volume.incImportContainerCommitBytes(importer.getDefaultContainerSize() * 2); + if (volume.getCurrentUsage().getAvailable() - volume.getCommittedBytes() + - volume.getContainerImportCommittedBytes() <= 0) { + volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); + volume = null; + throw new DiskChecker.DiskOutOfSpaceException("No more available volumes"); + } + Path dir = ContainerImporter.getUntarDirectory(volume); Files.createDirectories(dir); path = dir.resolve(ContainerUtils.getContainerTarName(containerId)); @@ -110,32 +119,44 @@ public void onNext(SendContainerRequest req) { @Override public void onError(Throwable t) { - LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t); - closeOutput(); - deleteTarball(); - responseObserver.onError(t); + try { + LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t); + closeOutput(); + deleteTarball(); + responseObserver.onError(t); + } finally { + if (volume != null) { + volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); + } + } } @Override public void onCompleted() { - if (output == null) { - LOG.warn("Received container without any parts"); - return; - } - - LOG.info("Container {} is downloaded with size {}, starting to import.", - containerId, nextOffset); - closeOutput(); - try { - importer.importContainer(containerId, path, volume, compression); - LOG.info("Container {} is replicated successfully", containerId); - responseObserver.onNext(SendContainerResponse.newBuilder().build()); - responseObserver.onCompleted(); - } catch (Throwable t) { - LOG.warn("Failed to import container {}", containerId, t); - deleteTarball(); - responseObserver.onError(t); + if (output == null) { + LOG.warn("Received container without any parts"); + return; + } + + LOG.info("Container {} is downloaded with size {}, starting to import.", + containerId, nextOffset); + closeOutput(); + + try { + importer.importContainer(containerId, path, volume, compression); + LOG.info("Container {} is replicated successfully", containerId); + responseObserver.onNext(SendContainerResponse.newBuilder().build()); + responseObserver.onCompleted(); + } catch (Throwable t) { + LOG.warn("Failed to import container {}", containerId, t); + deleteTarball(); + responseObserver.onError(t); + } + } finally { + if (volume != null) { + volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); + } } } From cdeae20dfee37788fe195739b428b7496d19bb81 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 7 Mar 2025 11:00:47 +0530 Subject: [PATCH 02/12] Fix review comments --- .../common/volume/AvailableSpaceFilter.java | 5 ++-- .../container/common/volume/HddsVolume.java | 8 ------- .../DownloadAndImportReplicator.java | 20 ++++++++++++---- .../SendContainerRequestHandler.java | 24 ++++++++++++++----- 4 files changed, 36 insertions(+), 21 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java index 857c8d5acef6..b6356934ca3a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java @@ -33,7 +33,7 @@ public class AvailableSpaceFilter implements Predicate { new HashMap<>(); private long mostAvailableSpace = Long.MIN_VALUE; - AvailableSpaceFilter(long requiredSpace) { + public AvailableSpaceFilter(long requiredSpace) { this.requiredSpace = requiredSpace; } @@ -43,8 +43,7 @@ public boolean test(HddsVolume vol) { long volumeCapacity = usage.getCapacity(); long free = usage.getAvailable(); long committed = vol.getCommittedBytes(); - long importContainerCommitBytes = vol.getContainerImportCommittedBytes(); - long available = free - committed - importContainerCommitBytes; + long available = free - committed; long volumeFreeSpaceToSpare = new VolumeUsage.MinFreeSpaceCalculator(vol.getConf()).get(volumeCapacity); boolean hasEnoughSpace = VolumeUsage.hasVolumeEnoughSpace(free, committed, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 265996f6a6c6..a0259e355e39 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -307,14 +307,6 @@ public long getCommittedBytes() { return committedBytes.get(); } - public long incImportContainerCommitBytes(long delta) { - return containerImportCommittedBytes.addAndGet(delta); - } - - public long getContainerImportCommittedBytes() { - return containerImportCommittedBytes.get(); - } - public void setDbVolume(DbVolume dbVolume) { this.dbVolume = dbVolume; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 480c541782cd..b32a31e6b9ff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -20,12 +20,15 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; import org.slf4j.Logger; @@ -80,9 +83,18 @@ public void replicate(ReplicationTask task) { try { targetVolume = containerImporter.chooseNextVolume(); - targetVolume.incImportContainerCommitBytes(containerSize * 2); - if (targetVolume.getCurrentUsage().getAvailable() - targetVolume.getCommittedBytes() - - targetVolume.getContainerImportCommittedBytes() <= 0) { + // Increment committed bytes and verify if it doesn't cross the space left. + targetVolume.incCommittedBytes(containerSize * 2); + // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter + AvailableSpaceFilter filter = new AvailableSpaceFilter(0); + List hddsVolumeList = new ArrayList<>(); + hddsVolumeList.add(targetVolume); + List volumeWithEnoughSpace = hddsVolumeList.stream() + .filter(filter) + .collect(Collectors.toList()); + if (volumeWithEnoughSpace.isEmpty()) { + targetVolume.incCommittedBytes(-containerSize * 2); + LOG.error("Container {} replication was unsuccessful, due to no space left", containerID); task.setStatus(Status.FAILED); return; } @@ -110,7 +122,7 @@ public void replicate(ReplicationTask task) { task.setStatus(Status.FAILED); } finally { if (targetVolume != null) { - targetVolume.incImportContainerCommitBytes(-containerSize * 2); + targetVolume.incCommittedBytes(-containerSize * 2); } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java index aff33870dd18..25c0cac10ade 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java @@ -23,12 +23,16 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.util.DiskChecker; import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; @@ -86,10 +90,18 @@ public void onNext(SendContainerRequest req) { if (containerId == -1) { containerId = req.getContainerID(); volume = importer.chooseNextVolume(); - volume.incImportContainerCommitBytes(importer.getDefaultContainerSize() * 2); - if (volume.getCurrentUsage().getAvailable() - volume.getCommittedBytes() - - volume.getContainerImportCommittedBytes() <= 0) { - volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); + // Increment committed bytes and verify if it doesn't cross the space left. + volume.incCommittedBytes(importer.getDefaultContainerSize() * 2); + // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter + AvailableSpaceFilter filter = new AvailableSpaceFilter(0); + List hddsVolumeList = new ArrayList<>(); + hddsVolumeList.add(volume); + List volumeWithEnoughSpace = hddsVolumeList.stream() + .filter(filter) + .collect(Collectors.toList()); + if (volumeWithEnoughSpace.isEmpty()) { + volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); + LOG.error("Container {} import was unsuccessful, due to no space left", containerId); volume = null; throw new DiskChecker.DiskOutOfSpaceException("No more available volumes"); } @@ -126,7 +138,7 @@ public void onError(Throwable t) { responseObserver.onError(t); } finally { if (volume != null) { - volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); + volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); } } } @@ -155,7 +167,7 @@ public void onCompleted() { } } finally { if (volume != null) { - volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); + volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); } } } From 34894d86f5355e197bccff97e5e38c5aefc78824 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 7 Mar 2025 15:46:54 +0530 Subject: [PATCH 03/12] Write test case for container import reserve space --- .../TestReplicationSupervisor.java | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index df3462d644b7..f8cc9a389055 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -40,7 +40,10 @@ import com.google.protobuf.Proto2Utils; import jakarta.annotation.Nonnull; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Clock; @@ -51,24 +54,37 @@ import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; @@ -76,6 +92,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask; @@ -90,6 +107,7 @@ import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; /** @@ -99,6 +117,9 @@ public class TestReplicationSupervisor { private static final long CURRENT_TERM = 1; + @TempDir + private File tempDir; + private final ContainerReplicator noopReplicator = task -> { }; private final ContainerReplicator throwingReplicator = task -> { throw new RuntimeException("testing replication failure"); @@ -327,6 +348,106 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout .contains("Container 1 replication was unsuccessful."); } + @Test + public void testReplicationImportReserveSpace() + throws IOException, InterruptedException, TimeoutException { + OzoneConfiguration conf = new OzoneConfiguration(); + + long containerSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + + ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context) + .executor(newDirectExecutorService()) + .clock(clock) + .build(); + + long containerId = 1; + // create container + KeyValueContainerData containerData = new KeyValueContainerData(containerId, + ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"); + KeyValueContainer container = new KeyValueContainer(containerData, conf); + ContainerController controllerMock = mock(ContainerController.class); + Semaphore semaphore = new Semaphore(1); + when(controllerMock.importContainer(any(), any(), any())) + .thenAnswer((invocation) -> { + semaphore.acquire(); + return container; + }); + MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + File tarFile = containerTarFile(containerId, containerData); + + SimpleContainerDownloader moc = + mock(SimpleContainerDownloader.class); + when( + moc.getContainerDataFromReplicas(anyLong(), anyList(), + any(Path.class), any())) + .thenReturn(tarFile.toPath()); + + ContainerImporter importer = + new ContainerImporter(conf, set, controllerMock, volumeSet); + + HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0); + // Increase committed bytes so that volume has only remaining 3 times container size space + vol1.incCommittedBytes(vol1.getCurrentUsage().getCapacity() - containerSize * 3); + ContainerReplicator replicator = + new DownloadAndImportReplicator(conf, set, importer, moc); + replicatorRef.set(replicator); + + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(DownloadAndImportReplicator.LOG); + + // Acquire semaphore so that container import will pause after reserving space. + semaphore.acquire(); + CompletableFuture.runAsync(() -> { + try { + supervisor.addTask(createTask(containerId)); + } catch (Exception ex) { + } + }); + + // Wait such that first container import reserve space + GenericTestUtils.waitFor(() -> + vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3, + 1000, 50000); + + // Container 2 import will fail as container 1 has reserved space and no space left to import new container + // New container import requires at least (2 * container size) + long containerId2 = 2; + supervisor.addTask(createTask(containerId2)); + GenericTestUtils.waitFor(() -> 1 == supervisor.getReplicationFailureCount(), + 1000, 50000); + assertThat(logCapturer.getOutput()).contains("No volumes have enough space for a new container"); + // Release semaphore so that first container import will pass + semaphore.release(); + GenericTestUtils.waitFor(() -> + 1 == supervisor.getReplicationSuccessCount(), 1000, 50000); + } + + + private File containerTarFile( + long containerId, ContainerData containerData) throws IOException { + File yamlFile = new File(tempDir, "container.yaml"); + ContainerDataYaml.createContainerFile( + ContainerProtos.ContainerType.KeyValueContainer, containerData, + yamlFile); + File tarFile = new File(tempDir, + ContainerUtils.getContainerTarName(containerId)); + try (FileOutputStream output = new FileOutputStream(tarFile)) { + ArchiveOutputStream archive = new TarArchiveOutputStream(output); + TarArchiveEntry entry = archive.createArchiveEntry(yamlFile, + "container.yaml"); + archive.putArchiveEntry(entry); + try (InputStream input = Files.newInputStream(yamlFile.toPath())) { + IOUtils.copy(input, archive); + } + archive.closeArchiveEntry(); + } + return tarFile; + } + @ContainerLayoutTestInfo.ContainerTest public void testTaskBeyondDeadline(ContainerLayoutVersion layout) { this.layoutVersion = layout; From c829ef9249375cf6012ce170356c3e9fcc278e63 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 7 Mar 2025 16:03:01 +0530 Subject: [PATCH 04/12] Write test case for container import reserve space --- .../container/replication/TestReplicationSupervisor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index f8cc9a389055..ddc169e7db26 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -40,9 +40,9 @@ import com.google.protobuf.Proto2Utils; import jakarta.annotation.Nonnull; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -435,7 +435,7 @@ private File containerTarFile( yamlFile); File tarFile = new File(tempDir, ContainerUtils.getContainerTarName(containerId)); - try (FileOutputStream output = new FileOutputStream(tarFile)) { + try (OutputStream output = Files.newOutputStream(tarFile.toPath())) { ArchiveOutputStream archive = new TarArchiveOutputStream(output); TarArchiveEntry entry = archive.createArchiveEntry(yamlFile, "container.yaml"); From f8f650c3bfd8690ca7fbf113e5771f4724e8db7a Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 7 Mar 2025 18:13:12 +0530 Subject: [PATCH 05/12] Fix test case --- .../ozone/container/replication/TestReplicationSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index ddc169e7db26..f897113ad153 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -375,7 +375,7 @@ public void testReplicationImportReserveSpace() semaphore.acquire(); return container; }); - MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null, + MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null, StorageVolume.VolumeType.DATA_VOLUME, null); File tarFile = containerTarFile(containerId, containerData); From 55c883593f5be6c59c423172e09768666281afd1 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 7 Mar 2025 19:32:43 +0530 Subject: [PATCH 06/12] Fix test case --- .../ozone/container/replication/TestReplicationSupervisor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index f897113ad153..1bdd409bee75 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -352,6 +352,7 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout public void testReplicationImportReserveSpace() throws IOException, InterruptedException, TimeoutException { OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); long containerSize = (long) conf.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, From 7dca4f814a0475f971f4141a47496856ccda2ec7 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 7 Mar 2025 19:55:36 +0530 Subject: [PATCH 07/12] Add both layoutversion --- .../container/replication/TestReplicationSupervisor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 1bdd409bee75..19268ce1515d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -107,7 +107,6 @@ import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; /** @@ -348,9 +347,10 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout .contains("Container 1 replication was unsuccessful."); } - @Test - public void testReplicationImportReserveSpace() + @ContainerLayoutTestInfo.ContainerTest + public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) throws IOException, InterruptedException, TimeoutException { + this.layoutVersion = layout; OzoneConfiguration conf = new OzoneConfiguration(); conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); From 4be1147f8bca08c3ae4cfd4c89843ba5dd1beabc Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 10 Mar 2025 00:14:22 +0530 Subject: [PATCH 08/12] Remove unused variable --- .../apache/hadoop/ozone/container/common/volume/HddsVolume.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index a0259e355e39..70f25fa138dc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -84,8 +84,6 @@ public class HddsVolume extends StorageVolume { private final AtomicLong committedBytes = new AtomicLong(); // till Open containers become full - private final AtomicLong containerImportCommittedBytes = new AtomicLong(); - // Mentions the type of volume private final VolumeType type = VolumeType.DATA_VOLUME; // The dedicated DbVolume that the db instance of this HddsVolume resides. From 5d10a2162b1cd61fa97130dd0712729efc0500dd Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 10 Mar 2025 15:03:41 +0530 Subject: [PATCH 09/12] Simplify AvailableSpaceFilter --- .../replication/DownloadAndImportReplicator.java | 11 ++--------- .../replication/SendContainerRequestHandler.java | 12 ++---------- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index b32a31e6b9ff..cdba4c007467 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -20,9 +20,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -87,14 +85,9 @@ public void replicate(ReplicationTask task) { targetVolume.incCommittedBytes(containerSize * 2); // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter AvailableSpaceFilter filter = new AvailableSpaceFilter(0); - List hddsVolumeList = new ArrayList<>(); - hddsVolumeList.add(targetVolume); - List volumeWithEnoughSpace = hddsVolumeList.stream() - .filter(filter) - .collect(Collectors.toList()); - if (volumeWithEnoughSpace.isEmpty()) { + if (!filter.test(targetVolume)) { targetVolume.incCommittedBytes(-containerSize * 2); - LOG.error("Container {} replication was unsuccessful, due to no space left", containerID); + LOG.warn("Container {} replication was unsuccessful, due to no space left", containerID); task.setStatus(Status.FAILED); return; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java index 25c0cac10ade..e76a44e680de 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java @@ -23,9 +23,6 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse; @@ -94,14 +91,9 @@ public void onNext(SendContainerRequest req) { volume.incCommittedBytes(importer.getDefaultContainerSize() * 2); // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter AvailableSpaceFilter filter = new AvailableSpaceFilter(0); - List hddsVolumeList = new ArrayList<>(); - hddsVolumeList.add(volume); - List volumeWithEnoughSpace = hddsVolumeList.stream() - .filter(filter) - .collect(Collectors.toList()); - if (volumeWithEnoughSpace.isEmpty()) { + if (!filter.test(volume)) { volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); - LOG.error("Container {} import was unsuccessful, due to no space left", containerId); + LOG.warn("Container {} import was unsuccessful, due to no space left", containerId); volume = null; throw new DiskChecker.DiskOutOfSpaceException("No more available volumes"); } From 822abaa19d246cb4c4cf1146525affa9ef437e6d Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 10 Mar 2025 16:11:10 +0530 Subject: [PATCH 10/12] Remove duplicate decrement --- .../ozone/container/replication/DownloadAndImportReplicator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index cdba4c007467..8c44d0d07816 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -86,7 +86,6 @@ public void replicate(ReplicationTask task) { // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter AvailableSpaceFilter filter = new AvailableSpaceFilter(0); if (!filter.test(targetVolume)) { - targetVolume.incCommittedBytes(-containerSize * 2); LOG.warn("Container {} replication was unsuccessful, due to no space left", containerID); task.setStatus(Status.FAILED); return; From dce88976fc5de7e404205ad7d75f9ebfaf196d08 Mon Sep 17 00:00:00 2001 From: ashishk Date: Wed, 12 Mar 2025 16:39:12 +0530 Subject: [PATCH 11/12] Increase used space after container import --- .../replication/ContainerImporter.java | 2 ++ .../TestReplicationSupervisor.java | 21 ++++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index e423d35a4c8c..10567475e3a6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -128,6 +128,8 @@ public void importContainer(long containerID, Path tarFilePath, try (InputStream input = Files.newInputStream(tarFilePath)) { Container container = controller.importContainer( containerData, input, packer); + // After container import is successful, increase used space for the volume + targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed()); containerSet.addContainerByOverwriteMissingContainer(container); } } finally { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 19268ce1515d..ec26cb16b9e4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -368,6 +368,9 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) // create container KeyValueContainerData containerData = new KeyValueContainerData(containerId, ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"); + HddsVolume vol = mock(HddsVolume.class); + containerData.setVolume(vol); + containerData.incrBytesUsed(100); KeyValueContainer container = new KeyValueContainer(containerData, conf); ContainerController controllerMock = mock(ContainerController.class); Semaphore semaphore = new Semaphore(1); @@ -391,8 +394,14 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) new ContainerImporter(conf, set, controllerMock, volumeSet); HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0); + // Initially volume has 0 commit space + assertEquals(0, vol1.getCommittedBytes()); + long usedSpace = vol1.getCurrentUsage().getUsedSpace(); + // Initially volume has 0 used space + assertEquals(0, usedSpace); // Increase committed bytes so that volume has only remaining 3 times container size space - vol1.incCommittedBytes(vol1.getCurrentUsage().getCapacity() - containerSize * 3); + long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3; + vol1.incCommittedBytes(initialCommittedBytes); ContainerReplicator replicator = new DownloadAndImportReplicator(conf, set, importer, moc); replicatorRef.set(replicator); @@ -414,6 +423,8 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3, 1000, 50000); + // Volume has reserved space of 2 * containerSize + assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerSize); // Container 2 import will fail as container 1 has reserved space and no space left to import new container // New container import requires at least (2 * container size) long containerId2 = 2; @@ -425,6 +436,14 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) semaphore.release(); GenericTestUtils.waitFor(() -> 1 == supervisor.getReplicationSuccessCount(), 1000, 50000); + + usedSpace = vol1.getCurrentUsage().getUsedSpace(); + // After replication, volume used space should be increased by container used bytes + assertEquals(100, usedSpace); + + // Volume committed bytes should become initial committed bytes which was before replication + assertEquals(initialCommittedBytes, vol1.getCommittedBytes()); + } From 29f1cae68e95faa8d1febc42e919bec500464dd1 Mon Sep 17 00:00:00 2001 From: ashishk Date: Wed, 12 Mar 2025 17:00:15 +0530 Subject: [PATCH 12/12] Fix conflict from master --- .../container/replication/TestReplicationSupervisor.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 6c8e2cf1f638..fb6064599428 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -76,7 +76,6 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; import org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -451,8 +450,7 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) private File containerTarFile( long containerId, ContainerData containerData) throws IOException { File yamlFile = new File(tempDir, "container.yaml"); - ContainerDataYaml.createContainerFile( - ContainerProtos.ContainerType.KeyValueContainer, containerData, + ContainerDataYaml.createContainerFile(containerData, yamlFile); File tarFile = new File(tempDir, ContainerUtils.getContainerTarName(containerId));