From 834be96e55e3e0ca6fd51012c72164f8a198d114 Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Tue, 15 Apr 2025 16:41:47 -0400 Subject: [PATCH 01/10] Make on-demand scanner a normal instance --- .../OnDemandContainerDataScanner.java | 71 ++++++------------- 1 file changed, 23 insertions(+), 48 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index 3a9c620fee62..aa2d47490d0b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@ -45,8 +45,6 @@ public final class OnDemandContainerDataScanner { public static final Logger LOG = LoggerFactory.getLogger(OnDemandContainerDataScanner.class); - private static volatile OnDemandContainerDataScanner instance; - private final ExecutorService scanExecutor; private final ContainerController containerController; private final DataTransferThrottler throttler; @@ -68,26 +66,11 @@ private OnDemandContainerDataScanner( minScanGap = conf.getContainerScanMinGap(); } - public static synchronized void init( - ContainerScannerConfiguration conf, ContainerController controller) { - if (instance != null) { - LOG.warn("Trying to initialize on demand scanner" + - " a second time on a datanode."); - return; - } - instance = new OnDemandContainerDataScanner(conf, controller); - } - - private static boolean shouldScan(Container container) { + private boolean shouldScan(Container container) { if (container == null) { return false; } long containerID = container.getContainerData().getContainerID(); - if (instance == null) { - LOG.debug("Skipping on demand scan for container {} since scanner was " + - "not initialized.", containerID); - return false; - } HddsVolume containerVolume = container.getContainerData().getVolume(); if (containerVolume.isFailed()) { @@ -96,11 +79,11 @@ private static boolean shouldScan(Container container) { return false; } - return !ContainerUtils.recentlyScanned(container, instance.minScanGap, + return !ContainerUtils.recentlyScanned(container, minScanGap, LOG) && container.shouldScanData(); } - public static Optional> scanContainer(Container container) { + public Optional> scanContainer(Container container) { if (!shouldScan(container)) { return Optional.empty(); } @@ -108,7 +91,7 @@ public static Optional> scanContainer(Container container) { Future resultFuture = null; long containerId = container.getContainerData().getContainerID(); if (addContainerToScheduledContainers(containerId)) { - resultFuture = instance.scanExecutor.submit(() -> { + resultFuture = scanExecutor.submit(() -> { performOnDemandScan(container); removeContainerFromScheduledContainers(containerId); }); @@ -116,16 +99,16 @@ public static Optional> scanContainer(Container container) { return Optional.ofNullable(resultFuture); } - private static boolean addContainerToScheduledContainers(long containerId) { - return instance.containerRescheduleCheckSet.add(containerId); + private boolean addContainerToScheduledContainers(long containerId) { + return containerRescheduleCheckSet.add(containerId); } - private static void removeContainerFromScheduledContainers( + private void removeContainerFromScheduledContainers( long containerId) { - instance.containerRescheduleCheckSet.remove(containerId); + containerRescheduleCheckSet.remove(containerId); } - private static void performOnDemandScan(Container container) { + private void performOnDemandScan(Container container) { if (!shouldScan(container)) { return; } @@ -135,21 +118,21 @@ private static void performOnDemandScan(Container container) { ContainerData containerData = container.getContainerData(); logScanStart(containerData); - ScanResult result = container.scanData(instance.throttler, instance.canceler); + ScanResult result = container.scanData(throttler, canceler); // Metrics for skipped containers should not be updated. if (result.isDeleted()) { LOG.debug("Container [{}] has been deleted during the data scan.", containerId); } else { if (!result.isHealthy()) { logUnhealthyScanResult(containerId, result, LOG); - boolean containerMarkedUnhealthy = instance.containerController + boolean containerMarkedUnhealthy = containerController .markContainerUnhealthy(containerId, result); if (containerMarkedUnhealthy) { - instance.metrics.incNumUnHealthyContainers(); + metrics.incNumUnHealthyContainers(); } } // TODO HDDS-10374 will need to update the merkle tree here as well. - instance.metrics.incNumContainersScanned(); + metrics.incNumContainersScanned(); } // Even if the container was deleted, mark the scan as completed since we already logged it as starting. @@ -157,7 +140,7 @@ private static void performOnDemandScan(Container container) { logScanCompleted(containerData, now); if (!result.isDeleted()) { - instance.containerController.updateDataScanTimestamp(containerId, now); + containerController.updateDataScanTimestamp(containerId, now); } } catch (IOException e) { LOG.warn("Unexpected exception while scanning container " @@ -169,7 +152,7 @@ private static void performOnDemandScan(Container container) { } } - private static void logScanStart(ContainerData containerData) { + private void logScanStart(ContainerData containerData) { if (LOG.isDebugEnabled()) { Optional scanTimestamp = containerData.lastDataScanTime(); Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never"); @@ -178,35 +161,27 @@ private static void logScanStart(ContainerData containerData) { } } - private static void logScanCompleted( + private void logScanCompleted( ContainerData containerData, Instant timestamp) { LOG.debug("Completed scan of container {} at {}", containerData.getContainerID(), timestamp); } - public static OnDemandScannerMetrics getMetrics() { - return instance.metrics; + public OnDemandScannerMetrics getMetrics() { + return metrics; } @VisibleForTesting - public static DataTransferThrottler getThrottler() { - return instance.throttler; + public DataTransferThrottler getThrottler() { + return throttler; } @VisibleForTesting - public static Canceler getCanceler() { - return instance.canceler; - } - - public static synchronized void shutdown() { - if (instance == null) { - return; - } - instance.shutdownScanner(); + public Canceler getCanceler() { + return canceler; } - private synchronized void shutdownScanner() { - instance = null; + public synchronized void shutdown() { metrics.unregister(); String shutdownMessage = "On-demand container scanner is shutting down."; LOG.info(shutdownMessage); From e73757ebe813a41c645a5b30800753d256977682 Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Tue, 15 Apr 2025 19:02:08 -0400 Subject: [PATCH 02/10] Register on-demand scan callback in ContainerSet --- .../container/common/impl/ContainerSet.java | 17 +++++++++++++++++ .../ozoneimpl/OnDemandContainerDataScanner.java | 2 +- .../container/ozoneimpl/OzoneContainer.java | 4 +++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 2124bd9c4700..0e431f7784b0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -37,6 +37,8 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; @@ -66,6 +68,8 @@ public class ContainerSet implements Iterable> { private Clock clock; private long recoveringTimeout; private final Table containerIdsTable; + // Handler that will be invoked when a user of a container reports an error. + private Consumer> containerErrorHandler; @VisibleForTesting public ContainerSet(long recoveringTimeout) { @@ -128,6 +132,19 @@ public void ensureContainerNotMissing(long containerId, State state) throws Stor } } + /** + * @param handler All callback that will be invoked when an error is reported with a member of this container set. + */ + public void registerContainerErrorHandler(Consumer> handler) { + this.containerErrorHandler = handler; + } + + public void reportError(long containerID) { + if (containerErrorHandler != null) { + containerErrorHandler.accept(getContainer(containerID)); + } + } + /** * Add Container to container map. * @param container container to be added diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index aa2d47490d0b..46da3f4521eb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@ -54,7 +54,7 @@ public final class OnDemandContainerDataScanner { private final OnDemandScannerMetrics metrics; private final long minScanGap; - private OnDemandContainerDataScanner( + public OnDemandContainerDataScanner( ContainerScannerConfiguration conf, ContainerController controller) { containerController = controller; throttler = new DataTransferThrottler( diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 370b455e3367..6d2f0a5d07ba 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -121,6 +121,7 @@ public class OzoneContainer { private final XceiverServerSpi readChannel; private final ContainerController controller; private BackgroundContainerMetadataScanner metadataScanner; + private OnDemandContainerDataScanner onDemandScanner; private List dataScanners; private List backgroundScanners; private final BlockDeletingService blockDeletingService; @@ -432,7 +433,8 @@ private void initOnDemandContainerScanner(ContainerScannerConfiguration c) { "so the on-demand container data scanner will not start."); return; } - OnDemandContainerDataScanner.init(c, controller); + onDemandScanner = new OnDemandContainerDataScanner(c, controller); + containerSet.registerContainerErrorHandler(onDemandScanner::scanContainer); } /** From f0d8efee73a18736a8dbad7068ca7e016eb025ac Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Tue, 15 Apr 2025 19:05:41 -0400 Subject: [PATCH 03/10] Migrate scanContainer usage in prod code --- .../container/common/impl/HddsDispatcher.java | 2 +- .../container/keyvalue/KeyValueHandler.java | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 0d113d467d94..9572186cd1f1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -428,7 +428,7 @@ && getMissingContainerSet().contains(containerID)) { // Create a specific exception that signals for on demand scanning // and move this general scan to where it is more appropriate. // Add integration tests to test the full functionality. - OnDemandContainerDataScanner.scanContainer(container); + containerSet.reportError(containerID); audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, new Exception(responseProto.getMessage())); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 43926ca5e282..ef4a430fd363 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1507,6 +1507,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container Set peers) throws IOException { KeyValueContainer kvContainer = (KeyValueContainer) container; KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); + long containerID = containerData.getContainerID(); Optional optionalChecksumInfo = checksumManager.read(containerData); ContainerProtos.ContainerChecksumInfo checksumInfo; @@ -1521,10 +1522,10 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container for (DatanodeDetails peer : peers) { long start = Instant.now().toEpochMilli(); ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo( - containerData.getContainerID(), peer); + containerID, peer); if (peerChecksumInfo == null) { LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum", - containerData.getContainerID(), peer); + containerID, peer); continue; } @@ -1538,7 +1539,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container handleMissingBlock(kvContainer, pipeline, dnClient, missingBlock, chunkByteBuffer); } catch (IOException e) { LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), - containerData.getContainerID(), e); + containerID, e); } } @@ -1548,7 +1549,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer); } catch (IOException e) { LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), - containerData.getContainerID(), e); + containerID, e); } } @@ -1558,7 +1559,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer); } catch (IOException e) { LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), - containerData.getContainerID(), e); + containerID, e); } } // Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data @@ -1570,18 +1571,18 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container if (dataChecksum == oldDataChecksum) { metrics.incContainerReconciledWithoutChanges(); LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms", - containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration); + containerID, peer.toString(), checksumToString(dataChecksum), duration); } else { metrics.incContainerReconciledWithChanges(); LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms", - containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum), + containerID, peer.toString(), checksumToString(oldDataChecksum), checksumToString(dataChecksum), duration); } ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer); } // Trigger manual on demand scanner - OnDemandContainerDataScanner.scanContainer(container); + containerSet.reportError(containerID); sendICR(container); } From 4cb054c8e806c47fa5e6b72f46dec0df652e30e8 Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Tue, 15 Apr 2025 19:13:35 -0400 Subject: [PATCH 04/10] Switch terminology from error to scan. Add existence checks --- .../container/common/impl/ContainerSet.java | 24 +++++++++++++------ .../container/common/impl/HddsDispatcher.java | 3 +-- .../container/keyvalue/KeyValueHandler.java | 3 +-- .../container/ozoneimpl/OzoneContainer.java | 2 +- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 0e431f7784b0..28412fa100cc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -69,7 +69,7 @@ public class ContainerSet implements Iterable> { private long recoveringTimeout; private final Table containerIdsTable; // Handler that will be invoked when a user of a container reports an error. - private Consumer> containerErrorHandler; + private Consumer> containerScanHandler; @VisibleForTesting public ContainerSet(long recoveringTimeout) { @@ -133,15 +133,25 @@ public void ensureContainerNotMissing(long containerId, State state) throws Stor } /** - * @param handler All callback that will be invoked when an error is reported with a member of this container set. + * @param scanner A callback that will be invoked when a scan of a container in this set is requested. */ - public void registerContainerErrorHandler(Consumer> handler) { - this.containerErrorHandler = handler; + public void registerContainerScanHandler(Consumer> scanner) { + this.containerScanHandler = scanner; } - public void reportError(long containerID) { - if (containerErrorHandler != null) { - containerErrorHandler.accept(getContainer(containerID)); + /** + * Triggers a scan of a container in this set using the registered scan handler. This is a no-op if no scan handler + * is registered or the container does not exist in the set. + * @param containerID The container in this set to scan. + */ + public void scanContainer(long containerID) { + if (containerScanHandler != null) { + Container container = getContainer(containerID); + if (container != null) { + containerScanHandler.accept(container); + } else { + LOG.warn("Request to scan container {} which was not found in the container set", containerID); + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 9572186cd1f1..e240ff6bf478 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -74,7 +74,6 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError; import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult; -import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; import org.apache.hadoop.util.Time; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum; @@ -428,7 +427,7 @@ && getMissingContainerSet().contains(containerID)) { // Create a specific exception that signals for on demand scanning // and move this general scan to where it is more appropriate. // Add integration tests to test the full functionality. - containerSet.reportError(containerID); + containerSet.scanContainer(containerID); audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, new Exception(responseProto.getMessage())); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index ef4a430fd363..ab07edb6b78e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -155,7 +155,6 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory; import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; -import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; @@ -1582,7 +1581,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container } // Trigger manual on demand scanner - containerSet.reportError(containerID); + containerSet.scanContainer(containerID); sendICR(container); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 6d2f0a5d07ba..e1477e87484a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -434,7 +434,7 @@ private void initOnDemandContainerScanner(ContainerScannerConfiguration c) { return; } onDemandScanner = new OnDemandContainerDataScanner(c, controller); - containerSet.registerContainerErrorHandler(onDemandScanner::scanContainer); + containerSet.registerContainerScanHandler(onDemandScanner::scanContainer); } /** From 8abedb64325e5ded9ae2ea6ffc21668637f293d8 Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Tue, 15 Apr 2025 19:27:04 -0400 Subject: [PATCH 05/10] Update tests --- .../container/ozoneimpl/OzoneContainer.java | 2 +- .../TestOnDemandContainerDataScanner.java | 67 ++++++++----------- 2 files changed, 29 insertions(+), 40 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index e1477e87484a..bb9ac58c8fe6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -453,7 +453,7 @@ private void stopContainerScrub() { for (BackgroundContainerDataScanner s : dataScanners) { s.shutdown(); } - OnDemandContainerDataScanner.shutdown(); + onDemandScanner.shutdown(); } @VisibleForTesting diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java index 87a77e6d8f97..75dfae3b822c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java @@ -60,10 +60,13 @@ @MockitoSettings(strictness = Strictness.LENIENT) public class TestOnDemandContainerDataScanner extends TestContainerScannersAbstract { + + private OnDemandContainerDataScanner onDemandScanner; @BeforeEach public void setup() { super.setup(); + onDemandScanner = new OnDemandContainerDataScanner(conf, controller); } @Test @@ -96,14 +99,13 @@ public void testUnscannedContainerIsScanned() throws Exception { @AfterEach public void tearDown() { - OnDemandContainerDataScanner.shutdown(); + onDemandScanner.shutdown(); } @Test public void testScanTimestampUpdated() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); Optional> scanFuture = - OnDemandContainerDataScanner.scanContainer(healthy); + onDemandScanner.scanContainer(healthy); assertTrue(scanFuture.isPresent()); scanFuture.get().get(); verify(controller, atLeastOnce()) @@ -112,7 +114,7 @@ public void testScanTimestampUpdated() throws Exception { // Metrics for deleted container should not be updated. scanFuture = - OnDemandContainerDataScanner.scanContainer(healthy); + onDemandScanner.scanContainer(healthy); assertTrue(scanFuture.isPresent()); scanFuture.get().get(); verify(controller, never()) @@ -121,35 +123,28 @@ public void testScanTimestampUpdated() throws Exception { } @Test - public void testContainerScannerMultipleInitsAndShutdowns() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); - OnDemandContainerDataScanner.init(conf, controller); - OnDemandContainerDataScanner.shutdown(); - OnDemandContainerDataScanner.shutdown(); - //There shouldn't be an interaction after shutdown: - OnDemandContainerDataScanner.scanContainer(corruptData); - verifyContainerMarkedUnhealthy(corruptData, never()); + public void testContainerScannerMultipleShutdowns() { + // No runtime exceptions should be thrown. + onDemandScanner.shutdown(); + onDemandScanner.shutdown(); } @Test public void testSameContainerQueuedMultipleTimes() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); //Given a container that has not finished scanning CountDownLatch latch = new CountDownLatch(1); when(corruptData.scanData( - OnDemandContainerDataScanner.getThrottler(), - OnDemandContainerDataScanner.getCanceler())) + any(), + any())) .thenAnswer((Answer) invocation -> { latch.await(); return getUnhealthyDataScanResult(); }); - Optional> onGoingScan = OnDemandContainerDataScanner - .scanContainer(corruptData); + Optional> onGoingScan = onDemandScanner.scanContainer(corruptData); assertTrue(onGoingScan.isPresent()); assertFalse(onGoingScan.get().isDone()); //When scheduling the same container again - Optional> secondScan = OnDemandContainerDataScanner - .scanContainer(corruptData); + Optional> secondScan = onDemandScanner.scanContainer(corruptData); //Then the second scan is not scheduled and the first scan can still finish assertFalse(secondScan.isPresent()); latch.countDown(); @@ -161,19 +156,18 @@ public void testSameContainerQueuedMultipleTimes() throws Exception { @Test @Override public void testScannerMetrics() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); ArrayList>> resultFutureList = Lists.newArrayList(); - resultFutureList.add(OnDemandContainerDataScanner.scanContainer( + resultFutureList.add(onDemandScanner.scanContainer( corruptData)); resultFutureList.add( - OnDemandContainerDataScanner.scanContainer(openContainer)); + onDemandScanner.scanContainer(openContainer)); resultFutureList.add( - OnDemandContainerDataScanner.scanContainer(openCorruptMetadata)); - resultFutureList.add(OnDemandContainerDataScanner.scanContainer(healthy)); + onDemandScanner.scanContainer(openCorruptMetadata)); + resultFutureList.add(onDemandScanner.scanContainer(healthy)); // Deleted containers will not count towards the scan count metric. - resultFutureList.add(OnDemandContainerDataScanner.scanContainer(deletedContainer)); + resultFutureList.add(onDemandScanner.scanContainer(deletedContainer)); waitOnScannerToFinish(resultFutureList); - OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics(); + OnDemandScannerMetrics metrics = onDemandScanner.getMetrics(); //Containers with shouldScanData = false shouldn't increase // the number of scanned containers assertEquals(0, metrics.getNumUnHealthyContainers()); @@ -183,11 +177,9 @@ public void testScannerMetrics() throws Exception { @Test @Override public void testScannerMetricsUnregisters() { - OnDemandContainerDataScanner.init(conf, controller); - String metricsName = OnDemandContainerDataScanner.getMetrics().getName(); + String metricsName = onDemandScanner.getMetrics().getName(); assertNotNull(DefaultMetricsSystem.instance().getSource(metricsName)); - OnDemandContainerDataScanner.shutdown(); - OnDemandContainerDataScanner.scanContainer(healthy); + onDemandScanner.shutdown(); assertNull(DefaultMetricsSystem.instance().getSource(metricsName)); } @@ -196,7 +188,7 @@ public void testScannerMetricsUnregisters() { public void testUnhealthyContainersDetected() throws Exception { // Without initialization, // there shouldn't be interaction with containerController - OnDemandContainerDataScanner.scanContainer(corruptData); + onDemandScanner.scanContainer(corruptData); verifyNoInteractions(controller); scanContainer(healthy); @@ -223,8 +215,7 @@ public void testUnhealthyContainersDetected() throws Exception { public void testWithVolumeFailure() throws Exception { when(vol.isFailed()).thenReturn(true); - OnDemandContainerDataScanner.init(conf, controller); - OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics(); + OnDemandScannerMetrics metrics = onDemandScanner.getMetrics(); scanContainer(healthy); verifyContainerMarkedUnhealthy(healthy, never()); @@ -248,11 +239,10 @@ public void testShutdownDuringScan() throws Exception { }); // Start the blocking scan. - OnDemandContainerDataScanner.init(conf, controller); - OnDemandContainerDataScanner.scanContainer(healthy); + onDemandScanner.scanContainer(healthy); // Shut down the on demand scanner. This will interrupt the blocked scan // on the healthy container. - OnDemandContainerDataScanner.shutdown(); + onDemandScanner.shutdown(); // Interrupting the healthy container's scan should not mark it unhealthy. verifyContainerMarkedUnhealthy(healthy, never()); } @@ -271,7 +261,7 @@ public void testUnhealthyContainerRescanned() throws Exception { // First iteration should find the unhealthy container. scanContainer(unhealthy); verifyContainerMarkedUnhealthy(unhealthy, atMostOnce()); - OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics(); + OnDemandScannerMetrics metrics = onDemandScanner.getMetrics(); assertEquals(1, metrics.getNumContainersScanned()); assertEquals(1, metrics.getNumUnHealthyContainers()); @@ -296,9 +286,8 @@ public void testUnhealthyContainerRescanned() throws Exception { } private void scanContainer(Container container) throws Exception { - OnDemandContainerDataScanner.init(conf, controller); Optional> scanFuture = - OnDemandContainerDataScanner.scanContainer(container); + onDemandScanner.scanContainer(container); if (scanFuture.isPresent()) { scanFuture.get().get(); } From 577a075fa182ee80c3b407975b1b6b53bc9d5f8e Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Wed, 16 Apr 2025 12:00:23 -0400 Subject: [PATCH 06/10] Add unit test for ContainerSet --- .../container/common/impl/TestContainerSet.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java index 84d44534525f..45e8e6577895 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.LongStream; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -48,6 +49,7 @@ import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.junit.jupiter.api.Test; /** * Class used to test ContainerSet operations. @@ -284,6 +286,21 @@ public void testListContainerFromFirstKey(ContainerLayoutVersion layout) assertContainerIds(FIRST_ID, count, result); } + @ContainerLayoutTestInfo.ContainerTest + public void testContainerScanHandler(ContainerLayoutVersion layout) throws Exception { + setLayoutVersion(layout); + ContainerSet containerSet = createContainerSet(); + // Atomic long required since lambda modification must be effectively final. + AtomicLong invocationCount = new AtomicLong(); + containerSet.registerContainerScanHandler(c -> { + assertEquals(c.getContainerData().getContainerID(), FIRST_ID); + invocationCount.getAndIncrement(); + }); + + containerSet.scanContainer(FIRST_ID); + assertEquals(1, invocationCount.get()); + } + /** * Verify that {@code result} contains {@code count} containers * with IDs in increasing order starting at {@code startId}. From 4c8d8436a1f86c82c675d6f166d7e62c624994ef Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Wed, 16 Apr 2025 12:02:02 -0400 Subject: [PATCH 07/10] Checkstyle --- .../apache/hadoop/ozone/container/common/impl/ContainerSet.java | 1 - .../hadoop/ozone/container/common/impl/TestContainerSet.java | 1 - 2 files changed, 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 28412fa100cc..ca0ea191fcf3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; - import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java index 45e8e6577895..b14ff219eecc 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java @@ -49,7 +49,6 @@ import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.junit.jupiter.api.Test; /** * Class used to test ContainerSet operations. From 0bd4127052ee2abb55301ff3e81a950305bf0a41 Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Wed, 16 Apr 2025 12:15:58 -0400 Subject: [PATCH 08/10] Improve comments and test --- .../hadoop/ozone/container/common/impl/ContainerSet.java | 2 +- .../ozone/container/common/impl/TestContainerSet.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index ca0ea191fcf3..8204f58953c8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -67,7 +67,7 @@ public class ContainerSet implements Iterable> { private Clock clock; private long recoveringTimeout; private final Table containerIdsTable; - // Handler that will be invoked when a user of a container reports an error. + // Handler that will be invoked when a scan of a container in this set is requested. private Consumer> containerScanHandler; @VisibleForTesting diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java index b14ff219eecc..377b921297d3 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java @@ -289,13 +289,20 @@ public void testListContainerFromFirstKey(ContainerLayoutVersion layout) public void testContainerScanHandler(ContainerLayoutVersion layout) throws Exception { setLayoutVersion(layout); ContainerSet containerSet = createContainerSet(); - // Atomic long required since lambda modification must be effectively final. + // Scan when no handler is registered should not throw an exception. + containerSet.scanContainer(FIRST_ID); + + // Scan of non-existent container should not throw exception or trigger the handler. + containerSet.scanContainer(FIRST_ID - 1); AtomicLong invocationCount = new AtomicLong(); containerSet.registerContainerScanHandler(c -> { + // If the handler was incorrectly triggered for a non-existent container, this assert would fail. assertEquals(c.getContainerData().getContainerID(), FIRST_ID); invocationCount.getAndIncrement(); }); + assertEquals(0, invocationCount.get()); + // Only scan of an existing container when a handler is registered should trigger a scan. containerSet.scanContainer(FIRST_ID); assertEquals(1, invocationCount.get()); } From 28b3230d882b2adebe5d89d7d1e82bdb178b9996 Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Mon, 21 Apr 2025 19:33:24 -0400 Subject: [PATCH 09/10] Address review comments --- .../OnDemandContainerDataScanner.java | 10 ---------- .../common/impl/TestContainerSet.java | 11 ++++++----- .../TestOnDemandContainerDataScanner.java | 18 ++++++------------ 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index 46da3f4521eb..6f83b5b0b2b3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@ -171,16 +171,6 @@ public OnDemandScannerMetrics getMetrics() { return metrics; } - @VisibleForTesting - public DataTransferThrottler getThrottler() { - return throttler; - } - - @VisibleForTesting - public Canceler getCanceler() { - return canceler; - } - public synchronized void shutdown() { metrics.unregister(); String shutdownMessage = "On-demand container scanner is shutting down."; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java index 377b921297d3..89aa4fb56509 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java @@ -292,19 +292,20 @@ public void testContainerScanHandler(ContainerLayoutVersion layout) throws Excep // Scan when no handler is registered should not throw an exception. containerSet.scanContainer(FIRST_ID); - // Scan of non-existent container should not throw exception or trigger the handler. - containerSet.scanContainer(FIRST_ID - 1); AtomicLong invocationCount = new AtomicLong(); containerSet.registerContainerScanHandler(c -> { // If the handler was incorrectly triggered for a non-existent container, this assert would fail. - assertEquals(c.getContainerData().getContainerID(), FIRST_ID); + assertEquals(FIRST_ID, c.getContainerData().getContainerID()); invocationCount.getAndIncrement(); }); - assertEquals(0, invocationCount.get()); - // Only scan of an existing container when a handler is registered should trigger a scan. + // Scan of an existing container when a handler is registered should trigger a scan. containerSet.scanContainer(FIRST_ID); assertEquals(1, invocationCount.get()); + + // Scan of non-existent container should not throw exception or trigger an additional invocation. + containerSet.scanContainer(FIRST_ID - 1); + assertEquals(1, invocationCount.get()); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java index 75dfae3b822c..6ea5ff0b1265 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java @@ -104,8 +104,7 @@ public void tearDown() { @Test public void testScanTimestampUpdated() throws Exception { - Optional> scanFuture = - onDemandScanner.scanContainer(healthy); + Optional> scanFuture = onDemandScanner.scanContainer(healthy); assertTrue(scanFuture.isPresent()); scanFuture.get().get(); verify(controller, atLeastOnce()) @@ -113,8 +112,7 @@ public void testScanTimestampUpdated() throws Exception { eq(healthy.getContainerData().getContainerID()), any()); // Metrics for deleted container should not be updated. - scanFuture = - onDemandScanner.scanContainer(healthy); + scanFuture = onDemandScanner.scanContainer(healthy); assertTrue(scanFuture.isPresent()); scanFuture.get().get(); verify(controller, never()) @@ -157,12 +155,9 @@ public void testSameContainerQueuedMultipleTimes() throws Exception { @Override public void testScannerMetrics() throws Exception { ArrayList>> resultFutureList = Lists.newArrayList(); - resultFutureList.add(onDemandScanner.scanContainer( - corruptData)); - resultFutureList.add( - onDemandScanner.scanContainer(openContainer)); - resultFutureList.add( - onDemandScanner.scanContainer(openCorruptMetadata)); + resultFutureList.add(onDemandScanner.scanContainer(corruptData)); + resultFutureList.add(onDemandScanner.scanContainer(openContainer)); + resultFutureList.add(onDemandScanner.scanContainer(openCorruptMetadata)); resultFutureList.add(onDemandScanner.scanContainer(healthy)); // Deleted containers will not count towards the scan count metric. resultFutureList.add(onDemandScanner.scanContainer(deletedContainer)); @@ -286,8 +281,7 @@ public void testUnhealthyContainerRescanned() throws Exception { } private void scanContainer(Container container) throws Exception { - Optional> scanFuture = - onDemandScanner.scanContainer(container); + Optional> scanFuture = onDemandScanner.scanContainer(container); if (scanFuture.isPresent()) { scanFuture.get().get(); } From 4c08e7f28126dbc48dcea54bebc429b1d9e2e6d6 Mon Sep 17 00:00:00 2001 From: Ethan Rose Date: Tue, 22 Apr 2025 10:24:55 -0400 Subject: [PATCH 10/10] Checkstyle --- .../ozone/container/ozoneimpl/OnDemandContainerDataScanner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index 6f83b5b0b2b3..619765711454 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@ -19,7 +19,6 @@ import static org.apache.hadoop.ozone.container.ozoneimpl.AbstractBackgroundContainerScanner.logUnhealthyScanResult; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.time.Instant; import java.util.Optional;