diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 727e903c7722..9e27b1e2679c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -18,22 +18,17 @@ package org.apache.hadoop.hdds.utils; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * An abstract class for a background service in ozone. @@ -50,10 +45,9 @@ public abstract class BackgroundService { // Executor to launch child tasks private final ScheduledExecutorService exec; private final ThreadGroup threadGroup; - private final ThreadFactory threadFactory; private final String serviceName; private final long interval; - private final long serviceTimeout; + private final long serviceTimeoutInNanos; private final TimeUnit unit; private final PeriodicalTask service; @@ -62,11 +56,11 @@ public BackgroundService(String serviceName, long interval, this.interval = interval; this.unit = unit; this.serviceName = serviceName; - this.serviceTimeout = serviceTimeout; + this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit) + .toLong(TimeUnit.NANOSECONDS); threadGroup = new ThreadGroup(serviceName); - ThreadFactory tf = r -> new Thread(threadGroup, r); - threadFactory = new ThreadFactoryBuilder() - .setThreadFactory(tf) + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setThreadFactory(r -> new Thread(threadGroup, r)) .setDaemon(true) .setNameFormat(serviceName + "#%d") .build(); @@ -83,17 +77,12 @@ public int getThreadCount() { return threadGroup.activeCount(); } - @VisibleForTesting - public void triggerBackgroundTaskForTesting() { - service.run(); - } - // start service public void start() { exec.scheduleWithFixedDelay(service, 0, interval, unit); } - public abstract BackgroundTaskQueue getTasks(); + public abstract BackgroundTaskQueue getTasks(); /** * Run one or more background tasks concurrently. @@ -105,7 +94,7 @@ public synchronized void run() { if (LOG.isDebugEnabled()) { LOG.debug("Running background service : {}", serviceName); } - BackgroundTaskQueue tasks = getTasks(); + BackgroundTaskQueue tasks = getTasks(); if (tasks.isEmpty()) { // No task found, or some problems to init tasks // return and retry in next interval. @@ -114,41 +103,27 @@ public synchronized void run() { if (LOG.isDebugEnabled()) { LOG.debug("Number of background tasks to execute : {}", tasks.size()); } - CompletionService taskCompletionService = - new ExecutorCompletionService<>(exec); - List> results = Lists.newArrayList(); while (tasks.size() > 0) { - BackgroundTask task = tasks.poll(); - Future result = - taskCompletionService.submit(task); - results.add(result); - } - - results.parallelStream().forEach(taskResultFuture -> { - try { - // Collect task results - BackgroundTaskResult result = serviceTimeout > 0 - ? taskResultFuture.get(serviceTimeout, unit) - : taskResultFuture.get(); - if (LOG.isDebugEnabled()) { - LOG.debug("task execution result size {}", result.getSize()); + BackgroundTask task = tasks.poll(); + CompletableFuture.runAsync(() -> { + long startTime = System.nanoTime(); + try { + BackgroundTaskResult result = task.call(); + if (LOG.isDebugEnabled()) { + LOG.debug("task execution result size {}", result.getSize()); + } + } catch (Exception e) { + LOG.warn("Background task execution failed", e); + } finally { + long endTime = System.nanoTime(); + if (endTime - startTime > serviceTimeoutInNanos) { + LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", + serviceName, endTime - startTime, serviceTimeoutInNanos); + } } - } catch (InterruptedException e) { - LOG.warn( - "Background task failed due to interruption, retrying in " + - "next interval", e); - // Re-interrupt the thread while catching InterruptedException - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - LOG.warn( - "Background task fails to execute, " - + "retrying in next interval", e); - } catch (TimeoutException e) { - LOG.warn("Background task executes timed out, " - + "retrying in next interval", e); - } - }); + }, exec); + } } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskQueue.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskQueue.java index 005d14b8e3cc..ce22278f5e6e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskQueue.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskQueue.java @@ -17,24 +17,25 @@ package org.apache.hadoop.hdds.utils; +import java.util.Comparator; import java.util.PriorityQueue; /** * A priority queue that stores a number of {@link BackgroundTask}. */ -public class BackgroundTaskQueue { +public class BackgroundTaskQueue { - private final PriorityQueue tasks; + private final PriorityQueue> tasks; public BackgroundTaskQueue() { - tasks = new PriorityQueue<>((task1, task2) - -> task1.getPriority() - task2.getPriority()); + tasks = new PriorityQueue<>( + Comparator.comparingInt(BackgroundTask::getPriority)); } /** * @return the head task in this queue. */ - public synchronized BackgroundTask poll() { + public synchronized BackgroundTask poll() { return tasks.poll(); } @@ -44,7 +45,7 @@ public synchronized BackgroundTask poll() { * * @param task */ - public synchronized void add(BackgroundTask task) { + public synchronized void add(BackgroundTask task) { tasks.add(task); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java index 52daeffde240..b499a6da2511 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -321,8 +321,7 @@ public void testBlockDeletionTimeout() throws Exception { LogCapturer log = LogCapturer.captureLogs(BackgroundService.LOG); GenericTestUtils.waitFor(() -> { - if(log.getOutput().contains( - "Background task executes timed out, retrying in next interval")) { + if(log.getOutput().contains("Background task execution took")) { log.stopCapturing(); return true; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java index 2023e0e4cefa..daa8584b1c2c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -42,6 +43,23 @@ public final class OzoneTestUtils { private OzoneTestUtils() { } + /** + * Triggers Close container event for containers which contain the blocks + * listed in omKeyLocationInfoGroups. + * + * @param omKeyLocationInfoGroups locationInfos for a key. + * @param scm StorageContainerManager instance. + * @throws Exception + */ + public static void triggerCloseContainerEvent( + List omKeyLocationInfoGroups, + StorageContainerManager scm) throws Exception { + performOperationOnKeyContainers((blockID) -> scm.getEventQueue() + .fireEvent(SCMEvents.CLOSE_CONTAINER, + ContainerID.valueof(blockID.getContainerID())), + omKeyLocationInfoGroups); + } + /** * Close containers which contain the blocks listed in * omKeyLocationInfoGroups. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index 97a27c1be6eb..4632b0190a03 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import com.google.common.primitives.Longs; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -29,9 +30,14 @@ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService; +import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneTestUtils; @@ -46,17 +52,16 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; -import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.slf4j.event.Level; import java.io.File; @@ -68,22 +73,22 @@ import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static java.lang.Math.max; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds .HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import static org.apache.hadoop.ozone .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; /** * Tests for Block deletion. */ -@Ignore public class TestBlockDeletion { + public static final Logger LOG = + LoggerFactory.getLogger(TestBlockDeletion.class); + private static OzoneConfiguration conf = null; private static ObjectStore store; private static MiniOzoneCluster cluster = null; @@ -91,6 +96,7 @@ public class TestBlockDeletion { private static OzoneManager om = null; private static Set containerIdsWithDeletedBlocks; private static long maxTransactionId = 0; + private static File baseDir; @BeforeClass public static void init() throws Exception { @@ -98,9 +104,8 @@ public static void init() throws Exception { GenericTestUtils.setLogLevel(DeletedBlockLogImpl.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(SCMBlockDeletingService.LOG, Level.DEBUG); - String path = - GenericTestUtils.getTempPath(TestBlockDeletion.class.getSimpleName()); - File baseDir = new File(path); + String path = GenericTestUtils.getRandomizedTempPath(); + baseDir = new File(path); baseDir.mkdirs(); conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, @@ -110,8 +115,12 @@ public static void init() throws Exception { conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, - 3, TimeUnit.SECONDS); + + conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY + + ".client.request.write.timeout", 30, TimeUnit.SECONDS); + conf.setTimeDuration(RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY + + ".client.request.watch.timeout", 30, TimeUnit.SECONDS); + conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) @@ -129,6 +138,7 @@ public static void cleanup() { if (cluster != null) { cluster.shutdown(); } + FileUtils.deleteQuietly(baseDir); } @Test @@ -146,7 +156,7 @@ public void testBlockDeletion() throws Exception { OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>()); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 10; i++) { out.write(value.getBytes()); } out.close(); @@ -161,7 +171,16 @@ public void testBlockDeletion() throws Exception { om.lookupKey(keyArgs).getKeyLocationVersions(); // verify key blocks were created in DN. - verifyBlocksCreated(omKeyLocationInfoGroupList); + GenericTestUtils.waitFor(() -> { + try { + verifyBlocksCreated(omKeyLocationInfoGroupList); + return true; + } catch (Throwable t) { + LOG.warn("Verify blocks creation failed", t); + return false; + } + }, 1000, 10000); + // No containers with deleted blocks Assert.assertTrue(containerIdsWithDeletedBlocks.isEmpty()); // Delete transactionIds for the containers should be 0. @@ -180,22 +199,29 @@ public void testBlockDeletion() throws Exception { } // close the containers which hold the blocks for the key - OzoneTestUtils.closeContainers(omKeyLocationInfoGroupList, scm); - - waitForDatanodeCommandRetry(); + OzoneTestUtils.triggerCloseContainerEvent(omKeyLocationInfoGroupList, scm); // make sure the containers are closed on the dn - omKeyLocationInfoGroupList.forEach((group) -> { - List locationInfo = group.getLocationList(); - locationInfo.forEach( - (info) -> cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer().getContainerSet() - .getContainer(info.getContainerID()).getContainerData() - .setState(ContainerProtos.ContainerDataProto.State.CLOSED)); - }); - waitForDatanodeBlockDeletionStart(); + GenericTestUtils.waitFor(() -> { + try { + verifyClosedContainersInDatanode(omKeyLocationInfoGroupList); + return true; + } catch (Throwable t) { + LOG.warn("Container closing failed", t); + return false; + } + }, 500, 30000); + // The blocks should be deleted in the DN. - verifyBlocksDeleted(omKeyLocationInfoGroupList); + GenericTestUtils.waitFor(() -> { + try { + verifyBlocksDeleted(omKeyLocationInfoGroupList); + return true; + } catch (Throwable t) { + LOG.warn("Verify blocks deletion failed", t); + return false; + } + }, 2000, 30000); // Few containers with deleted blocks Assert.assertTrue(!containerIdsWithDeletedBlocks.isEmpty()); @@ -204,53 +230,33 @@ public void testBlockDeletion() throws Exception { // Containers in the DN and SCM should have same delete transactionIds // after DN restart. The assertion is just to verify that the state of // containerInfos in dn and scm is consistent after dn restart. - cluster.restartHddsDatanode(0, true); + cluster.restartHddsDatanode(0, false); matchContainerTransactionIds(); // verify PENDING_DELETE_STATUS event is fired - verifyPendingDeleteEvent(); - - // Verify transactions committed - verifyTransactionsCommitted(); - } - - private void waitForDatanodeBlockDeletionStart() - throws TimeoutException, InterruptedException { - LogCapturer logCapturer = - LogCapturer.captureLogs(DeleteBlocksCommandHandler.LOG); - logCapturer.clearOutput(); - GenericTestUtils.waitFor(() -> logCapturer.getOutput() - .contains("Start to delete container block"), - 500, 10000); - Thread.sleep(1000); - } + //TODO: Reenable verification for Pending Deletion Status Event + //verifyPendingDeleteEvent(); - /** - * Waits for datanode command to be retried when datanode is dead. - */ - private void waitForDatanodeCommandRetry() - throws TimeoutException, InterruptedException { - cluster.shutdownHddsDatanode(0); - LogCapturer logCapturer = - LogCapturer.captureLogs(RetriableDatanodeEventWatcher.LOG); - logCapturer.clearOutput(); - GenericTestUtils.waitFor(() -> logCapturer.getOutput() - .contains("RetriableDatanodeCommand type=deleteBlocksCommand"), - 500, 5000); - cluster.restartHddsDatanode(0, true); + // make sure the transactions are committed in SCM + GenericTestUtils.waitFor(() -> { + try { + verifyTransactionsCommitted(); + return true; + } catch (Throwable t) { + LOG.warn("Container closing failed", t); + return false; + } + }, 500, 10000); } private void verifyTransactionsCommitted() throws IOException { - DeletedBlockLogImpl deletedBlockLog = - (DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog(); for (long txnID = 1; txnID <= maxTransactionId; txnID++) { Assert.assertNull( scm.getScmMetadataStore().getDeletedBlocksTXTable().get(txnID)); } } - private void verifyPendingDeleteEvent() - throws IOException, InterruptedException { + private void verifyPendingDeleteEvent() throws IOException { ContainerSet dnContainerSet = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() .getContainer().getContainerSet(); @@ -269,13 +275,15 @@ private void verifyPendingDeleteEvent() } ContainerReportsProto dummyReport = dummyReportsBuilder.build(); - logCapturer.clearOutput(); - cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().getContext().addReport(dummyReport); - cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().triggerHeartbeat(); - // wait for event to be handled by event handler - Thread.sleep(1000); + ContainerReportHandler containerReportHandler = + new ContainerReportHandler(scm.getScmNodeManager(), + scm.getContainerManager(), conf); + containerReportHandler.onMessage( + new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode( + cluster.getHddsDatanodes().get(0).getDatanodeDetails(), + dummyReport), scm.getEventQueue()); + ((EventQueue)scm.getEventQueue()).processAll(10000); + String output = logCapturer.getOutput(); for (ContainerReplicaProto containerInfo : dummyReport.getReportsList()) { long containerId = containerInfo.getContainerID(); @@ -292,61 +300,78 @@ private void verifyPendingDeleteEvent() } private void matchContainerTransactionIds() throws IOException { - ContainerSet dnContainerSet = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer().getContainerSet(); - List containerDataList = new ArrayList<>(); - dnContainerSet.listContainer(0, 10000, containerDataList); - for (ContainerData containerData : containerDataList) { - long containerId = containerData.getContainerID(); - if (containerIdsWithDeletedBlocks.contains(containerId)) { - Assert.assertTrue( - scm.getContainerInfo(containerId).getDeleteTransactionId() > 0); - maxTransactionId = max(maxTransactionId, - scm.getContainerInfo(containerId).getDeleteTransactionId()); - } else { + for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) { + ContainerSet dnContainerSet = + datanode.getDatanodeStateMachine().getContainer().getContainerSet(); + List containerDataList = new ArrayList<>(); + dnContainerSet.listContainer(0, 10000, containerDataList); + for (ContainerData containerData : containerDataList) { + long containerId = containerData.getContainerID(); + if (containerIdsWithDeletedBlocks.contains(containerId)) { + Assert.assertTrue( + scm.getContainerInfo(containerId).getDeleteTransactionId() > 0); + maxTransactionId = max(maxTransactionId, + scm.getContainerInfo(containerId).getDeleteTransactionId()); + } else { + Assert.assertEquals( + scm.getContainerInfo(containerId).getDeleteTransactionId(), 0); + } Assert.assertEquals( - scm.getContainerInfo(containerId).getDeleteTransactionId(), 0); + ((KeyValueContainerData) dnContainerSet.getContainer(containerId) + .getContainerData()).getDeleteTransactionId(), + scm.getContainerInfo(containerId).getDeleteTransactionId()); } - Assert.assertEquals(((KeyValueContainerData)dnContainerSet - .getContainer(containerId).getContainerData()) - .getDeleteTransactionId(), - scm.getContainerInfo(containerId).getDeleteTransactionId()); } } private void verifyBlocksCreated( List omKeyLocationInfoGroups) throws Exception { - ContainerSet dnContainerSet = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer().getContainerSet(); - OzoneTestUtils.performOperationOnKeyContainers((blockID) -> { - try(ReferenceCountedDB db = - BlockUtils.getDB((KeyValueContainerData) dnContainerSet - .getContainer(blockID.getContainerID()).getContainerData(), conf)) { - Assert.assertNotNull(db.getStore().get( - Longs.toByteArray(blockID.getLocalID()))); - } - }, omKeyLocationInfoGroups); + for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) { + ContainerSet dnContainerSet = + datanode.getDatanodeStateMachine().getContainer().getContainerSet(); + OzoneTestUtils.performOperationOnKeyContainers((blockID) -> { + try (ReferenceCountedDB db = BlockUtils.getDB( + (KeyValueContainerData) dnContainerSet + .getContainer(blockID.getContainerID()).getContainerData(), + conf)) { + Assert.assertNotNull( + db.getStore().get(Longs.toByteArray(blockID.getLocalID()))); + } + }, omKeyLocationInfoGroups); + } } private void verifyBlocksDeleted( List omKeyLocationInfoGroups) throws Exception { - ContainerSet dnContainerSet = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() - .getContainer().getContainerSet(); - OzoneTestUtils.performOperationOnKeyContainers((blockID) -> { - try(ReferenceCountedDB db = - BlockUtils.getDB((KeyValueContainerData) dnContainerSet - .getContainer(blockID.getContainerID()).getContainerData(), conf)) { - Assert.assertNull(db.getStore().get( - Longs.toByteArray(blockID.getLocalID()))); - Assert.assertNull(db.getStore().get(StringUtils.string2Bytes( - OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()))); - Assert.assertNotNull(StringUtils.string2Bytes( - OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID())); - } - containerIdsWithDeletedBlocks.add(blockID.getContainerID()); - }, omKeyLocationInfoGroups); + for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) { + ContainerSet dnContainerSet = datanode.getDatanodeStateMachine() + .getContainer().getContainerSet(); + OzoneTestUtils.performOperationOnKeyContainers((blockID) -> { + try (ReferenceCountedDB db = BlockUtils.getDB( + (KeyValueContainerData) dnContainerSet + .getContainer(blockID.getContainerID()).getContainerData(), + conf)) { + Assert.assertNull( + db.getStore().get(Longs.toByteArray(blockID.getLocalID()))); + Assert.assertNull(db.getStore().get(StringUtils.string2Bytes( + OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()))); + Assert.assertNotNull(StringUtils.string2Bytes( + OzoneConsts.DELETED_KEY_PREFIX + blockID.getLocalID())); + } + containerIdsWithDeletedBlocks.add(blockID.getContainerID()); + }, omKeyLocationInfoGroups); + } + } + + private void verifyClosedContainersInDatanode( + List omKeyLocationInfoGroup) throws Exception { + for (HddsDatanodeService datanode : cluster.getHddsDatanodes()) { + ContainerSet dnContainerSet = datanode.getDatanodeStateMachine() + .getContainer().getContainerSet(); + OzoneTestUtils.performOperationOnKeyContainers(blockID -> Assert + .assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + dnContainerSet.getContainer(blockID.getContainerID()) + .getContainerData().getState()), omKeyLocationInfoGroup); + } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java index 1f483345eb26..26b4b775e833 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java @@ -66,9 +66,6 @@ public class KeyDeletingService extends BackgroundService { private static final Logger LOG = LoggerFactory.getLogger(KeyDeletingService.class); - // The thread pool size for key deleting service. - private final static int KEY_DELETING_CORE_POOL_SIZE = 2; - private final OzoneManager ozoneManager; private final ScmBlockLocationProtocol scmClient; private final KeyManager manager; @@ -81,8 +78,8 @@ public class KeyDeletingService extends BackgroundService { ScmBlockLocationProtocol scmClient, KeyManager manager, long serviceInterval, long serviceTimeout, ConfigurationSource conf) { - super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS, - KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS, 1, + serviceTimeout); this.ozoneManager = ozoneManager; this.scmClient = scmClient; this.manager = manager; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java index 79bc39f49846..477afb3f9a71 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -45,16 +45,14 @@ public class OpenKeyCleanupService extends BackgroundService { private static final Logger LOG = LoggerFactory.getLogger(OpenKeyCleanupService.class); - private final static int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2; - private final KeyManager keyManager; private final ScmBlockLocationProtocol scmClient; public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient, KeyManager keyManager, int serviceInterval, long serviceTimeout) { - super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS, - OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS, 1, + serviceTimeout); this.keyManager = keyManager; this.scmClient = scmClient; }