From 5b19ae0b3dd8e00ea7e67c9aa92bf0733a487f46 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 26 Nov 2024 18:43:24 +0530 Subject: [PATCH 1/8] HDDS-11808. Key deletion service should support multiple threads --- .../apache/hadoop/ozone/om/OMConfigKeys.java | 5 + .../hadoop/ozone/om/TestKeyPurging.java | 2 +- ...napshotDeletingServiceIntegrationTest.java | 4 +- .../apache/hadoop/ozone/om/KeyManager.java | 5 +- .../hadoop/ozone/om/KeyManagerImpl.java | 19 +- .../ozone/om/OmMetadataManagerImpl.java | 230 +++++++++--------- .../service/AbstractKeyDeletingService.java | 9 +- .../ozone/om/service/KeyDeletingService.java | 72 +++++- .../om/service/TestKeyDeletingService.java | 25 +- 9 files changed, 223 insertions(+), 148 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index e274d822b63e..e80fdadb8b95 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -422,6 +422,11 @@ private OMConfigKeys() { public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10; + public static final String OZONE_THREAD_NUMBER_KEY_DELETION = + "ozone.thread.number.key.deletion"; + + public static final int OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT = 10; + public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK = "ozone.snapshot.filtering.limit.per.task"; public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java index 23d0cdd1b16f..8be4de34e665 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java @@ -138,7 +138,7 @@ public void testKeysPurgingByKeyDeletingService() throws Exception { GenericTestUtils.waitFor( () -> { try { - return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, keyDeletingService.getDeletedKeySupplier()) .getKeyBlocksList().size() == 0; } catch (IOException e) { return false; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java index 9bafe148aeea..84554d58054b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java @@ -508,11 +508,11 @@ private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean keyDeletion when(ozoneManager.getKeyManager()).thenReturn(keyManager); KeyDeletingService keyDeletingService = Mockito.spy(new KeyDeletingService(ozoneManager, ozoneManager.getScmClient().getBlockClient(), keyManager, 10000, - 100000, cluster.getConf(), false)); + 100000, cluster.getConf(), false, 1)); keyDeletingService.shutdown(); GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 1000, 100000); - when(keyManager.getPendingDeletionKeys(anyInt())).thenAnswer(i -> { + when(keyManager.getPendingDeletionKeys(anyInt(), keyDeletingService.getDeletedKeySupplier())).thenAnswer(i -> { // wait for SDS to reach the KDS wait block before processing any key. GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000); keyDeletionStarted.set(true); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 9f6d8b81c10a..3ef5e5215a39 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -116,11 +116,14 @@ ListKeysResult listKeys(String volumeName, String bucketName, String startKey, * Second is a Mapping of Key-Value pair which is updated in the deletedTable. * * @param count max number of keys to return. + * @param deletedKeySupplier DeletedKeySupplier object. * @return a Pair of list of {@link BlockGroup} representing keys and blocks, * and a hashmap for key-value pair to be updated in the deletedTable. * @throws IOException */ - PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException; + PendingKeysDeletion getPendingDeletionKeys(int count, + KeyDeletingService.DeletedKeySupplier deletedKeySupplier) + throws IOException; /** * Returns a list rename entries from the snapshotRenamedTable. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 8e3bbb47c3c4..8bccc3d6f1f2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -155,6 +155,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION; import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND; @@ -243,9 +245,13 @@ public void start(OzoneConfiguration configuration) { OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - keyDeletingService = new KeyDeletingService(ozoneManager, - scmClient.getBlockClient(), this, blockDeleteInterval, - serviceTimeout, configuration, isSnapshotDeepCleaningEnabled); + int keyDeletingCorePoolSize = + configuration.getInt(OZONE_THREAD_NUMBER_KEY_DELETION, + OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT); + keyDeletingService = + new KeyDeletingService(ozoneManager, scmClient.getBlockClient(), this, + blockDeleteInterval, serviceTimeout, configuration, + isSnapshotDeepCleaningEnabled, keyDeletingCorePoolSize); keyDeletingService.start(); } @@ -684,12 +690,13 @@ public ListKeysResult listKeys(String volumeName, String bucketName, } @Override - public PendingKeysDeletion getPendingDeletionKeys(final int count) + public PendingKeysDeletion getPendingDeletionKeys(final int count, + KeyDeletingService.DeletedKeySupplier deletedKeySupplier) throws IOException { OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) metadataManager; - return omMetadataManager - .getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager()); + return omMetadataManager.getPendingDeletionKeys(count, + ozoneManager.getOmSnapshotManager(), deletedKeySupplier); } private List> getTableEntries(String startKey, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index 8f4c070b76c6..a2b73e99dd5d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -87,6 +87,7 @@ import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils; +import org.apache.hadoop.ozone.om.service.KeyDeletingService; import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo; @@ -1587,132 +1588,131 @@ private PersistedUserVolumeInfo getVolumesByUser(String userNameKey) * @throws IOException */ public PendingKeysDeletion getPendingDeletionKeys(final int keyCount, - OmSnapshotManager omSnapshotManager) + OmSnapshotManager omSnapshotManager, + KeyDeletingService.DeletedKeySupplier deletedKeySupplier) throws IOException { List keyBlocksList = Lists.newArrayList(); HashMap keysToModify = new HashMap<>(); - try (TableIterator> - keyIter = getDeletedTable().iterator()) { - int currentCount = 0; - while (keyIter.hasNext() && currentCount < keyCount) { - RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo(); - KeyValue kv = keyIter.next(); - if (kv != null) { - List blockGroupList = Lists.newArrayList(); - // Get volume name and bucket name - String[] keySplit = kv.getKey().split(OM_KEY_PREFIX); - String bucketKey = getBucketKey(keySplit[1], keySplit[2]); - OmBucketInfo bucketInfo = getBucketTable().get(bucketKey); - // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. - SnapshotInfo previousSnapshotInfo = bucketInfo == null ? null : - SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), - bucketInfo.getBucketName(), ozoneManager, snapshotChainManager); - // previous snapshot is not active or it has not been flushed to disk then don't process the key in this - // iteration. - if (previousSnapshotInfo != null && - (previousSnapshotInfo.getSnapshotStatus() != SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE || - !OmSnapshotManager.areSnapshotChangesFlushedToDB(ozoneManager.getMetadataManager(), - previousSnapshotInfo))) { - continue; - } - // Get the latest snapshot in snapshot path. - try (ReferenceCounted rcLatestSnapshot = previousSnapshotInfo == null ? null : - omSnapshotManager.getSnapshot(previousSnapshotInfo.getVolumeName(), - previousSnapshotInfo.getBucketName(), previousSnapshotInfo.getName())) { - - // Multiple keys with the same path can be queued in one DB entry - RepeatedOmKeyInfo infoList = kv.getValue(); - for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) { - // Skip the key if it exists in the previous snapshot (of the same - // scope) as in this case its blocks should not be reclaimed - - // If the last snapshot is deleted and the keys renamed in between - // the snapshots will be cleaned up by KDS. So we need to check - // in the renamedTable as well. - String dbRenameKey = getRenameKey(info.getVolumeName(), - info.getBucketName(), info.getObjectID()); - - if (rcLatestSnapshot != null) { - Table prevKeyTable = - rcLatestSnapshot.get() - .getMetadataManager() - .getKeyTable(bucketInfo.getBucketLayout()); - - Table prevDeletedTable = - rcLatestSnapshot.get().getMetadataManager().getDeletedTable(); - String prevKeyTableDBKey = getSnapshotRenamedTable() - .get(dbRenameKey); - String prevDelTableDBKey = getOzoneKey(info.getVolumeName(), - info.getBucketName(), info.getKeyName()); - // format: /volName/bucketName/keyName/objId - prevDelTableDBKey = getOzoneDeletePathKey(info.getObjectID(), - prevDelTableDBKey); - - if (prevKeyTableDBKey == null && - bucketInfo.getBucketLayout().isFileSystemOptimized()) { - long volumeId = getVolumeId(info.getVolumeName()); - prevKeyTableDBKey = getOzonePathKey(volumeId, - bucketInfo.getObjectID(), - info.getParentObjectID(), - info.getFileName()); - } else if (prevKeyTableDBKey == null) { - prevKeyTableDBKey = getOzoneKey(info.getVolumeName(), - info.getBucketName(), + int currentCount = 0; + while (currentCount < keyCount) { + RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo(); + KeyValue kv = deletedKeySupplier.get(); + if (kv != null) { + List blockGroupList = Lists.newArrayList(); + // Get volume name and bucket name + String[] keySplit = kv.getKey().split(OM_KEY_PREFIX); + String bucketKey = getBucketKey(keySplit[1], keySplit[2]); + OmBucketInfo bucketInfo = getBucketTable().get(bucketKey); + // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. + SnapshotInfo previousSnapshotInfo = bucketInfo == null ? null : + SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), + bucketInfo.getBucketName(), ozoneManager, snapshotChainManager); + // previous snapshot is not active or it has not been flushed to disk then don't process the key in this + // iteration. + if (previousSnapshotInfo != null && (previousSnapshotInfo.getSnapshotStatus() != + SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE || !OmSnapshotManager.areSnapshotChangesFlushedToDB( + ozoneManager.getMetadataManager(), previousSnapshotInfo))) { + continue; + } + // Get the latest snapshot in snapshot path. + try (ReferenceCounted rcLatestSnapshot = + previousSnapshotInfo == null ? null : omSnapshotManager.getSnapshot( + previousSnapshotInfo.getVolumeName(), + previousSnapshotInfo.getBucketName(), + previousSnapshotInfo.getName())) { + + // Multiple keys with the same path can be queued in one DB entry + RepeatedOmKeyInfo infoList = kv.getValue(); + for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) { + // Skip the key if it exists in the previous snapshot (of the same + // scope) as in this case its blocks should not be reclaimed + + // If the last snapshot is deleted and the keys renamed in between + // the snapshots will be cleaned up by KDS. So we need to check + // in the renamedTable as well. + String dbRenameKey = + getRenameKey(info.getVolumeName(), info.getBucketName(), + info.getObjectID()); + + if (rcLatestSnapshot != null) { + Table prevKeyTable = + rcLatestSnapshot.get().getMetadataManager() + .getKeyTable(bucketInfo.getBucketLayout()); + + Table prevDeletedTable = + rcLatestSnapshot.get().getMetadataManager().getDeletedTable(); + String prevKeyTableDBKey = + getSnapshotRenamedTable().get(dbRenameKey); + String prevDelTableDBKey = + getOzoneKey(info.getVolumeName(), info.getBucketName(), info.getKeyName()); - } + // format: /volName/bucketName/keyName/objId + prevDelTableDBKey = + getOzoneDeletePathKey(info.getObjectID(), prevDelTableDBKey); + + if (prevKeyTableDBKey == null && bucketInfo.getBucketLayout() + .isFileSystemOptimized()) { + long volumeId = getVolumeId(info.getVolumeName()); + prevKeyTableDBKey = + getOzonePathKey(volumeId, bucketInfo.getObjectID(), + info.getParentObjectID(), info.getFileName()); + } else if (prevKeyTableDBKey == null) { + prevKeyTableDBKey = + getOzoneKey(info.getVolumeName(), info.getBucketName(), + info.getKeyName()); + } - OmKeyInfo omKeyInfo = prevKeyTable.get(prevKeyTableDBKey); - // When key is deleted it is no longer in keyTable, we also - // have to check deletedTable of previous snapshot - RepeatedOmKeyInfo delOmKeyInfo = - prevDeletedTable.get(prevDelTableDBKey); - if (versionExistsInPreviousSnapshot(omKeyInfo, - info, delOmKeyInfo)) { - // If the infoList size is 1, there is nothing to split. - // We either delete it or skip it. - if (!(infoList.getOmKeyInfoList().size() == 1)) { - notReclaimableKeyInfo.addOmKeyInfo(info); - } - continue; + OmKeyInfo omKeyInfo = prevKeyTable.get(prevKeyTableDBKey); + // When key is deleted it is no longer in keyTable, we also + // have to check deletedTable of previous snapshot + RepeatedOmKeyInfo delOmKeyInfo = + prevDeletedTable.get(prevDelTableDBKey); + if (versionExistsInPreviousSnapshot(omKeyInfo, info, + delOmKeyInfo)) { + // If the infoList size is 1, there is nothing to split. + // We either delete it or skip it. + if (!(infoList.getOmKeyInfoList().size() == 1)) { + notReclaimableKeyInfo.addOmKeyInfo(info); } + continue; } + } - // Add all blocks from all versions of the key to the deletion - // list - for (OmKeyLocationInfoGroup keyLocations : - info.getKeyLocationVersions()) { - List item = keyLocations.getLocationList().stream() - .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) - .collect(Collectors.toList()); - BlockGroup keyBlocks = BlockGroup.newBuilder() - .setKeyName(kv.getKey()) - .addAllBlockIDs(item) - .build(); - blockGroupList.add(keyBlocks); - } - currentCount++; + // Add all blocks from all versions of the key to the deletion + // list + for (OmKeyLocationInfoGroup keyLocations : info.getKeyLocationVersions()) { + List item = keyLocations.getLocationList().stream() + .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) + .collect(Collectors.toList()); + BlockGroup keyBlocks = + BlockGroup.newBuilder().setKeyName(kv.getKey()) + .addAllBlockIDs(item).build(); + blockGroupList.add(keyBlocks); } + currentCount++; + } - List notReclaimableKeyInfoList = - notReclaimableKeyInfo.getOmKeyInfoList(); - // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. - SnapshotInfo newPreviousSnapshotInfo = bucketInfo == null ? null : - SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), - bucketInfo.getBucketName(), ozoneManager, snapshotChainManager); - // Check if the previous snapshot in the chain hasn't changed. - if (Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo).map(SnapshotInfo::getSnapshotId), - Optional.ofNullable(previousSnapshotInfo).map(SnapshotInfo::getSnapshotId))) { - // If all the versions are not reclaimable, then do nothing. - if (notReclaimableKeyInfoList.size() > 0 && - notReclaimableKeyInfoList.size() != - infoList.getOmKeyInfoList().size()) { - keysToModify.put(kv.getKey(), notReclaimableKeyInfo); - } + List notReclaimableKeyInfoList = + notReclaimableKeyInfo.getOmKeyInfoList(); + // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. + SnapshotInfo newPreviousSnapshotInfo = bucketInfo == null ? null : + SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), + bucketInfo.getBucketName(), ozoneManager, + snapshotChainManager); + // Check if the previous snapshot in the chain hasn't changed. + if (Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo) + .map(SnapshotInfo::getSnapshotId), + Optional.ofNullable(previousSnapshotInfo) + .map(SnapshotInfo::getSnapshotId))) { + // If all the versions are not reclaimable, then do nothing. + if (notReclaimableKeyInfoList.size() > 0 && notReclaimableKeyInfoList.size() != infoList.getOmKeyInfoList() + .size()) { + keysToModify.put(kv.getKey(), notReclaimableKeyInfo); + } - if (notReclaimableKeyInfoList.size() != - infoList.getOmKeyInfoList().size()) { - keyBlocksList.addAll(blockGroupList); - } + if (notReclaimableKeyInfoList.size() != infoList.getOmKeyInfoList() + .size()) { + keyBlocksList.addAll(blockGroupList); } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index a3d7ccb66187..d3c8dad7b1e7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -100,7 +100,7 @@ public AbstractKeyDeletingService(String serviceName, long interval, protected int processKeyDeletes(List keyBlocksList, KeyManager manager, HashMap keysToModify, - String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException { + String snapTableKey, UUID expectedPreviousSnapshotId, long run) throws IOException { long startTime = Time.monotonicNow(); int delCount = 0; @@ -123,7 +123,7 @@ protected int processKeyDeletes(List keyBlocksList, startTime = Time.monotonicNow(); if (isRatisEnabled()) { delCount = submitPurgeKeysRequest(blockDeletionResults, - keysToModify, snapTableKey, expectedPreviousSnapshotId); + keysToModify, snapTableKey, expectedPreviousSnapshotId, run); } else { // TODO: Once HA and non-HA paths are merged, we should have // only one code path here. Purge keys should go through an @@ -177,7 +177,8 @@ private int deleteAllKeys(List results, * @param keysToModify Updated list of RepeatedOmKeyInfo */ private int submitPurgeKeysRequest(List results, - HashMap keysToModify, String snapTableKey, UUID expectedPreviousSnapshotId) { + HashMap keysToModify, String snapTableKey, + UUID expectedPreviousSnapshotId, long run) { Map, List> purgeKeysMapPerBucket = new HashMap<>(); @@ -256,7 +257,7 @@ private int submitPurgeKeysRequest(List results, // Submit PurgeKeys request to OM try { - OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get()); + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, run); } catch (ServiceException e) { LOG.error("PurgeKey request failed. Will retry at next run.", e); return 0; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 797b2b9bf9f0..5b659b2ad746 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; @@ -101,13 +102,22 @@ public class KeyDeletingService extends AbstractKeyDeletingService { private final SnapshotChainManager snapshotChainManager; private DeletingServiceMetrics metrics; + public DeletedKeySupplier getDeletedKeySupplier() { + return deletedKeySupplier; + } + + private final DeletedKeySupplier deletedKeySupplier; + private AtomicInteger taskCount = new AtomicInteger(0); + private final int keyDeletingCorePoolSize; + + @SuppressWarnings("parameternumber") public KeyDeletingService(OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient, KeyManager manager, long serviceInterval, long serviceTimeout, ConfigurationSource conf, - boolean deepCleanSnapshots) { + boolean deepCleanSnapshots, int keyDeletingCorePoolSize) { super(KeyDeletingService.class.getSimpleName(), serviceInterval, - TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE, + TimeUnit.MILLISECONDS, keyDeletingCorePoolSize, serviceTimeout, ozoneManager, scmClient); this.manager = manager; this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, @@ -124,6 +134,9 @@ public KeyDeletingService(OzoneManager ozoneManager, this.deepCleanSnapshots = deepCleanSnapshots; this.snapshotChainManager = ((OmMetadataManagerImpl)manager.getMetadataManager()).getSnapshotChainManager(); this.metrics = ozoneManager.getDeletionMetrics(); + deletedKeySupplier = new DeletedKeySupplier(); + taskCount.set(0); + this.keyDeletingCorePoolSize = keyDeletingCorePoolSize; } /** @@ -143,10 +156,51 @@ public boolean isRunningOnAOS() { @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new KeyDeletingTask(this)); + if (taskCount.get() > 0) { + LOG.info("{} Key deleting task(s) already in progress.", + taskCount.get()); + return queue; + } + try { + deletedKeySupplier.reInitItr(); + } catch (IOException ex) { + LOG.error("Unable to get the iterator.", ex); + } + taskCount.set(keyDeletingCorePoolSize); + for (int i = 0; i < keyDeletingCorePoolSize; i++) { + queue.add(new KeyDeletingTask(this)); + } return queue; } + /** + * DeletedKeySupplier class. + */ + public final class DeletedKeySupplier { + private TableIterator> + deleteKeyTableIterator; + + public synchronized Table.KeyValue get() + throws IOException { + if (deleteKeyTableIterator.hasNext()) { + return deleteKeyTableIterator.next(); + } + return null; + } + + private synchronized void closeItr() { + IOUtils.closeQuietly(deleteKeyTableIterator); + deleteKeyTableIterator = null; + } + + private synchronized void reInitItr() throws IOException { + closeItr(); + deleteKeyTableIterator = + getOzoneManager().getMetadataManager().getDeletedTable().iterator(); + + } + } + private boolean shouldRun() { if (getOzoneManager() == null) { // OzoneManager can be null for testing @@ -217,13 +271,14 @@ public BackgroundTaskResult call() { // snapshotId since AOS could process multiple buckets in one iteration. UUID expectedPreviousSnapshotId = snapshotChainManager.getLatestGlobalSnapshotId(); PendingKeysDeletion pendingKeysDeletion = manager - .getPendingDeletionKeys(getKeyLimitPerTask()); + .getPendingDeletionKeys(getKeyLimitPerTask(), deletedKeySupplier); List keyBlocksList = pendingKeysDeletion .getKeyBlocksList(); if (keyBlocksList != null && !keyBlocksList.isEmpty()) { delCount = processKeyDeletes(keyBlocksList, getOzoneManager().getKeyManager(), - pendingKeysDeletion.getKeysToModify(), null, expectedPreviousSnapshotId); + pendingKeysDeletion.getKeysToModify(), null, + expectedPreviousSnapshotId, run); deletedKeyCount.addAndGet(delCount); metrics.incrNumKeysProcessed(keyBlocksList.size()); metrics.incrNumKeysSentForPurge(delCount); @@ -235,7 +290,7 @@ public BackgroundTaskResult call() { try { if (deepCleanSnapshots && delCount < keyLimitPerTask) { - processSnapshotDeepClean(delCount); + processSnapshotDeepClean(delCount, run); } } catch (Exception e) { LOG.error("Error while running deep clean on snapshots. Will " + @@ -253,7 +308,7 @@ public BackgroundTaskResult call() { } @SuppressWarnings("checkstyle:MethodLength") - private void processSnapshotDeepClean(int delCount) + private void processSnapshotDeepClean(int delCount, long run) throws IOException { OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); @@ -451,7 +506,8 @@ private void processSnapshotDeepClean(int delCount) if (!keysToPurge.isEmpty()) { processKeyDeletes(keysToPurge, currOmSnapshot.getKeyManager(), keysToModify, currSnapInfo.getTableKey(), - Optional.ofNullable(previousSnapshot).map(SnapshotInfo::getSnapshotId).orElse(null)); + Optional.ofNullable(previousSnapshot) + .map(SnapshotInfo::getSnapshotId).orElse(null), run); } } finally { IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index ff6506da0347..fd70a102f844 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -203,8 +203,9 @@ void checkIfDeleteServiceIsDeletingKeys() () -> getDeletedKeyCount() >= initialDeletedCount + keyCount, 100, 10000); assertThat(getRunCount()).isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList()) - .isEmpty(); + assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyDeletingService.getDeletedKeySupplier()) + .getKeyBlocksList()).isEmpty(); } @Test @@ -232,8 +233,9 @@ void checkDeletionForKeysWithMultipleVersions() throws Exception { 1000, 10000); assertThat(getRunCount()) .isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList()) - .isEmpty(); + assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyDeletingService.getDeletedKeySupplier()) + .getKeyBlocksList()).isEmpty(); // The 1st version of the key has 1 block and the 2nd version has 2 // blocks. Hence, the ScmBlockClient should have received at least 3 @@ -274,8 +276,9 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception { 1000, 10000); assertThat(getRunCount()) .isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList()) - .isEmpty(); + assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyDeletingService.getDeletedKeySupplier()) + .getKeyBlocksList()).isEmpty(); // deletedTable should have deleted key of the snapshot bucket assertFalse(metadataManager.getDeletedTable().isEmpty()); @@ -331,7 +334,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() return omSnapshotManager; }); KeyDeletingService service = new KeyDeletingService(ozoneManager, scmBlockTestingClient, km, 10000, - 100000, conf, false); + 100000, conf, false, 1); service.shutdown(); final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable); final long initialDeletedCount = metadataManager.countRowsInTable(deletedTable); @@ -381,7 +384,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() Assertions.assertNotEquals(deletePathKey[0], group.getGroupID()); } return pendingKeysDeletion; - }).when(km).getPendingDeletionKeys(anyInt()); + }).when(km).getPendingDeletionKeys(anyInt(), keyDeletingService.getDeletedKeySupplier()); service.runPeriodicalTaskNow(); service.runPeriodicalTaskNow(); assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); @@ -882,8 +885,8 @@ private long getRunCount() { private int countKeysPendingDeletion() { try { - final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) - .getKeyBlocksList().size(); + final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyDeletingService.getDeletedKeySupplier()).getKeyBlocksList().size(); LOG.debug("KeyManager keys pending deletion: {}", count); return count; } catch (IOException e) { @@ -893,7 +896,7 @@ private int countKeysPendingDeletion() { private long countBlocksPendingDeletion() { try { - return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, keyDeletingService.getDeletedKeySupplier()) .getKeyBlocksList() .stream() .map(BlockGroup::getBlockIDList) From e01fbbb880426fd22b67fdcdf60d72ee9dbff64f Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 3 Dec 2024 19:27:13 +0530 Subject: [PATCH 2/8] Made minor changes. --- .../apache/hadoop/ozone/om/OmMetadataManagerImpl.java | 2 ++ .../hadoop/ozone/om/service/KeyDeletingService.java | 9 +++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index a2b73e99dd5d..41764f33a709 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -1716,6 +1716,8 @@ public PendingKeysDeletion getPendingDeletionKeys(final int keyCount, } } } + } else { + break; } } return new PendingKeysDeletion(keyBlocksList, keysToModify); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 5b659b2ad746..85bf4e3fa029 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -177,7 +177,7 @@ public BackgroundTaskQueue getTasks() { * DeletedKeySupplier class. */ public final class DeletedKeySupplier { - private TableIterator> + public TableIterator> deleteKeyTableIterator; public synchronized Table.KeyValue get() @@ -188,15 +188,15 @@ public synchronized Table.KeyValue get() return null; } - private synchronized void closeItr() { + public synchronized void closeItr() { IOUtils.closeQuietly(deleteKeyTableIterator); deleteKeyTableIterator = null; } - private synchronized void reInitItr() throws IOException { + public synchronized void reInitItr() throws IOException { closeItr(); deleteKeyTableIterator = - getOzoneManager().getMetadataManager().getDeletedTable().iterator(); + manager.getMetadataManager().getDeletedTable().iterator(); } } @@ -283,6 +283,7 @@ public BackgroundTaskResult call() { metrics.incrNumKeysProcessed(keyBlocksList.size()); metrics.incrNumKeysSentForPurge(delCount); } + taskCount.getAndDecrement(); } catch (IOException e) { LOG.error("Error while running delete keys background task. Will " + "retry at next run.", e); From bb6248e00472359644a8fc955ea5747dcbc6cfc1 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 3 Dec 2024 19:41:50 +0530 Subject: [PATCH 3/8] Closed Iterator. --- .../hadoop/ozone/om/service/KeyDeletingService.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 85bf4e3fa029..9feba036db05 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -173,11 +173,17 @@ public BackgroundTaskQueue getTasks() { return queue; } + @Override + public void shutdown() { + super.shutdown(); + deletedKeySupplier.closeItr(); + } + /** * DeletedKeySupplier class. */ public final class DeletedKeySupplier { - public TableIterator> + private TableIterator> deleteKeyTableIterator; public synchronized Table.KeyValue get() From 3a7c3b01bac014e9cdd1545a4c63cd720f4d1b80 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Wed, 4 Dec 2024 04:00:04 +0530 Subject: [PATCH 4/8] Fixed TestSnapshotDirectoryCleaningService. --- .../org/apache/hadoop/ozone/TestOzoneConfigurationFields.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 3b650f1bf511..b3f5b0bd0e9d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -128,6 +128,7 @@ private void addPropertiesNotInXml() { OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER, OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD, OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION, + OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION, ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY, ScmConfigKeys.OZONE_SCM_HA_PREFIX, S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED, From de85f77b075cb13ecc44a18832c67ae7889243a6 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Wed, 4 Dec 2024 17:35:04 +0530 Subject: [PATCH 5/8] Made minor changes II. --- .../org/apache/hadoop/ozone/om/service/KeyDeletingService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 9feba036db05..c41873b48f87 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -289,7 +289,6 @@ public BackgroundTaskResult call() { metrics.incrNumKeysProcessed(keyBlocksList.size()); metrics.incrNumKeysSentForPurge(delCount); } - taskCount.getAndDecrement(); } catch (IOException e) { LOG.error("Error while running delete keys background task. Will " + "retry at next run.", e); @@ -305,6 +304,7 @@ public BackgroundTaskResult call() { } } + taskCount.getAndDecrement(); isRunningOnAOS.set(false); synchronized (deletingService) { this.deletingService.notify(); From 63da5153becae3c4522b0afc271b01c54b7dc14c Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Mon, 6 Jan 2025 13:27:54 +0530 Subject: [PATCH 6/8] Fixed snapshot test cases. --- ...napshotDeletingServiceIntegrationTest.java | 3 +- .../ozone/om/service/KeyDeletingService.java | 9 + .../om/service/TestKeyDeletingService.java | 233 +++++++++++------- 3 files changed, 157 insertions(+), 88 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java index 84554d58054b..73d4b5801d9a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java @@ -93,6 +93,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -512,7 +513,7 @@ private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean keyDeletion keyDeletingService.shutdown(); GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 1000, 100000); - when(keyManager.getPendingDeletionKeys(anyInt(), keyDeletingService.getDeletedKeySupplier())).thenAnswer(i -> { + when(keyManager.getPendingDeletionKeys(anyInt(), any())).thenAnswer(i -> { // wait for SDS to reach the KDS wait block before processing any key. GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000); keyDeletionStarted.set(true); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index c41873b48f87..6afd69c56f57 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -165,6 +165,7 @@ public BackgroundTaskQueue getTasks() { deletedKeySupplier.reInitItr(); } catch (IOException ex) { LOG.error("Unable to get the iterator.", ex); + return queue; } taskCount.set(keyDeletingCorePoolSize); for (int i = 0; i < keyDeletingCorePoolSize; i++) { @@ -183,6 +184,14 @@ public void shutdown() { * DeletedKeySupplier class. */ public final class DeletedKeySupplier { + + public DeletedKeySupplier() { + try { + reInitItr(); + } catch (IOException ex) { + + } + } private TableIterator> deleteKeyTableIterator; diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index fd70a102f844..84a03aa578f3 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -90,11 +90,13 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -147,6 +149,23 @@ private void createConfig(File testDir) { conf.setQuietMode(false); } + private void createConfigForSnapshot(File testDir) { + conf = new OzoneConfiguration(); + System.setProperty(DBConfigFromFile.CONFIG_DIR, "/"); + ServerUtils.setOzoneMetaDirPath(conf, testDir.toString()); + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, + 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, + 1, TimeUnit.SECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, + 200, TimeUnit.MILLISECONDS); + conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true); + conf.setInt(OZONE_THREAD_NUMBER_KEY_DELETION, 1); + conf.setQuietMode(false); + } + private void createSubject() throws Exception { OmTestManagers omTestManagers = new OmTestManagers(conf, scmBlockTestingClient, null); keyManager = omTestManagers.getKeyManager(); @@ -384,7 +403,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() Assertions.assertNotEquals(deletePathKey[0], group.getGroupID()); } return pendingKeysDeletion; - }).when(km).getPendingDeletionKeys(anyInt(), keyDeletingService.getDeletedKeySupplier()); + }).when(km).getPendingDeletionKeys(anyInt(), any()); service.runPeriodicalTaskNow(); service.runPeriodicalTaskNow(); assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); @@ -409,95 +428,30 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() keyDeletingService.resume(); } - /* - * Create Snap1 - * Create 10 keys - * Create Snap2 - * Delete 10 keys - * Create 5 keys - * Delete 5 keys -> but stop KeyDeletingService so - that keys won't be reclaimed. - * Create snap3 - * Now wait for snap3 to be deepCleaned -> Deleted 5 - keys should be deep cleaned. - * Now delete snap2 -> Wait for snap3 to be deep cleaned so deletedTable - of Snap3 should be empty. - */ - @Test - void testSnapshotDeepClean() throws Exception { - Table snapshotInfoTable = - om.getMetadataManager().getSnapshotInfoTable(); - Table deletedTable = - om.getMetadataManager().getDeletedTable(); - Table keyTable = - om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT); - - // Suspend KeyDeletingService - keyDeletingService.suspend(); - - final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable); - final long initialKeyCount = metadataManager.countRowsInTable(keyTable); - final long initialDeletedCount = metadataManager.countRowsInTable(deletedTable); - final String volumeName = getTestName(); - final String bucketName = uniqueObjectName("bucket"); - - // Create Volume and Buckets - createVolumeAndBucket(volumeName, bucketName, false); - - writeClient.createSnapshot(volumeName, bucketName, uniqueObjectName("snap")); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, metadataManager); - - List createdKeys = new ArrayList<>(); - for (int i = 1; i <= 10; i++) { - OmKeyArgs args = createAndCommitKey(volumeName, bucketName, - uniqueObjectName("key"), 3); - createdKeys.add(args); - } - assertTableRowCount(keyTable, initialKeyCount + 10, metadataManager); - - String snap2 = uniqueObjectName("snap"); - writeClient.createSnapshot(volumeName, bucketName, snap2); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); - - // Create 5 Keys - for (int i = 11; i <= 15; i++) { - OmKeyArgs args = createAndCommitKey(volumeName, bucketName, - uniqueObjectName("key"), 3); - createdKeys.add(args); - } - - // Delete all 15 keys. - for (int i = 0; i < 15; i++) { - writeClient.deleteKey(createdKeys.get(i)); - } + } - assertTableRowCount(deletedTable, initialDeletedCount + 15, metadataManager); + @Nested + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class Snapshot { + @BeforeAll + void setup(@TempDir File testDir) throws Exception { + // failCallsFrequency = 0 means all calls succeed + scmBlockTestingClient = new ScmBlockLocationTestingClient(null, null, 0); - // Create Snap3, traps all the deleted keys. - String snap3 = uniqueObjectName("snap"); - writeClient.createSnapshot(volumeName, bucketName, snap3); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 3, metadataManager); - checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, false); + createConfigForSnapshot(testDir); + createSubject(); + } + @AfterEach + void resume() { keyDeletingService.resume(); + } - try (ReferenceCounted rcOmSnapshot = - om.getOmSnapshotManager().getSnapshot(volumeName, bucketName, snap3)) { - OmSnapshot snapshot3 = rcOmSnapshot.get(); - - Table snap3deletedTable = - snapshot3.getMetadataManager().getDeletedTable(); - - // 5 keys can be deep cleaned as it was stuck previously - assertTableRowCount(snap3deletedTable, initialDeletedCount + 10, metadataManager); - - writeClient.deleteSnapshot(volumeName, bucketName, snap2); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); - - assertTableRowCount(snap3deletedTable, initialDeletedCount, metadataManager); - assertTableRowCount(deletedTable, initialDeletedCount, metadataManager); - checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, true); + @AfterAll + void cleanup() { + if (om.stop()) { + om.join(); } } @@ -601,7 +555,7 @@ void testSnapshotExclusiveSize() throws Exception { // Check if the exclusive size is set. try (TableIterator> - iterator = snapshotInfoTable.iterator()) { + iterator = snapshotInfoTable.iterator()) { while (iterator.hasNext()) { Table.KeyValue snapshotEntry = iterator.next(); String snapshotName = snapshotEntry.getValue().getName(); @@ -615,6 +569,99 @@ void testSnapshotExclusiveSize() throws Exception { } } } + + /* + * Create Snap1 + * Create 10 keys + * Create Snap2 + * Delete 10 keys + * Create 5 keys + * Delete 5 keys -> but stop KeyDeletingService so + that keys won't be reclaimed. + * Create snap3 + * Now wait for snap3 to be deepCleaned -> Deleted 5 + keys should be deep cleaned. + * Now delete snap2 -> Wait for snap3 to be deep cleaned so deletedTable + of Snap3 should be empty. + */ + @Test + void testSnapshotDeepClean() throws Exception { + conf.setInt(OZONE_THREAD_NUMBER_KEY_DELETION, 10); + Table snapshotInfoTable = + om.getMetadataManager().getSnapshotInfoTable(); + Table deletedTable = + om.getMetadataManager().getDeletedTable(); + Table keyTable = + om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT); + + // Suspend KeyDeletingService + keyDeletingService.suspend(); + + final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable); + final long initialKeyCount = metadataManager.countRowsInTable(keyTable); + final long initialDeletedCount = metadataManager.countRowsInTable(deletedTable); + + final String volumeName = getTestName(); + final String bucketName = uniqueObjectName("bucket"); + + // Create Volume and Buckets + createVolumeAndBucket(volumeName, bucketName, false); + + writeClient.createSnapshot(volumeName, bucketName, uniqueObjectName("snap")); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, metadataManager); + + List createdKeys = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + OmKeyArgs args = createAndCommitKey(volumeName, bucketName, + uniqueObjectName("key"), 3); + createdKeys.add(args); + } + assertTableRowCount(keyTable, initialKeyCount + 10, metadataManager); + + String snap2 = uniqueObjectName("snap"); + writeClient.createSnapshot(volumeName, bucketName, snap2); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); + + // Create 5 Keys + for (int i = 11; i <= 15; i++) { + OmKeyArgs args = createAndCommitKey(volumeName, bucketName, + uniqueObjectName("key"), 3); + createdKeys.add(args); + } + + // Delete all 15 keys. + for (int i = 0; i < 15; i++) { + writeClient.deleteKey(createdKeys.get(i)); + } + + assertTableRowCount(deletedTable, initialDeletedCount + 15, metadataManager); + + // Create Snap3, traps all the deleted keys. + String snap3 = uniqueObjectName("snap"); + writeClient.createSnapshot(volumeName, bucketName, snap3); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 3, metadataManager); + checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, false); + + keyDeletingService.resume(); + + try (ReferenceCounted rcOmSnapshot = + om.getOmSnapshotManager().getSnapshot(volumeName, bucketName, snap3)) { + OmSnapshot snapshot3 = rcOmSnapshot.get(); + + Table snap3deletedTable = + snapshot3.getMetadataManager().getDeletedTable(); + + // 5 keys can be deep cleaned as it was stuck previously + assertTableRowCount(snap3deletedTable, initialDeletedCount + 10, metadataManager); + + writeClient.deleteSnapshot(volumeName, bucketName, snap2); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); + + assertTableRowCount(snap3deletedTable, initialDeletedCount, metadataManager); + assertTableRowCount(deletedTable, initialDeletedCount, metadataManager); + checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, true); + } + } } /** @@ -651,21 +698,27 @@ void checkIfDeleteServiceWithFailingSCM() throws Exception { final int keyCount = 100; createAndDeleteKeys(keyCount, 1); + keyManager.getDeletingService().suspend(); GenericTestUtils.waitFor( () -> countKeysPendingDeletion() == initialCount + keyCount, 100, 2000); + + keyManager.getDeletingService().resume(); // Make sure that we have run the background thread 5 times more GenericTestUtils.waitFor( () -> getRunCount() >= initialRunCount + 5, 100, 10000); // Since SCM calls are failing, deletedKeyCount should be zero. + keyManager.getDeletingService().suspend(); + keyManager.getDeletingService().getDeletedKeySupplier().reInitItr(); assertEquals(0, getDeletedKeyCount()); assertEquals(initialCount + keyCount, countKeysPendingDeletion()); } @Test void checkDeletionForEmptyKey() throws Exception { + keyManager.getDeletingService().suspend(); final int initialCount = countKeysPendingDeletion(); final long initialRunCount = getRunCount(); final int keyCount = 100; @@ -676,17 +729,22 @@ void checkDeletionForEmptyKey() throws Exception { GenericTestUtils.waitFor( () -> countKeysPendingDeletion() == initialCount + keyCount, 100, 2000); + keyManager.getDeletingService().resume(); // Make sure that we have run the background thread 2 times or more GenericTestUtils.waitFor( () -> getRunCount() >= initialRunCount + 2, 100, 1000); // the blockClient is set to fail the deletion of key blocks, hence no keys // will be deleted + keyManager.getDeletingService().suspend(); assertEquals(0, getDeletedKeyCount()); + keyManager.getDeletingService().resume(); } @Test void checkDeletionForPartiallyCommitKey() throws Exception { + keyManager.getDeletingService().suspend(); + keyManager.getDeletingService().getDeletedKeySupplier().reInitItr(); final String volumeName = getTestName(); final String bucketName = uniqueObjectName("bucket"); final String keyName = uniqueObjectName("key"); @@ -886,8 +944,9 @@ private long getRunCount() { private int countKeysPendingDeletion() { try { final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, - keyDeletingService.getDeletedKeySupplier()).getKeyBlocksList().size(); - LOG.debug("KeyManager keys pending deletion: {}", count); + keyManager.getDeletingService().getDeletedKeySupplier()) + .getKeyBlocksList().size(); + LOG.info("KeyManager keys pending deletion: {}", count); return count; } catch (IOException e) { throw new UncheckedIOException(e); From 15f77967b4d3d3e3072670254ff3b2ecaec57a75 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 14 Jan 2025 01:11:12 +0530 Subject: [PATCH 7/8] Fixed TestSnapshotDirectoryCleaningService. --- .../ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java index 3be0725a0093..b35b6980d4b4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java @@ -58,6 +58,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -84,6 +85,7 @@ public static void init() throws Exception { conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 2500, TimeUnit.MILLISECONDS); conf.setBoolean(OZONE_ACL_ENABLED, true); + conf.setInt(OZONE_THREAD_NUMBER_KEY_DELETION, 1); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .build(); From cb54d63e261f16eeebbe1e6b0ad1e7a37e6fddd4 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Thu, 23 Jan 2025 11:59:50 +0530 Subject: [PATCH 8/8] Fixed TestKeyDeletingService. --- .../om/service/TestKeyDeletingService.java | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 84a03aa578f3..7877a8fde44a 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -59,12 +59,8 @@ import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; import org.apache.ozone.test.OzoneTestBase; import org.apache.ratis.util.ExitUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.TestInstance; +import org.checkerframework.common.util.report.qual.ReportCreation; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.io.TempDir; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; @@ -101,9 +97,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - /** * Test Key Deleting Service. *

@@ -693,12 +686,14 @@ void cleanup() { @Test void checkIfDeleteServiceWithFailingSCM() throws Exception { + keyManager.getDeletingService().suspend(); + keyManager.getDeletingService().getDeletedKeySupplier().reInitItr(); final int initialCount = countKeysPendingDeletion(); final long initialRunCount = getRunCount(); final int keyCount = 100; createAndDeleteKeys(keyCount, 1); - keyManager.getDeletingService().suspend(); + GenericTestUtils.waitFor( () -> countKeysPendingDeletion() == initialCount + keyCount, @@ -727,18 +722,17 @@ void checkDeletionForEmptyKey() throws Exception { // the pre-allocated blocks are not committed, hence they will be deleted. GenericTestUtils.waitFor( - () -> countKeysPendingDeletion() == initialCount + keyCount, - 100, 2000); + () -> countKeysPendingDeletion() == initialCount + keyCount, 100, + 2000); keyManager.getDeletingService().resume(); // Make sure that we have run the background thread 2 times or more - GenericTestUtils.waitFor( - () -> getRunCount() >= initialRunCount + 2, - 100, 1000); + GenericTestUtils.waitFor(() -> getRunCount() >= initialRunCount + 2, 100, + 1000); // the blockClient is set to fail the deletion of key blocks, hence no keys // will be deleted keyManager.getDeletingService().suspend(); assertEquals(0, getDeletedKeyCount()); - keyManager.getDeletingService().resume(); + } @Test