From 36469a4195a3fddce345c17680bd813f1909a2e0 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 26 Apr 2025 18:05:01 +0800 Subject: [PATCH 01/14] Make volume space check and reservation as an atomic operation in VolumeChoosingPolicy#chooseVolume --- .../common/volume/CapacityVolumeChoosingPolicy.java | 7 +++++-- .../volume/RoundRobinVolumeChoosingPolicy.java | 3 ++- .../volume/TestCapacityVolumeChoosingPolicy.java | 11 +++++++++++ .../volume/TestRoundRobinVolumeChoosingPolicy.java | 13 +++++++++++++ 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java index 89c686645eac..62da1788e5ee 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java @@ -48,7 +48,7 @@ public class CapacityVolumeChoosingPolicy implements VolumeChoosingPolicy { private final Random random = new Random(); @Override - public HddsVolume chooseVolume(List volumes, + public synchronized HddsVolume chooseVolume(List volumes, long maxContainerSize) throws IOException { // No volumes available to choose from @@ -93,7 +93,10 @@ public HddsVolume chooseVolume(List volumes, - firstVolume.getCommittedBytes(); long secondAvailable = secondVolume.getCurrentUsage().getAvailable() - secondVolume.getCommittedBytes(); - return firstAvailable < secondAvailable ? secondVolume : firstVolume; + HddsVolume selectedVolume = firstAvailable < secondAvailable ? 
secondVolume : firstVolume; + + selectedVolume.incCommittedBytes(maxContainerSize); + return selectedVolume; } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java index 9945a3256b32..b375f22034ac 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java @@ -41,7 +41,7 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy { private AtomicInteger nextVolumeIndex = new AtomicInteger(0); @Override - public HddsVolume chooseVolume(List volumes, + public synchronized HddsVolume chooseVolume(List volumes, long maxContainerSize) throws IOException { // No volumes available to choose from @@ -68,6 +68,7 @@ public HddsVolume chooseVolume(List volumes, if (hasEnoughSpace) { logIfSomeVolumesOutOfSpace(filter, LOG); nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex); + volume.incCommittedBytes(maxContainerSize); return volume; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java index d6c97c5f1a36..07ae372a4cd5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java @@ -149,4 +149,15 @@ public void testVolumeChoosingPolicyFactory() 
VolumeChoosingPolicyFactory.getPolicy(CONF).getClass()); } + @Test + public void testVolumeCommittedSpace() throws Exception { + Map initialCommittedSpace = new HashMap<>(); + volumes.forEach(vol -> + initialCommittedSpace.put(vol, vol.getCommittedBytes())); + + HddsVolume selectedVolume = policy.chooseVolume(volumes, 50); + + assertEquals(initialCommittedSpace.get(selectedVolume) + 50, + selectedVolume.getCommittedBytes()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java index 1c07fe7ab7b3..2406011a3d14 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java @@ -25,7 +25,9 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.MockSpaceUsageSource; @@ -115,4 +117,15 @@ public void throwsDiskOutOfSpaceIfRequestMoreThanAvailable() { "Most available space: 150 bytes"); } + @Test + public void testVolumeCommittedSpace() throws Exception { + Map initialCommittedSpace = new HashMap<>(); + volumes.forEach(vol -> + initialCommittedSpace.put(vol, vol.getCommittedBytes())); + + HddsVolume selectedVolume = policy.chooseVolume(volumes, 50); + + assertEquals(initialCommittedSpace.get(selectedVolume) + 50, + selectedVolume.getCommittedBytes()); + } } From 1b3cb824aca71fab8c92ca2b098bdac0df6e51ea Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 29 Apr 2025 03:00:04 +0800 
Subject: [PATCH 02/14] Explicitly reserve/release commit space --- .../container/common/impl/ContainerData.java | 22 ++--- .../container/common/impl/ContainerSet.java | 2 - .../volume/CapacityVolumeChoosingPolicy.java | 12 ++- .../container/keyvalue/KeyValueContainer.java | 11 +++ .../container/ozoneimpl/ContainerReader.java | 84 ++++++++++--------- .../replication/ContainerImporter.java | 7 +- .../DownloadAndImportReplicator.java | 13 +-- .../SendContainerRequestHandler.java | 17 +--- .../common/impl/TestContainerPersistence.java | 5 +- .../TestDownloadAndImportReplicator.java | 25 ++++++ .../TestReplicationSupervisor.java | 31 +++---- 11 files changed, 119 insertions(+), 110 deletions(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index d431b494d783..ba44668e1109 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -29,7 +29,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.STATE; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import jakarta.annotation.Nullable; @@ -214,21 +213,14 @@ public synchronized void setState(ContainerDataProto.State state) { (state != oldState)) { releaseCommitSpace(); } - - /** - * commit space when container transitions (back) to Open. - * when?
perhaps closing a container threw an exception - */ - if ((state == ContainerDataProto.State.OPEN) && - (state != oldState)) { - Preconditions.checkState(getMaxSize() > 0); - commitSpace(); - } } - @VisibleForTesting - void setCommittedSpace(boolean committedSpace) { - this.committedSpace = committedSpace; + public void setCommittedSpace(boolean committed) { + if (committed) { + //we don't expect duplicate space commit + Preconditions.checkState(!committedSpace); + } + committedSpace = committed; } /** @@ -356,7 +348,7 @@ public synchronized void closeContainer() { setState(ContainerDataProto.State.CLOSED); } - private void releaseCommitSpace() { + public void releaseCommitSpace() { long unused = getMaxSize() - getBytesUsed(); // only if container size < max size diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 9b5c89e1f73e..0f5c19fd3364 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -154,8 +154,6 @@ private boolean addContainer(Container container, boolean overwrite) throws throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION); } missingContainerSet.remove(containerId); - // wish we could have done this from ContainerData.setState - container.getContainerData().commitSpace(); if (container.getContainerData().getState() == RECOVERING) { recoveringContainerMap.put( clock.millis() + recoveringTimeout, containerId); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java index 62da1788e5ee..64477c21969e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java @@ -69,9 +69,8 @@ public synchronized HddsVolume chooseVolume(List volumes, } int count = volumesWithEnoughSpace.size(); - if (count == 1) { - return volumesWithEnoughSpace.get(0); - } else { + HddsVolume selectedVolume = volumesWithEnoughSpace.get(0); + if (count > 1) { // Even if we don't have too many volumes in volumesWithEnoughSpace, this // algorithm will still help us choose the volume with larger // available space than other volumes. @@ -93,10 +92,9 @@ public synchronized HddsVolume chooseVolume(List volumes, - firstVolume.getCommittedBytes(); long secondAvailable = secondVolume.getCurrentUsage().getAvailable() - secondVolume.getCommittedBytes(); - HddsVolume selectedVolume = firstAvailable < secondAvailable ? secondVolume : firstVolume; - - selectedVolume.incCommittedBytes(maxContainerSize); - return selectedVolume; + selectedVolume = firstAvailable < secondAvailable ? 
secondVolume : firstVolume; } + selectedVolume.incCommittedBytes(maxContainerSize); + return selectedVolume; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 09cadd5d13fc..75be58471a48 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -168,6 +168,7 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy CONTAINER_INTERNAL_ERROR); } + Boolean exceptionThrown = false; try { String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); // Set volume before getContainerDBFile(), because we may need the @@ -207,18 +208,24 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy File containerFile = getContainerFile(); createContainerFile(containerFile); + // commit space has been reserved by volumeChoosingPolicy + containerData.setCommittedSpace(true); + return; } catch (StorageContainerException ex) { + exceptionThrown = true; if (containerMetaDataPath != null && containerMetaDataPath.getParentFile().exists()) { FileUtil.fullyDelete(containerMetaDataPath.getParentFile()); } throw ex; } catch (FileAlreadyExistsException ex) { + exceptionThrown = true; throw new StorageContainerException("Container creation failed " + "because ContainerFile already exists", ex, CONTAINER_ALREADY_EXISTS); } catch (IOException ex) { + exceptionThrown = true; // This is a general catch all - no space left of device, which should // not happen as the volume Choosing policy should filter out full // disks, but it may still be possible if the disk quickly fills, @@ -236,6 +243,10 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy "Failed to create " + containerData + " on all 
volumes: " + volumeSet.getVolumesList(), ex, CONTAINER_INTERNAL_ERROR); } + } finally { + if (exceptionThrown) { + containerData.releaseCommitSpace(); + } } } } finally { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index 90bbb3186ad4..18148321f6ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.DELETED; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING; import com.google.common.base.Preconditions; @@ -202,50 +203,55 @@ public void verifyAndFixupContainerData(ContainerData containerData) throws IOException { switch (containerData.getContainerType()) { case KeyValueContainer: - if (containerData instanceof KeyValueContainerData) { - KeyValueContainerData kvContainerData = (KeyValueContainerData) - containerData; - containerData.setVolume(hddsVolume); - KeyValueContainerUtil.parseKVContainerData(kvContainerData, config); - KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData, - config); - if (kvContainer.getContainerState() == RECOVERING) { - if (shouldDelete) { - // delete Ratis replicated RECOVERING containers - if (kvContainer.getContainerData().getReplicaIndex() == 0) { - cleanupContainer(hddsVolume, kvContainer); - } else { - kvContainer.markContainerUnhealthy(); - LOG.info("Stale 
recovering container {} marked UNHEALTHY", - kvContainerData.getContainerID()); - containerSet.addContainer(kvContainer); - } - } - return; - } - if (kvContainer.getContainerState() == DELETED) { - if (shouldDelete) { - cleanupContainer(hddsVolume, kvContainer); - } - return; - } - try { - containerSet.addContainer(kvContainer); - } catch (StorageContainerException e) { - if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) { - throw e; - } - if (shouldDelete) { - resolveDuplicate((KeyValueContainer) containerSet.getContainer( - kvContainer.getContainerData().getContainerID()), kvContainer); - } - } - } else { + if (!(containerData instanceof KeyValueContainerData)) { throw new StorageContainerException("Container File is corrupted. " + "ContainerType is KeyValueContainer but cast to " + "KeyValueContainerData failed. ", ContainerProtos.Result.CONTAINER_METADATA_ERROR); } + + KeyValueContainerData kvContainerData = (KeyValueContainerData) + containerData; + containerData.setVolume(hddsVolume); + KeyValueContainerUtil.parseKVContainerData(kvContainerData, config); + KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData, + config); + if (kvContainer.getContainerState() == RECOVERING) { + if (shouldDelete) { + // delete Ratis replicated RECOVERING containers + if (kvContainer.getContainerData().getReplicaIndex() == 0) { + cleanupContainer(hddsVolume, kvContainer); + } else { + kvContainer.markContainerUnhealthy(); + LOG.info("Stale recovering container {} marked UNHEALTHY", + kvContainerData.getContainerID()); + containerSet.addContainer(kvContainer); + } + } + return; + } else if (kvContainer.getContainerState() == DELETED) { + if (shouldDelete) { + cleanupContainer(hddsVolume, kvContainer); + } + return; + } + + try { + if (kvContainer.getContainerState() == OPEN) { + // only open container would get new data written in + containerData.commitSpace(); + } + containerSet.addContainer(kvContainer); + } catch (StorageContainerException e) { + if 
(e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) { + throw e; + } + if (shouldDelete) { + resolveDuplicate((KeyValueContainer) containerSet.getContainer( + kvContainer.getContainerData().getContainerID()), kvContainer); + } + containerData.releaseCommitSpace(); + } break; default: throw new StorageContainerException("Unrecognized ContainerType " + diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index f69516f94e17..100f54e2589c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -88,7 +88,7 @@ public boolean isAllowedContainerImport(long containerID) { } public void importContainer(long containerID, Path tarFilePath, - HddsVolume hddsVolume, CopyContainerCompression compression) + HddsVolume targetVolume, CopyContainerCompression compression) throws IOException { if (!importContainerProgress.add(containerID)) { deleteFileQuietely(tarFilePath); @@ -106,11 +106,6 @@ public void importContainer(long containerID, Path tarFilePath, ContainerProtos.Result.CONTAINER_EXISTS); } - HddsVolume targetVolume = hddsVolume; - if (targetVolume == null) { - targetVolume = chooseNextVolume(); - } - KeyValueContainerData containerData; TarContainerPacker packer = getPacker(compression); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 9a943c633386..4f7ce439c3c6 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; import org.slf4j.Logger; @@ -81,15 +81,6 @@ public void replicate(ReplicationTask task) { try { targetVolume = containerImporter.chooseNextVolume(); - // Increment committed bytes and verify if it doesn't cross the space left. - targetVolume.incCommittedBytes(containerSize * 2); - StorageLocationReport volumeReport = targetVolume.getReport(); - // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter - if (volumeReport.getUsableSpace() <= 0) { - LOG.warn("Container {} replication was unsuccessful, no space left on volume {}", containerID, volumeReport); - task.setStatus(Status.FAILED); - return; - } // Wait for the download. This thread pool is limiting the parallel // downloads, so it's ok to block here and wait for the full download. 
Path tarFilePath = @@ -114,7 +105,7 @@ public void replicate(ReplicationTask task) { task.setStatus(Status.FAILED); } finally { if (targetVolume != null) { - targetVolume.incCommittedBytes(-containerSize * 2); + targetVolume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(containerSize)); } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java index 5224498e7274..b88aa5f09733 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java @@ -27,11 +27,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.util.DiskChecker; import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; @@ -87,16 +86,6 @@ public void onNext(SendContainerRequest req) { if (containerId == -1) { containerId = req.getContainerID(); volume = importer.chooseNextVolume(); - // Increment committed bytes and verify if it doesn't cross the space left. 
- volume.incCommittedBytes(importer.getDefaultContainerSize() * 2); - StorageLocationReport volumeReport = volume.getReport(); - // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter - if (volumeReport.getUsableSpace() <= 0) { - volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); - LOG.warn("Container {} import was unsuccessful, no space left on volume {}", containerId, volumeReport); - volume = null; - throw new DiskChecker.DiskOutOfSpaceException("No more available volumes"); - } Path dir = ContainerImporter.getUntarDirectory(volume); Files.createDirectories(dir); @@ -130,7 +119,7 @@ public void onError(Throwable t) { responseObserver.onError(t); } finally { if (volume != null) { - volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); + volume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(importer.getDefaultContainerSize())); } } } @@ -159,7 +148,7 @@ public void onCompleted() { } } finally { if (volume != null) { - volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); + volume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(importer.getDefaultContainerSize())); } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index d052f89f2417..cfa4b1f23fb9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -204,10 +204,11 @@ private KeyValueContainer addContainer(ContainerSet cSet, long cID) data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); KeyValueContainer container = new KeyValueContainer(data, conf); + 
commitBytesBefore = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()).get(0).getCommittedBytes(); + container.create(volumeSet, volumeChoosingPolicy, SCM_ID); - commitBytesBefore = container.getContainerData() - .getVolume().getCommittedBytes(); cSet.addContainer(container); + commitBytesAfter = container.getContainerData() .getVolume().getCommittedBytes(); commitIncrement = commitBytesAfter - commitBytesBefore; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java new file mode 100644 index 000000000000..cb35713e0f7a --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.replication; + +/** + * Test for DownloadAndImportReplicator. 
+ */ +public class TestDownloadAndImportReplicator { + +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 9acb73486a06..1e69eac2ea9e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -355,11 +355,12 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout @ContainerLayoutTestInfo.ContainerTest public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) throws IOException, InterruptedException, TimeoutException { + final long containerUsedSize = 100; this.layoutVersion = layout; OzoneConfiguration conf = new OzoneConfiguration(); conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); - long containerSize = (long) conf.getStorageSize( + long containerMaxSize = (long) conf.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); @@ -369,13 +370,16 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) .clock(clock) .build(); + MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + long containerId = 1; // create container KeyValueContainerData containerData = new KeyValueContainerData(containerId, - ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"); - HddsVolume vol = mock(HddsVolume.class); - containerData.setVolume(vol); - containerData.incrBytesUsed(100); + ContainerLayoutVersion.FILE_PER_BLOCK, containerMaxSize, "test", "test"); + HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0); + 
containerData.setVolume(vol1); + containerData.incrBytesUsed(containerUsedSize); KeyValueContainer container = new KeyValueContainer(containerData, conf); ContainerController controllerMock = mock(ContainerController.class); Semaphore semaphore = new Semaphore(1); @@ -384,8 +388,7 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) semaphore.acquire(); return container; }); - MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null, - StorageVolume.VolumeType.DATA_VOLUME, null); + File tarFile = containerTarFile(containerId, containerData); SimpleContainerDownloader moc = @@ -398,14 +401,13 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) ContainerImporter importer = new ContainerImporter(conf, set, controllerMock, volumeSet, volumeChoosingPolicy); - HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0); // Initially volume has 0 commit space assertEquals(0, vol1.getCommittedBytes()); long usedSpace = vol1.getCurrentUsage().getUsedSpace(); // Initially volume has 0 used space assertEquals(0, usedSpace); // Increase committed bytes so that volume has only remaining 3 times container size space - long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3; + long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerMaxSize * 3; vol1.incCommittedBytes(initialCommittedBytes); ContainerReplicator replicator = new DownloadAndImportReplicator(conf, set, importer, moc); @@ -424,11 +426,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) // Wait such that first container import reserve space GenericTestUtils.waitFor(() -> - vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3, + vol1.getCommittedBytes() > initialCommittedBytes, 1000, 50000); // Volume has reserved space of 2 * containerSize - assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * 
containerSize); + assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); // Container 2 import will fail as container 1 has reserved space and no space left to import new container // New container import requires at least (2 * container size) long containerId2 = 2; @@ -443,10 +445,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) usedSpace = vol1.getCurrentUsage().getUsedSpace(); // After replication, volume used space should be increased by container used bytes - assertEquals(100, usedSpace); + assertEquals(containerUsedSize, usedSpace); - // Volume committed bytes should become initial committed bytes which was before replication - assertEquals(initialCommittedBytes, vol1.getCommittedBytes()); + // Volume committed bytes used for replication has been released, no need to reserve space for imported container + // only closed container gets replicated, so no new data will be written it + assertEquals(vol1.getCommittedBytes(), initialCommittedBytes); } From 55b9ca95f457eff4ec206bba8c54db81cd6a5ab5 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 29 Apr 2025 12:00:54 +0800 Subject: [PATCH 03/14] Add coverage of commit space reserve/release cases for container importing --- .../TestDownloadAndImportReplicator.java | 100 +++++++++++++- .../TestSendContainerRequestHandler.java | 124 ++++++++++++++++-- 2 files changed, 211 insertions(+), 13 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java index cb35713e0f7a..5993e43e6617 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java +++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java @@ -17,9 +17,107 @@ package org.apache.hadoop.ozone.container.replication; +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + /** * Test for DownloadAndImportReplicator. 
*/ +@Timeout(300) public class TestDownloadAndImportReplicator { - + + @TempDir + private File tempDir; + + private OzoneConfiguration conf; + private VolumeChoosingPolicy volumeChoosingPolicy; + private ContainerSet containerSet; + private MutableVolumeSet volumeSet; + private ContainerImporter importer; + private SimpleContainerDownloader downloader; + private DownloadAndImportReplicator replicator; + private long containerMaxSize; + + @BeforeEach + void setup() throws IOException { + conf = new OzoneConfiguration(); + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); + volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf); + containerSet = newContainerSet(0); + volumeSet = new MutableVolumeSet("test", conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + importer = new ContainerImporter(conf, containerSet, + mock(ContainerController.class), volumeSet, volumeChoosingPolicy); + downloader = mock(SimpleContainerDownloader.class); + replicator = new DownloadAndImportReplicator(conf, containerSet, importer, + downloader); + containerMaxSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + } + + @Test + public void testCommitSpaceReleasedOnReplicationFailure() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + + // Mock downloader to throw exception + Semaphore semaphore = new Semaphore(1); + when(downloader.getContainerDataFromReplicas(anyLong(), any(), any(), any())) + .thenAnswer(invocation -> { + semaphore.acquire(); + throw new IOException("Download failed"); + }); + + ReplicationTask task = new ReplicationTask(containerId, + Collections.singletonList(mock(DatanodeDetails.class)), replicator); + + // Acquire semaphore so that container import will pause before downloading. 
+ semaphore.acquire(); + CompletableFuture.runAsync(() -> { + assertThrows(IOException.class, () -> replicator.replicate(task)); + }); + + // Wait such that first container import reserve space + GenericTestUtils.waitFor(() -> + volume.getCommittedBytes() > initialCommittedBytes, + 1000, 50000); + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + semaphore.release(); + + GenericTestUtils.waitFor(() -> + volume.getCommittedBytes() == initialCommittedBytes, + 1000, 50000); + + // Verify commit space is released + assertEquals(initialCommittedBytes, volume.getCommittedBytes()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java index 0d15e265ad91..0a5c41cbf110 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java @@ -21,18 +21,24 @@ import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.mockito.Mockito.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import java.io.File; +import java.io.IOException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import 
org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; @@ -43,12 +49,14 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; /** * Test for {@link SendContainerRequestHandler}. */ -class TestSendContainerRequestHandler { +@Timeout(300) +public class TestSendContainerRequestHandler { @TempDir private File tempDir; @@ -57,38 +65,48 @@ class TestSendContainerRequestHandler { private VolumeChoosingPolicy volumeChoosingPolicy; + private ContainerSet containerSet; + private MutableVolumeSet volumeSet; + private ContainerImporter importer; + private StreamObserver responseObserver; + private SendContainerRequestHandler sendContainerRequestHandler; + private long containerMaxSize; + @BeforeEach - void setup() { + void setup() throws IOException { conf = new OzoneConfiguration(); conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf); + containerSet = newContainerSet(0); + volumeSet = new MutableVolumeSet("test", conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + importer = new ContainerImporter(conf, containerSet, + mock(ContainerController.class), volumeSet, volumeChoosingPolicy); + importer = spy(importer); + responseObserver = 
mock(StreamObserver.class); + sendContainerRequestHandler = new SendContainerRequestHandler(importer, responseObserver, null); + containerMaxSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } @Test void testReceiveDataForExistingContainer() throws Exception { long containerId = 1; // create containerImporter - ContainerSet containerSet = newContainerSet(0); - MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null, - StorageVolume.VolumeType.DATA_VOLUME, null); - ContainerImporter containerImporter = new ContainerImporter(conf, - newContainerSet(0), mock(ContainerController.class), volumeSet, volumeChoosingPolicy); KeyValueContainerData containerData = new KeyValueContainerData(containerId, ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"); // add container to container set KeyValueContainer container = new KeyValueContainer(containerData, conf); containerSet.addContainer(container); - StreamObserver observer = mock(StreamObserver.class); doAnswer(invocation -> { Object arg = invocation.getArgument(0); assertInstanceOf(StorageContainerException.class, arg); assertEquals(ContainerProtos.Result.CONTAINER_EXISTS, ((StorageContainerException) arg).getResult()); return null; - }).when(observer).onError(any()); - SendContainerRequestHandler sendContainerRequestHandler - = new SendContainerRequestHandler(containerImporter, observer, null); + }).when(responseObserver).onError(any()); ByteString data = ByteString.copyFromUtf8("test"); ContainerProtos.SendContainerRequest request = ContainerProtos.SendContainerRequest.newBuilder() @@ -99,4 +117,86 @@ void testReceiveDataForExistingContainer() throws Exception { .build(); sendContainerRequestHandler.onNext(request); } + + @Test + public void testSpaceReservedAndReleasedWhenRequestCompleted() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + 
long initialCommittedBytes = volume.getCommittedBytes(); + + // Create request + ContainerProtos.SendContainerRequest request = ContainerProtos.SendContainerRequest.newBuilder() + .setContainerID(containerId) + .setData(ByteString.EMPTY) + .setOffset(0) + .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto()) + .build(); + + // Execute request + sendContainerRequestHandler.onNext(request); + + // Verify commit space is reserved + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + + // complete the request + sendContainerRequestHandler.onCompleted(); + + // Verify commit space is released + assertEquals(volume.getCommittedBytes(), initialCommittedBytes); + } + + @Test + public void testSpaceReservedAndReleasedWhenOnNextFails() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + + // Create request + ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); + + // Execute request + sendContainerRequestHandler.onNext(request); + + // Verify commit space is reserved + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + + // Send a failed response with wrong offset + request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); + sendContainerRequestHandler.onNext(request); + + // Verify commit space is released + assertEquals(volume.getCommittedBytes(), initialCommittedBytes); + } + + @Test + public void testSpaceReservedAndReleasedWhenOnCompletedFails() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + + // Create request + ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); + + // Execute request + 
sendContainerRequestHandler.onNext(request); + + // Verify commit space is reserved + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + + doThrow(new IOException("Failed")).when(importer).importContainer(anyLong(), any(), any(), any()); + + sendContainerRequestHandler.onCompleted(); + + // Verify commit space is released + assertEquals(volume.getCommittedBytes(), initialCommittedBytes); + } + + private ContainerProtos.SendContainerRequest createRequest(long containerId, ByteString data, int offset) { + return ContainerProtos.SendContainerRequest.newBuilder() + .setContainerID(containerId) + .setData(data) + .setOffset(offset) + .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto()) + .build(); + } } From 8e59cf63ad0ce05651436b07d2e488c6eb495626 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 29 Apr 2025 19:58:11 +0800 Subject: [PATCH 04/14] Add tests for container creation commit space reservation and exception handling --- .../container/common/impl/ContainerData.java | 4 ++ .../keyvalue/TestKeyValueContainer.java | 39 +++++++++++++++++++ .../TestSendContainerRequestHandler.java | 2 - 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index ba44668e1109..6449c5f1f2a8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -215,6 +215,10 @@ public synchronized void setState(ContainerDataProto.State state) { } } + public boolean isCommittedSpace() { + return committedSpace; + } + public void setCommittedSpace(boolean committed) { if (committed) { //we don't expect duplicate space commit diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index c700b235faf9..8aa20908bb76 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -34,10 +34,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; @@ -1097,4 +1103,37 @@ private void testMixedSchemaImport(String dir, assertEquals(pendingDeleteBlockCount, importedContainer.getContainerData().getNumPendingDeletionBlocks()); } + + @ContainerTestVersionInfo.ContainerTest + public void testContainerCreationCommitSpaceReserve( + ContainerTestVersionInfo versionInfo) throws Exception { + init(versionInfo); + keyValueContainerData = spy(keyValueContainerData); + keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF); + keyValueContainer = spy(keyValueContainer); + + keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); + + // verify that + verify(volumeChoosingPolicy).chooseVolume(anyList(), anyLong()); // this would reserve commit space + verify(keyValueContainerData, times(0)).releaseCommitSpace(); + 
assertTrue(keyValueContainerData.isCommittedSpace()); + } + + @ContainerTestVersionInfo.ContainerTest + public void testContainerCreationCommitSpaceReserveWithException( + ContainerTestVersionInfo versionInfo) throws Exception { + init(versionInfo); + keyValueContainerData = spy(keyValueContainerData); + keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF); + keyValueContainer = spy(keyValueContainer); + + doThrow(new IOException("test")).when(keyValueContainer).createContainerMetaData(any(File.class), any(File.class), + any(File.class), anyString(), any(ConfigurationSource.class)); + assertThrows(IOException.class, () -> keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId)); + + verify(volumeChoosingPolicy).chooseVolume(anyList(), anyLong()); // this would reserve commit space + verify(keyValueContainerData).releaseCommitSpace(); + assertFalse(keyValueContainerData.isCommittedSpace()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java index 0a5c41cbf110..4ec7fff6ced3 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java @@ -49,13 +49,11 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; /** * Test for {@link SendContainerRequestHandler}. 
*/ -@Timeout(300) public class TestSendContainerRequestHandler { @TempDir From 85ee3bc8c1856eb698853db1d898a563e91df818 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 29 Apr 2025 20:13:51 +0800 Subject: [PATCH 05/14] original behaviour didnt consider container state when committing space --- .../hadoop/ozone/container/ozoneimpl/ContainerReader.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index 18148321f6ef..d60a97dc654b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -19,7 +19,6 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.DELETED; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING; import com.google.common.base.Preconditions; @@ -237,10 +236,7 @@ public void verifyAndFixupContainerData(ContainerData containerData) } try { - if (kvContainer.getContainerState() == OPEN) { - // only open container would get new data written in - containerData.commitSpace(); - } + containerData.commitSpace(); containerSet.addContainer(kvContainer); } catch (StorageContainerException e) { if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) { From f50134f6835d2afbcf2434dd0b1fff9269847725 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 7 May 2025 13:54:07 +0800 Subject: [PATCH 06/14] set commit right 
after the volume is choosen and handle release case at handler --- .../container/keyvalue/KeyValueContainer.java | 14 ++------------ .../ozone/container/keyvalue/KeyValueHandler.java | 5 +++++ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 75be58471a48..d58691dfb63e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -168,12 +168,13 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy CONTAINER_INTERNAL_ERROR); } - Boolean exceptionThrown = false; try { String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); // Set volume before getContainerDBFile(), because we may need the // volume to deduce the db file. 
containerData.setVolume(containerVolume); + // commit space has been reserved by volumeChoosingPolicy + containerData.setCommittedSpace(true); long containerID = containerData.getContainerID(); String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID( @@ -207,25 +208,18 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy // Create .container file File containerFile = getContainerFile(); createContainerFile(containerFile); - - // commit space has been reserved by volumeChoosingPolicy - containerData.setCommittedSpace(true); - return; } catch (StorageContainerException ex) { - exceptionThrown = true; if (containerMetaDataPath != null && containerMetaDataPath.getParentFile().exists()) { FileUtil.fullyDelete(containerMetaDataPath.getParentFile()); } throw ex; } catch (FileAlreadyExistsException ex) { - exceptionThrown = true; throw new StorageContainerException("Container creation failed " + "because ContainerFile already exists", ex, CONTAINER_ALREADY_EXISTS); } catch (IOException ex) { - exceptionThrown = true; // This is a general catch all - no space left of device, which should // not happen as the volume Choosing policy should filter out full // disks, but it may still be possible if the disk quickly fills, @@ -243,10 +237,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy "Failed to create " + containerData + " on all volumes: " + volumeSet.getVolumesList(), ex, CONTAINER_INTERNAL_ERROR); } - } finally { - if (exceptionThrown) { - containerData.releaseCommitSpace(); - } } } } finally { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 8e1080dc48d1..7083e1b7caa9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -417,6 +417,7 @@ ContainerCommandResponseProto handleCreateContainer( newContainerData, conf); boolean created = false; + boolean exceptionThrown = false; Lock containerIdLock = containerCreationLocks.get(containerID); containerIdLock.lock(); try { @@ -434,8 +435,12 @@ ContainerCommandResponseProto handleCreateContainer( LOG.debug("Container already exists. container Id {}", containerID); } } catch (StorageContainerException ex) { + exceptionThrown = true; return ContainerUtils.logAndReturnError(LOG, ex, request); } finally { + if (exceptionThrown) { + newContainerData.releaseCommitSpace(); + } containerIdLock.unlock(); } From 35f47d8f328409b2d81fe425a320e166efc8e4ce Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 7 May 2025 13:55:55 +0800 Subject: [PATCH 07/14] container reader commit space after adding container and some refactor --- .../hadoop/ozone/container/ozoneimpl/ContainerReader.java | 5 +++-- .../ozone/container/replication/ContainerImporter.java | 6 +++--- .../container/replication/SendContainerRequestHandler.java | 5 ++--- .../replication/TestSendContainerRequestHandler.java | 6 ++++-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index d60a97dc654b..fccb2f54e5c2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -236,8 +236,10 @@ public void verifyAndFixupContainerData(ContainerData containerData) } try { - containerData.commitSpace(); containerSet.addContainer(kvContainer); + + // this should be the last step of this 
block + containerData.commitSpace(); } catch (StorageContainerException e) { if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) { throw e; @@ -246,7 +248,6 @@ public void verifyAndFixupContainerData(ContainerData containerData) resolveDuplicate((KeyValueContainer) containerSet.getContainer( kvContainer.getContainerData().getContainerID()), kvContainer); } - containerData.releaseCommitSpace(); } break; default: diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index 100f54e2589c..ff7b1d3b7328 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -143,7 +143,7 @@ HddsVolume chooseNextVolume() throws IOException { // Choose volume that can hold both container in tmp and dest directory return volumeChoosingPolicy.chooseVolume( StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), - HddsServerUtil.requiredReplicationSpace(containerSize)); + getDefaultReplicationSpace()); } public static Path getUntarDirectory(HddsVolume hddsVolume) @@ -166,7 +166,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) { return new TarContainerPacker(compression); } - public long getDefaultContainerSize() { - return containerSize; + public long getDefaultReplicationSpace() { + return HddsServerUtil.requiredReplicationSpace(containerSize); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java index b88aa5f09733..9cb07a21c5dc 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; @@ -119,7 +118,7 @@ public void onError(Throwable t) { responseObserver.onError(t); } finally { if (volume != null) { - volume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(importer.getDefaultContainerSize())); + volume.incCommittedBytes(-importer.getDefaultReplicationSpace()); } } } @@ -148,7 +147,7 @@ public void onCompleted() { } } finally { if (volume != null) { - volume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(importer.getDefaultContainerSize())); + volume.incCommittedBytes(-importer.getDefaultReplicationSpace()); } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java index 4ec7fff6ced3..441bc7890b65 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.doThrow; import static 
org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; @@ -158,8 +159,9 @@ public void testSpaceReservedAndReleasedWhenOnNextFails() throws Exception { // Verify commit space is reserved assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); - // Send a failed response with wrong offset - request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); + // mock the importer is not allowed to import this container + when(importer.isAllowedContainerImport(containerId)).thenReturn(false); + sendContainerRequestHandler.onNext(request); // Verify commit space is released From 57f254d13c61714d34d9ec1776e9589075369361 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 7 May 2025 13:56:13 +0800 Subject: [PATCH 08/14] Add test in TestContainerReader --- .../ozoneimpl/TestContainerReader.java | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java index 6a48765c1a91..c42c7ff2814f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java @@ -217,8 +217,18 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo) throws Exception { setLayoutAndSchemaVersion(versionInfo); setup(versionInfo); + + ContainerReader containerReader = new ContainerReader(volumeSet, + hddsVolume, containerSet, conf, true); + Thread thread = new Thread(containerReader); + thread.start(); + thread.join(); + long originalCommittedBytes = hddsVolume.getCommittedBytes(); + 
ContainerCache.getInstance(conf).shutdownCache(); + + long recoveringContainerId = 10; KeyValueContainerData recoveringContainerData = new KeyValueContainerData( - 10, layout, (long) StorageUnit.GB.toBytes(5), + recoveringContainerId, layout, (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(), datanodeId.toString()); //create a container with recovering state recoveringContainerData.setState(RECOVERING); @@ -229,21 +239,23 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo) recoveringKeyValueContainer.create( volumeSet, volumeChoosingPolicy, clusterId); - ContainerReader containerReader = new ContainerReader(volumeSet, - hddsVolume, containerSet, conf, true); - - Thread thread = new Thread(containerReader); + thread = new Thread(containerReader); thread.start(); thread.join(); + // no change, only open containers have committed space + assertEquals(originalCommittedBytes, hddsVolume.getCommittedBytes()); + // Ratis replicated recovering containers are deleted upon datanode startup if (recoveringKeyValueContainer.getContainerData().getReplicaIndex() == 0) { assertNull(containerSet.getContainer(recoveringContainerData.getContainerID())); assertEquals(2, containerSet.containerCount()); } else { //recovering container should be marked unhealthy, so the count should be 3 - assertEquals(UNHEALTHY, containerSet.getContainer( - recoveringContainerData.getContainerID()).getContainerState()); + Container c = containerSet.getContainer( + recoveringContainerData.getContainerID()); + assertEquals(UNHEALTHY, c.getContainerData().getState()); + assertFalse(c.getContainerData().isCommittedSpace()); assertEquals(3, containerSet.containerCount()); } @@ -262,6 +274,8 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo) assertEquals(i, keyValueContainerData.getNumPendingDeletionBlocks()); + + assertTrue(keyValueContainerData.isCommittedSpace()); } } @@ -313,6 +327,14 @@ public void testContainerReaderWithLoadException( hddsVolume1, 
containerSet1, conf, true); containerReader.readVolume(hddsVolume1.getHddsRootDir()); assertEquals(containerCount - 1, containerSet1.containerCount()); + for (Container c : containerSet1.getContainerMap().values()) { + if (c.getContainerData().getContainerID() == 0) { + assertFalse(c.getContainerData().isCommittedSpace()); + } else { + assertTrue(c.getContainerData().isCommittedSpace()); + } + } + assertEquals(hddsVolume1.getCommittedBytes(), (containerCount - 1) * StorageUnit.GB.toBytes(5)); } @ContainerTestVersionInfo.ContainerTest @@ -361,6 +383,7 @@ public void testContainerReaderWithInvalidDbPath( hddsVolume1, containerSet1, conf, true); containerReader.readVolume(hddsVolume1.getHddsRootDir()); assertEquals(0, containerSet1.containerCount()); + assertEquals(0, hddsVolume1.getCommittedBytes()); assertThat(dnLogs.getOutput()).contains("Container DB file is missing"); } From 47cb71da02db0328e16e751f4f7675b317686dc8 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 7 May 2025 20:56:41 +0800 Subject: [PATCH 09/14] Addressed comments --- .../ozone/container/common/impl/ContainerData.java | 4 ---- .../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 6 +----- .../ozone/container/ozoneimpl/ContainerReader.java | 1 - .../replication/DownloadAndImportReplicator.java | 9 +-------- .../ozone/container/ozoneimpl/TestContainerReader.java | 6 ++---- 5 files changed, 4 insertions(+), 22 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index 6449c5f1f2a8..7bb59247ca53 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -220,10 +220,6 @@ public boolean isCommittedSpace() { } public void 
setCommittedSpace(boolean committed) { - if (committed) { - //we don't expect duplicate space commit - Preconditions.checkState(!committedSpace); - } committedSpace = committed; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 7083e1b7caa9..e39cb19daedf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -417,7 +417,6 @@ ContainerCommandResponseProto handleCreateContainer( newContainerData, conf); boolean created = false; - boolean exceptionThrown = false; Lock containerIdLock = containerCreationLocks.get(containerID); containerIdLock.lock(); try { @@ -435,12 +434,9 @@ ContainerCommandResponseProto handleCreateContainer( LOG.debug("Container already exists. 
container Id {}", containerID); } } catch (StorageContainerException ex) { - exceptionThrown = true; + newContainerData.releaseCommitSpace(); return ContainerUtils.logAndReturnError(LOG, ex, request); } finally { - if (exceptionThrown) { - newContainerData.releaseCommitSpace(); - } containerIdLock.unlock(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index fccb2f54e5c2..abec716f8749 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -237,7 +237,6 @@ public void verifyAndFixupContainerData(ContainerData containerData) try { containerSet.addContainer(kvContainer); - // this should be the last step of this block containerData.commitSpace(); } catch (StorageContainerException e) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 4f7ce439c3c6..240ba9473d3d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -22,10 +22,7 @@ import java.nio.file.Path; import java.util.List; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.utils.HddsServerUtil; import 
org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; @@ -47,7 +44,6 @@ public class DownloadAndImportReplicator implements ContainerReplicator { private final ContainerDownloader downloader; private final ContainerImporter containerImporter; private final ContainerSet containerSet; - private final long containerSize; public DownloadAndImportReplicator( ConfigurationSource conf, ContainerSet containerSet, @@ -57,9 +53,6 @@ public DownloadAndImportReplicator( this.containerSet = containerSet; this.downloader = downloader; this.containerImporter = containerImporter; - containerSize = (long) conf.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } @Override @@ -105,7 +98,7 @@ public void replicate(ReplicationTask task) { task.setStatus(Status.FAILED); } finally { if (targetVolume != null) { - targetVolume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(containerSize)); + targetVolume.incCommittedBytes(-containerImporter.getDefaultReplicationSpace()); } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java index c42c7ff2814f..ec5c6743e729 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java @@ -252,10 +252,8 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo) assertEquals(2, containerSet.containerCount()); } else { //recovering container should be marked unhealthy, so the count should be 3 - Container c = 
containerSet.getContainer( - recoveringContainerData.getContainerID()); - assertEquals(UNHEALTHY, c.getContainerData().getState()); - assertFalse(c.getContainerData().isCommittedSpace()); + assertEquals(UNHEALTHY, containerSet.getContainer( + recoveringContainerData.getContainerID()).getContainerState()); assertEquals(3, containerSet.containerCount()); } From 45c00452d044ce0cf170b6b217f0d47bdcdc42e6 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 7 May 2025 23:01:34 +0800 Subject: [PATCH 10/14] Move release commit byte check logic to TestKeyValueHandler --- .../keyvalue/TestKeyValueContainer.java | 22 ----- .../keyvalue/TestKeyValueHandler.java | 80 ++++++++++++++++++- 2 files changed, 79 insertions(+), 23 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index 8aa20908bb76..51a949e496fc 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -34,15 +34,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -1116,24 +1112,6 @@ public void 
testContainerCreationCommitSpaceReserve( // verify that verify(volumeChoosingPolicy).chooseVolume(anyList(), anyLong()); // this would reserve commit space - verify(keyValueContainerData, times(0)).releaseCommitSpace(); assertTrue(keyValueContainerData.isCommittedSpace()); } - - @ContainerTestVersionInfo.ContainerTest - public void testContainerCreationCommitSpaceReserveWithException( - ContainerTestVersionInfo versionInfo) throws Exception { - init(versionInfo); - keyValueContainerData = spy(keyValueContainerData); - keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF); - keyValueContainer = spy(keyValueContainer); - - doThrow(new IOException("test")).when(keyValueContainer).createContainerMetaData(any(File.class), any(File.class), - any(File.class), anyString(), any(ConfigurationSource.class)); - assertThrows(IOException.class, () -> keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId)); - - verify(volumeChoosingPolicy).chooseVolume(anyList(), anyLong()); // this would reserve commit space - verify(keyValueContainerData).releaseCommitSpace(); - assertFalse(keyValueContainerData.isCommittedSpace()); - } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 256ca20e9386..7927864861b7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -25,11 +25,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static 
org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,6 +45,8 @@ import java.util.HashMap; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.StorageUnit; @@ -53,6 +57,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; @@ -70,17 +75,22 @@ import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.util.Time; +import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.GenericTestUtils.LogCapturer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit tests for {@link KeyValueHandler}. 
*/ public class TestKeyValueHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestKeyValueHandler.class); + @TempDir private Path tempDir; @@ -91,9 +101,11 @@ public class TestKeyValueHandler { private HddsDispatcher dispatcher; private KeyValueHandler handler; + private long maxContainerSize; @BeforeEach public void setup() throws StorageContainerException { + OzoneConfiguration conf = new OzoneConfiguration(); // Create mock HddsDispatcher and KeyValueHandler. handler = mock(KeyValueHandler.class); @@ -109,6 +121,10 @@ public void setup() throws StorageContainerException { mock(ContainerMetrics.class), mock(TokenVerifier.class) ); + + maxContainerSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } /** @@ -337,6 +353,68 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion) "Close container should return Invalid container error"); } + @Test + public void testCreateContainerWithFailure() throws Exception { + final String testDir = tempDir.toString(); + final long containerID = 1L; + final String clusterId = UUID.randomUUID().toString(); + final String datanodeId = UUID.randomUUID().toString(); + final ConfigurationSource conf = new OzoneConfiguration(); + final ContainerSet containerSet = spy(newContainerSet()); + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + + HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(conf) + .clusterID(clusterId).datanodeUuid(datanodeId) + .volumeSet(volumeSet) + .build(); + + hddsVolume.format(clusterId); + hddsVolume.createWorkingDir(clusterId, null); + hddsVolume.createTmpDirs(clusterId); + + when(volumeSet.getVolumesList()) + .thenReturn(Collections.singletonList(hddsVolume)); + + List hddsVolumeList = StorageVolumeUtil + .getHddsVolumesList(volumeSet.getVolumesList()); + + assertEquals(1, hddsVolumeList.size()); + + final ContainerMetrics metrics = 
ContainerMetrics.create(conf); + + final AtomicInteger icrReceived = new AtomicInteger(0); + + final KeyValueHandler kvHandler = new KeyValueHandler(conf, + datanodeId, containerSet, volumeSet, metrics, + c -> icrReceived.incrementAndGet()); + kvHandler.setClusterID(clusterId); + + final ContainerCommandRequestProto createContainer = + createContainerRequest(datanodeId, containerID); + + Semaphore semaphore = new Semaphore(1); + doAnswer(invocation -> { + semaphore.acquire(); + throw new StorageContainerException(ContainerProtos.Result.IO_EXCEPTION); + }).when(containerSet).addContainer(any()); + + semaphore.acquire(); + CompletableFuture.runAsync(() -> + kvHandler.handleCreateContainer(createContainer, null) + ); + + // commit bytes has been allocated by volumeChoosingPolicy which is called in KeyValueContainer#create + GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == maxContainerSize, + 1000, 50000); + semaphore.release(); + + LOG.info("Committed bytes: {}", hddsVolume.getCommittedBytes()); + + // release committed bytes as exception is thrown + GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == 0, + 1000, 50000); + } + @Test public void testDeleteContainer() throws IOException { final String testDir = tempDir.toString(); From f52069b7aa757d39e4819bae7d06b12955660576 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 9 May 2025 01:01:14 +0800 Subject: [PATCH 11/14] Use threadlocal random for Capacity and Remove AtomicInteger as CapacityVolumeChoosingPolicy is thread-safe now --- .../common/volume/CapacityVolumeChoosingPolicy.java | 9 +++------ .../common/volume/RoundRobinVolumeChoosingPolicy.java | 8 +++----- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java index 
64477c21969e..e323eeb4b173 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.List; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; @@ -44,9 +44,6 @@ public class CapacityVolumeChoosingPolicy implements VolumeChoosingPolicy { private static final Logger LOG = LoggerFactory.getLogger( CapacityVolumeChoosingPolicy.class); - // Stores the index of the next volume to be returned. - private final Random random = new Random(); - @Override public synchronized HddsVolume chooseVolume(List volumes, long maxContainerSize) throws IOException { @@ -82,8 +79,8 @@ public synchronized HddsVolume chooseVolume(List volumes, // 4. vol2 + vol2: 25%, result is vol2 // So we have a total of 75% chances to choose vol1, which meets our // expectation. 
- int firstIndex = random.nextInt(count); - int secondIndex = random.nextInt(count); + int firstIndex = ThreadLocalRandom.current().nextInt(count); + int secondIndex = ThreadLocalRandom.current().nextInt(count); HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex); HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java index b375f22034ac..52c8c599703c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.slf4j.Logger; @@ -38,7 +37,7 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy { RoundRobinVolumeChoosingPolicy.class); // Stores the index of the next volume to be returned. - private AtomicInteger nextVolumeIndex = new AtomicInteger(0); + private int nextVolumeIndex = 0; @Override public synchronized HddsVolume chooseVolume(List volumes, @@ -53,8 +52,7 @@ public synchronized HddsVolume chooseVolume(List volumes, // since volumes could've been removed because of the failure // make sure we are not out of bounds - int nextIndex = nextVolumeIndex.get(); - int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0; + int currentVolumeIndex = nextVolumeIndex < volumes.size() ? 
nextVolumeIndex : 0; int startVolumeIndex = currentVolumeIndex; @@ -67,7 +65,7 @@ public synchronized HddsVolume chooseVolume(List volumes, if (hasEnoughSpace) { logIfSomeVolumesOutOfSpace(filter, LOG); - nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex); + nextVolumeIndex = currentVolumeIndex; volume.incCommittedBytes(maxContainerSize); return volume; } From c9d74703d417eae5ed249a80b014fc75238dd8da Mon Sep 17 00:00:00 2001 From: peterxcli Date: Sat, 10 May 2025 02:25:06 +0800 Subject: [PATCH 12/14] Release older and commit newer if container got conflict and swapped --- .../container/ozoneimpl/ContainerReader.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index abec716f8749..f3b39333e087 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -244,8 +244,13 @@ public void verifyAndFixupContainerData(ContainerData containerData) throw e; } if (shouldDelete) { - resolveDuplicate((KeyValueContainer) containerSet.getContainer( - kvContainer.getContainerData().getContainerID()), kvContainer); + KeyValueContainer existing = (KeyValueContainer) containerSet.getContainer( + kvContainer.getContainerData().getContainerID()); + boolean swapped = resolveDuplicate(existing, kvContainer); + if (swapped) { + existing.getContainerData().releaseCommitSpace(); + kvContainer.getContainerData().commitSpace(); + } } } break; @@ -256,7 +261,14 @@ public void verifyAndFixupContainerData(ContainerData containerData) } } - private void resolveDuplicate(KeyValueContainer existing, + /** + * Resolve duplicate containers. 
+ * @param existing + * @param toAdd + * @return true if the container was swapped, false otherwise + * @throws IOException + */ + private boolean resolveDuplicate(KeyValueContainer existing, KeyValueContainer toAdd) throws IOException { if (existing.getContainerData().getReplicaIndex() != 0 || toAdd.getContainerData().getReplicaIndex() != 0) { @@ -270,7 +282,7 @@ private void resolveDuplicate(KeyValueContainer existing, existing.getContainerData().getContainerID(), existing.getContainerData().getContainerPath(), toAdd.getContainerData().getContainerPath()); - return; + return false; } long existingBCSID = existing.getBlockCommitSequenceId(); @@ -290,7 +302,7 @@ private void resolveDuplicate(KeyValueContainer existing, toAdd.getContainerData().getContainerPath(), toAddState); KeyValueContainerUtil.removeContainer(toAdd.getContainerData(), hddsVolume.getConf()); - return; + return false; } else if (toAddState == CLOSED) { LOG.warn("Container {} is present at {} with state CLOSED and at " + "{} with state {}. Removing the latter container.", @@ -298,7 +310,7 @@ private void resolveDuplicate(KeyValueContainer existing, toAdd.getContainerData().getContainerPath(), existing.getContainerData().getContainerPath(), existingState); swapAndRemoveContainer(existing, toAdd); - return; + return true; } } @@ -311,6 +323,7 @@ private void resolveDuplicate(KeyValueContainer existing, toAdd.getContainerData().getContainerPath()); KeyValueContainerUtil.removeContainer(toAdd.getContainerData(), hddsVolume.getConf()); + return false; } else { LOG.warn("Container {} is present at {} with a lesser BCSID " + "than at {}. 
Removing the former container.", @@ -318,6 +331,7 @@ private void resolveDuplicate(KeyValueContainer existing, existing.getContainerData().getContainerPath(), toAdd.getContainerData().getContainerPath()); swapAndRemoveContainer(existing, toAdd); + return true; } } From bb940941362c942c552d91beeeec558e0e4516ef Mon Sep 17 00:00:00 2001 From: peterxcli Date: Tue, 13 May 2025 15:59:19 +0000 Subject: [PATCH 13/14] Set commit space to true right after volume is choosen in KeyValueContainer --- .../hadoop/ozone/container/keyvalue/KeyValueContainer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index d58691dfb63e..5f28bc348a64 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -159,6 +159,8 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy HddsVolume containerVolume; try { containerVolume = volumeChoosingPolicy.chooseVolume(volumes, maxSize); + // commit bytes have been reserved in volumeChoosingPolicy#chooseVolume + containerData.setCommittedSpace(true); } catch (DiskOutOfSpaceException ex) { throw new StorageContainerException("Container creation failed, " + "due to disk out of space", ex, DISK_OUT_OF_SPACE); @@ -173,8 +175,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy // Set volume before getContainerDBFile(), because we may need the // volume to deduce the db file. 
containerData.setVolume(containerVolume); - // commit space has been reserved by volumeChoosingPolicy - containerData.setCommittedSpace(true); long containerID = containerData.getContainerID(); String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID( From 83d397ba2fdd33ede6bb7b32630e834ff9e43ee8 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 14 May 2025 05:47:45 +0000 Subject: [PATCH 14/14] Set volume first, too, to prevent releaseSpace throw NPE when getVolume --- .../ozone/container/keyvalue/KeyValueContainer.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 5f28bc348a64..030392045d51 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -157,8 +157,13 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()); while (true) { HddsVolume containerVolume; + String hddsVolumeDir; try { containerVolume = volumeChoosingPolicy.chooseVolume(volumes, maxSize); + hddsVolumeDir = containerVolume.getHddsRootDir().toString(); + // Set volume before getContainerDBFile(), because we may need the + // volume to deduce the db file. 
+ containerData.setVolume(containerVolume); // commit bytes have been reserved in volumeChoosingPolicy#chooseVolume containerData.setCommittedSpace(true); } catch (DiskOutOfSpaceException ex) { @@ -171,11 +176,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy } try { - String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); - // Set volume before getContainerDBFile(), because we may need the - // volume to deduce the db file. - containerData.setVolume(containerVolume); - long containerID = containerData.getContainerID(); String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID( containerVolume, clusterId);