diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 2124bd9c4700..8204f58953c8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; @@ -66,6 +67,8 @@ public class ContainerSet implements Iterable> { private Clock clock; private long recoveringTimeout; private final Table containerIdsTable; + // Handler that will be invoked when a scan of a container in this set is requested. + private Consumer> containerScanHandler; @VisibleForTesting public ContainerSet(long recoveringTimeout) { @@ -128,6 +131,29 @@ public void ensureContainerNotMissing(long containerId, State state) throws Stor } } + /** + * @param scanner A callback that will be invoked when a scan of a container in this set is requested. + */ + public void registerContainerScanHandler(Consumer> scanner) { + this.containerScanHandler = scanner; + } + + /** + * Triggers a scan of a container in this set using the registered scan handler. This is a no-op if no scan handler + * is registered or the container does not exist in the set. + * @param containerID The container in this set to scan. + */ + public void scanContainer(long containerID) { + if (containerScanHandler != null) { + Container container = getContainer(containerID); + if (container != null) { + containerScanHandler.accept(container); + } else { + LOG.warn("Request to scan container {} which was not found in the container set", containerID); + } + } + } + /** * Add Container to container map. * @param container container to be added diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 0d113d467d94..e240ff6bf478 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -74,7 +74,6 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError; import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult; -import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; import org.apache.hadoop.util.Time; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum; @@ -428,7 +427,7 @@ && getMissingContainerSet().contains(containerID)) { // Create a specific exception that signals for on demand scanning // and move this general scan to where it is more appropriate. // Add integration tests to test the full functionality. - OnDemandContainerDataScanner.scanContainer(container); + containerSet.scanContainer(containerID); audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, new Exception(responseProto.getMessage())); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 43926ca5e282..ab07edb6b78e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -155,7 +155,6 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory; import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; -import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; @@ -1507,6 +1506,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container Set peers) throws IOException { KeyValueContainer kvContainer = (KeyValueContainer) container; KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); + long containerID = containerData.getContainerID(); Optional optionalChecksumInfo = checksumManager.read(containerData); ContainerProtos.ContainerChecksumInfo checksumInfo; @@ -1521,10 +1521,10 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container for (DatanodeDetails peer : peers) { long start = Instant.now().toEpochMilli(); ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo( - containerData.getContainerID(), peer); + containerID, peer); if (peerChecksumInfo == null) { LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum", - containerData.getContainerID(), peer); + containerID, peer); continue; } @@ -1538,7 +1538,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container handleMissingBlock(kvContainer, pipeline, dnClient, missingBlock, chunkByteBuffer); } catch (IOException e) { LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), - containerData.getContainerID(), e); + containerID, e); } } @@ -1548,7 +1548,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer); } catch (IOException e) { LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), - containerData.getContainerID(), e); + containerID, e); } } @@ -1558,7 +1558,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer); } catch (IOException e) { LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), - containerData.getContainerID(), e); + containerID, e); } } // Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data @@ -1570,18 +1570,18 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container if (dataChecksum == oldDataChecksum) { metrics.incContainerReconciledWithoutChanges(); LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms", - containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration); + containerID, peer.toString(), checksumToString(dataChecksum), duration); } else { metrics.incContainerReconciledWithChanges(); LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms", - containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum), + containerID, peer.toString(), checksumToString(oldDataChecksum), checksumToString(dataChecksum), duration); } ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer); } // Trigger manual on demand scanner - OnDemandContainerDataScanner.scanContainer(container); + containerSet.scanContainer(containerID); sendICR(container); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index 3a9c620fee62..619765711454 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@ -19,7 +19,6 @@ import static org.apache.hadoop.ozone.container.ozoneimpl.AbstractBackgroundContainerScanner.logUnhealthyScanResult; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.time.Instant; import java.util.Optional; @@ -45,8 +44,6 @@ public final class OnDemandContainerDataScanner { public static final Logger LOG = LoggerFactory.getLogger(OnDemandContainerDataScanner.class); - private static volatile OnDemandContainerDataScanner instance; - private final ExecutorService scanExecutor; private final ContainerController containerController; private final DataTransferThrottler throttler; @@ -56,7 +53,7 @@ public final class OnDemandContainerDataScanner { private final OnDemandScannerMetrics metrics; private final long minScanGap; - private OnDemandContainerDataScanner( + public OnDemandContainerDataScanner( ContainerScannerConfiguration conf, ContainerController controller) { containerController = controller; throttler = new DataTransferThrottler( @@ -68,26 +65,11 @@ private OnDemandContainerDataScanner( minScanGap = conf.getContainerScanMinGap(); } - public static synchronized void init( - ContainerScannerConfiguration conf, ContainerController controller) { - if (instance != null) { - LOG.warn("Trying to initialize on demand scanner" + - " a second time on a datanode."); - return; - } - instance = new OnDemandContainerDataScanner(conf, controller); - } - - private static boolean shouldScan(Container container) { + private boolean shouldScan(Container container) { if (container == null) { return false; } long containerID = container.getContainerData().getContainerID(); - if (instance == null) { - LOG.debug("Skipping on demand scan for container {} since scanner was " + - "not initialized.", containerID); - return false; - } HddsVolume containerVolume = container.getContainerData().getVolume(); if (containerVolume.isFailed()) { @@ -96,11 +78,11 @@ private static boolean shouldScan(Container container) { return false; } - return !ContainerUtils.recentlyScanned(container, instance.minScanGap, + return !ContainerUtils.recentlyScanned(container, minScanGap, LOG) && container.shouldScanData(); } - public static Optional> scanContainer(Container container) { + public Optional> scanContainer(Container container) { if (!shouldScan(container)) { return Optional.empty(); } @@ -108,7 +90,7 @@ public static Optional> scanContainer(Container container) { Future resultFuture = null; long containerId = container.getContainerData().getContainerID(); if (addContainerToScheduledContainers(containerId)) { - resultFuture = instance.scanExecutor.submit(() -> { + resultFuture = scanExecutor.submit(() -> { performOnDemandScan(container); removeContainerFromScheduledContainers(containerId); }); @@ -116,16 +98,16 @@ public static Optional> scanContainer(Container container) { return Optional.ofNullable(resultFuture); } - private static boolean addContainerToScheduledContainers(long containerId) { - return instance.containerRescheduleCheckSet.add(containerId); + private boolean addContainerToScheduledContainers(long containerId) { + return containerRescheduleCheckSet.add(containerId); } - private static void removeContainerFromScheduledContainers( + private void removeContainerFromScheduledContainers( long containerId) { - instance.containerRescheduleCheckSet.remove(containerId); + containerRescheduleCheckSet.remove(containerId); } - private static void performOnDemandScan(Container container) { + private void performOnDemandScan(Container container) { if (!shouldScan(container)) { return; } @@ -135,21 +117,21 @@ private static void performOnDemandScan(Container container) { ContainerData containerData = container.getContainerData(); logScanStart(containerData); - ScanResult result = container.scanData(instance.throttler, instance.canceler); + ScanResult result = container.scanData(throttler, canceler); // Metrics for skipped containers should not be updated. if (result.isDeleted()) { LOG.debug("Container [{}] has been deleted during the data scan.", containerId); } else { if (!result.isHealthy()) { logUnhealthyScanResult(containerId, result, LOG); - boolean containerMarkedUnhealthy = instance.containerController + boolean containerMarkedUnhealthy = containerController .markContainerUnhealthy(containerId, result); if (containerMarkedUnhealthy) { - instance.metrics.incNumUnHealthyContainers(); + metrics.incNumUnHealthyContainers(); } } // TODO HDDS-10374 will need to update the merkle tree here as well. - instance.metrics.incNumContainersScanned(); + metrics.incNumContainersScanned(); } // Even if the container was deleted, mark the scan as completed since we already logged it as starting. @@ -157,7 +139,7 @@ private static void performOnDemandScan(Container container) { logScanCompleted(containerData, now); if (!result.isDeleted()) { - instance.containerController.updateDataScanTimestamp(containerId, now); + containerController.updateDataScanTimestamp(containerId, now); } } catch (IOException e) { LOG.warn("Unexpected exception while scanning container " @@ -169,7 +151,7 @@ private static void performOnDemandScan(Container container) { } } - private static void logScanStart(ContainerData containerData) { + private void logScanStart(ContainerData containerData) { if (LOG.isDebugEnabled()) { Optional scanTimestamp = containerData.lastDataScanTime(); Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never"); @@ -178,35 +160,17 @@ private static void logScanStart(ContainerData containerData) { } } - private static void logScanCompleted( + private void logScanCompleted( ContainerData containerData, Instant timestamp) { LOG.debug("Completed scan of container {} at {}", containerData.getContainerID(), timestamp); } - public static OnDemandScannerMetrics getMetrics() { - return instance.metrics; - } - - @VisibleForTesting - public static DataTransferThrottler getThrottler() { - return instance.throttler; - } - - @VisibleForTesting - public static Canceler getCanceler() { - return instance.canceler; - } - - public static synchronized void shutdown() { - if (instance == null) { - return; - } - instance.shutdownScanner(); + public OnDemandScannerMetrics getMetrics() { + return metrics; } - private synchronized void shutdownScanner() { - instance = null; + public synchronized void shutdown() { metrics.unregister(); String shutdownMessage = "On-demand container scanner is shutting down."; LOG.info(shutdownMessage); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 370b455e3367..bb9ac58c8fe6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -121,6 +121,7 @@ public class OzoneContainer { private final XceiverServerSpi readChannel; private final ContainerController controller; private BackgroundContainerMetadataScanner metadataScanner; + private OnDemandContainerDataScanner onDemandScanner; private List dataScanners; private List backgroundScanners; private final BlockDeletingService blockDeletingService; @@ -432,7 +433,8 @@ private void initOnDemandContainerScanner(ContainerScannerConfiguration c) { "so the on-demand container data scanner will not start."); return; } - OnDemandContainerDataScanner.init(c, controller); + onDemandScanner = new OnDemandContainerDataScanner(c, controller); + containerSet.registerContainerScanHandler(onDemandScanner::scanContainer); } /** @@ -451,7 +453,7 @@ private void stopContainerScrub() { for (BackgroundContainerDataScanner s : dataScanners) { s.shutdown(); } - OnDemandContainerDataScanner.shutdown(); + onDemandScanner.shutdown(); } @VisibleForTesting diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java index 84d44534525f..89aa4fb56509 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.LongStream; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -284,6 +285,29 @@ public void testListContainerFromFirstKey(ContainerLayoutVersion layout) assertContainerIds(FIRST_ID, count, result); } + @ContainerLayoutTestInfo.ContainerTest + public void testContainerScanHandler(ContainerLayoutVersion layout) throws Exception { + setLayoutVersion(layout); + ContainerSet containerSet = createContainerSet(); + // Scan when no handler is registered should not throw an exception. + containerSet.scanContainer(FIRST_ID); + + AtomicLong invocationCount = new AtomicLong(); + containerSet.registerContainerScanHandler(c -> { + // If the handler was incorrectly triggered for a non-existent container, this assert would fail. + assertEquals(FIRST_ID, c.getContainerData().getContainerID()); + invocationCount.getAndIncrement(); + }); + + // Scan of an existing container when a handler is registered should trigger a scan. + containerSet.scanContainer(FIRST_ID); + assertEquals(1, invocationCount.get()); + + // Scan of non-existent container should not throw exception or trigger an additional invocation. + containerSet.scanContainer(FIRST_ID - 1); + assertEquals(1, invocationCount.get()); + } + /** * Verify that {@code result} contains {@code count} containers * with IDs in increasing order starting at {@code startId}. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java index 87a77e6d8f97..6ea5ff0b1265 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java @@ -60,10 +60,13 @@ @MockitoSettings(strictness = Strictness.LENIENT) public class TestOnDemandContainerDataScanner extends TestContainerScannersAbstract { + + private OnDemandContainerDataScanner onDemandScanner; @BeforeEach public void setup() { super.setup(); + onDemandScanner = new OnDemandContainerDataScanner(conf, controller); } @Test @@ -96,14 +99,12 @@ public void testUnscannedContainerIsScanned() throws Exception { @AfterEach public void tearDown() { - OnDemandContainerDataScanner.shutdown(); + onDemandScanner.shutdown(); } @Test public void testScanTimestampUpdated() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); - Optional> scanFuture = - OnDemandContainerDataScanner.scanContainer(healthy); + Optional> scanFuture = onDemandScanner.scanContainer(healthy); assertTrue(scanFuture.isPresent()); scanFuture.get().get(); verify(controller, atLeastOnce()) @@ -111,8 +112,7 @@ public void testScanTimestampUpdated() throws Exception { eq(healthy.getContainerData().getContainerID()), any()); // Metrics for deleted container should not be updated. - scanFuture = - OnDemandContainerDataScanner.scanContainer(healthy); + scanFuture = onDemandScanner.scanContainer(healthy); assertTrue(scanFuture.isPresent()); scanFuture.get().get(); verify(controller, never()) @@ -121,35 +121,28 @@ public void testScanTimestampUpdated() throws Exception { } @Test - public void testContainerScannerMultipleInitsAndShutdowns() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); - OnDemandContainerDataScanner.init(conf, controller); - OnDemandContainerDataScanner.shutdown(); - OnDemandContainerDataScanner.shutdown(); - //There shouldn't be an interaction after shutdown: - OnDemandContainerDataScanner.scanContainer(corruptData); - verifyContainerMarkedUnhealthy(corruptData, never()); + public void testContainerScannerMultipleShutdowns() { + // No runtime exceptions should be thrown. + onDemandScanner.shutdown(); + onDemandScanner.shutdown(); } @Test public void testSameContainerQueuedMultipleTimes() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); //Given a container that has not finished scanning CountDownLatch latch = new CountDownLatch(1); when(corruptData.scanData( - OnDemandContainerDataScanner.getThrottler(), - OnDemandContainerDataScanner.getCanceler())) + any(), + any())) .thenAnswer((Answer) invocation -> { latch.await(); return getUnhealthyDataScanResult(); }); - Optional> onGoingScan = OnDemandContainerDataScanner - .scanContainer(corruptData); + Optional> onGoingScan = onDemandScanner.scanContainer(corruptData); assertTrue(onGoingScan.isPresent()); assertFalse(onGoingScan.get().isDone()); //When scheduling the same container again - Optional> secondScan = OnDemandContainerDataScanner - .scanContainer(corruptData); + Optional> secondScan = onDemandScanner.scanContainer(corruptData); //Then the second scan is not scheduled and the first scan can still finish assertFalse(secondScan.isPresent()); latch.countDown(); @@ -161,19 +154,15 @@ public void testSameContainerQueuedMultipleTimes() throws Exception { @Test @Override public void testScannerMetrics() throws Exception { - OnDemandContainerDataScanner.init(conf, controller); ArrayList>> resultFutureList = Lists.newArrayList(); - resultFutureList.add(OnDemandContainerDataScanner.scanContainer( - corruptData)); - resultFutureList.add( - OnDemandContainerDataScanner.scanContainer(openContainer)); - resultFutureList.add( - OnDemandContainerDataScanner.scanContainer(openCorruptMetadata)); - resultFutureList.add(OnDemandContainerDataScanner.scanContainer(healthy)); + resultFutureList.add(onDemandScanner.scanContainer(corruptData)); + resultFutureList.add(onDemandScanner.scanContainer(openContainer)); + resultFutureList.add(onDemandScanner.scanContainer(openCorruptMetadata)); + resultFutureList.add(onDemandScanner.scanContainer(healthy)); // Deleted containers will not count towards the scan count metric. - resultFutureList.add(OnDemandContainerDataScanner.scanContainer(deletedContainer)); + resultFutureList.add(onDemandScanner.scanContainer(deletedContainer)); waitOnScannerToFinish(resultFutureList); - OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics(); + OnDemandScannerMetrics metrics = onDemandScanner.getMetrics(); //Containers with shouldScanData = false shouldn't increase // the number of scanned containers assertEquals(0, metrics.getNumUnHealthyContainers()); @@ -183,11 +172,9 @@ public void testScannerMetrics() throws Exception { @Test @Override public void testScannerMetricsUnregisters() { - OnDemandContainerDataScanner.init(conf, controller); - String metricsName = OnDemandContainerDataScanner.getMetrics().getName(); + String metricsName = onDemandScanner.getMetrics().getName(); assertNotNull(DefaultMetricsSystem.instance().getSource(metricsName)); - OnDemandContainerDataScanner.shutdown(); - OnDemandContainerDataScanner.scanContainer(healthy); + onDemandScanner.shutdown(); assertNull(DefaultMetricsSystem.instance().getSource(metricsName)); } @@ -196,7 +183,7 @@ public void testScannerMetricsUnregisters() { public void testUnhealthyContainersDetected() throws Exception { // Without initialization, // there shouldn't be interaction with containerController - OnDemandContainerDataScanner.scanContainer(corruptData); + onDemandScanner.scanContainer(corruptData); verifyNoInteractions(controller); scanContainer(healthy); @@ -223,8 +210,7 @@ public void testUnhealthyContainersDetected() throws Exception { public void testWithVolumeFailure() throws Exception { when(vol.isFailed()).thenReturn(true); - OnDemandContainerDataScanner.init(conf, controller); - OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics(); + OnDemandScannerMetrics metrics = onDemandScanner.getMetrics(); scanContainer(healthy); verifyContainerMarkedUnhealthy(healthy, never()); @@ -248,11 +234,10 @@ public void testShutdownDuringScan() throws Exception { }); // Start the blocking scan. - OnDemandContainerDataScanner.init(conf, controller); - OnDemandContainerDataScanner.scanContainer(healthy); + onDemandScanner.scanContainer(healthy); // Shut down the on demand scanner. This will interrupt the blocked scan // on the healthy container. - OnDemandContainerDataScanner.shutdown(); + onDemandScanner.shutdown(); // Interrupting the healthy container's scan should not mark it unhealthy. verifyContainerMarkedUnhealthy(healthy, never()); } @@ -271,7 +256,7 @@ public void testUnhealthyContainerRescanned() throws Exception { // First iteration should find the unhealthy container. scanContainer(unhealthy); verifyContainerMarkedUnhealthy(unhealthy, atMostOnce()); - OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics(); + OnDemandScannerMetrics metrics = onDemandScanner.getMetrics(); assertEquals(1, metrics.getNumContainersScanned()); assertEquals(1, metrics.getNumUnHealthyContainers()); @@ -296,9 +281,7 @@ public void testUnhealthyContainerRescanned() throws Exception { } private void scanContainer(Container container) throws Exception { - OnDemandContainerDataScanner.init(conf, controller); - Optional> scanFuture = - OnDemandContainerDataScanner.scanContainer(container); + Optional> scanFuture = onDemandScanner.scanContainer(container); if (scanFuture.isPresent()) { scanFuture.get().get(); }