From 3e36cb33eefbe63c29b7f80bf5fccc3bbedba4f1 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Mon, 7 Jul 2025 22:16:12 +0530 Subject: [PATCH 1/5] HDDS-13213. KeyDeletingService should limit task size by both key count and serialized size. --- .../hadoop/ozone/om/TestKeyPurging.java | 9 +++++++- .../apache/hadoop/ozone/om/KeyManager.java | 4 ++-- .../hadoop/ozone/om/KeyManagerImpl.java | 21 +++++++++++++++---- .../ozone/om/service/KeyDeletingService.java | 18 +++++++++++----- .../om/service/TestKeyDeletingService.java | 20 ++++++++++++------ 5 files changed, 54 insertions(+), 18 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java index fa59754b67f2..399453d64335 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.TestDataUtil; @@ -56,6 +57,7 @@ public class TestKeyPurging { private static final int NUM_KEYS = 10; private static final int KEY_SIZE = 100; private OzoneClient client; + private int ratisLimit; @BeforeEach public void setup() throws Exception { @@ -74,6 +76,11 @@ public void setup() throws Exception { client = OzoneClientFactory.getRpcClient(conf); store = client.getObjectStore(); om = cluster.getOzoneManager(); + int limit = (int) conf.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + ratisLimit = (int) (limit * 0.9); } @AfterEach @@ -126,7 +133,7 @@ public void testKeysPurgingByKeyDeletingService() throws Exception { GenericTestUtils.waitFor( () -> { try { - return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit) .getKeyBlocksList().isEmpty(); } catch (IOException e) { return false; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 7e76885c49bd..ec8520cdc782 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -124,7 +124,7 @@ ListKeysResult listKeys(String volumeName, String bucketName, String startKey, * @throws IOException if an I/O error occurs while fetching the keys. */ PendingKeysDeletion getPendingDeletionKeys( - CheckedFunction, Boolean, IOException> filter, int count) + CheckedFunction, Boolean, IOException> filter, int count, int ratisByteLimit) throws IOException; /** @@ -142,7 +142,7 @@ PendingKeysDeletion getPendingDeletionKeys( */ PendingKeysDeletion getPendingDeletionKeys( String volume, String bucket, String startKey, - CheckedFunction, Boolean, IOException> filter, int count) + CheckedFunction, Boolean, IOException> filter, int count, int ratisByteLimit) throws IOException; /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 3216948c55f6..6415caeb231a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -710,17 +710,18 @@ public ListKeysResult listKeys(String volumeName, String bucketName, @Override public PendingKeysDeletion getPendingDeletionKeys( - final CheckedFunction, Boolean, IOException> filter, final int count) - throws IOException { - return getPendingDeletionKeys(null, null, null, filter, count); + final CheckedFunction, Boolean, IOException> filter, final int count, + int ratisByteLimit) throws IOException { + return getPendingDeletionKeys(null, null, null, filter, count, ratisByteLimit); } @Override public PendingKeysDeletion getPendingDeletionKeys( String volume, String bucket, String startKey, CheckedFunction, Boolean, IOException> filter, - int count) throws IOException { + int count, int ratisByteLimit) throws IOException { List keyBlocksList = Lists.newArrayList(); + long serializedSize = 0; Map keysToModify = new HashMap<>(); // Bucket prefix would be empty if volume is empty i.e. either null or "". Optional bucketPrefix = getBucketPrefix(volume, bucket, false); @@ -741,6 +742,7 @@ public PendingKeysDeletion getPendingDeletionKeys( List blockGroupList = Lists.newArrayList(); // Multiple keys with the same path can be queued in one DB entry RepeatedOmKeyInfo infoList = kv.getValue(); + boolean flag = false; for (OmKeyInfo info : infoList.getOmKeyInfoList()) { // Skip the key if the filter doesn't allow the file to be deleted. @@ -750,12 +752,23 @@ public PendingKeysDeletion getPendingDeletionKeys( .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))).collect(Collectors.toList()); BlockGroup keyBlocks = BlockGroup.newBuilder().setKeyName(kv.getKey()) .addAllBlockIDs(blockIDS).build(); + serializedSize += keyBlocks.getProto().getSerializedSize(); + if (serializedSize > ratisByteLimit) { + flag = true; + LOG.info( + "Total size of cumulative keys in a cycle crossed 90% ratis limit, serialized size: {}", + serializedSize); + break; + } blockGroupList.add(keyBlocks); currentCount++; } else { notReclaimableKeyInfo.addOmKeyInfo(info); } } + if (flag) { + break; + } List notReclaimableKeyInfoList = notReclaimableKeyInfo.getOmKeyInfoList(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 781184b86291..9d963c834cd1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -40,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; @@ -88,6 +89,7 @@ public class KeyDeletingService extends AbstractKeyDeletingService { private final AtomicLong deletedKeyCount; private final boolean deepCleanSnapshots; private final SnapshotChainManager snapshotChainManager; + private int ratisByteLimit; public KeyDeletingService(OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient, long serviceInterval, @@ -104,6 +106,12 @@ public KeyDeletingService(OzoneManager ozoneManager, this.deepCleanSnapshots = deepCleanSnapshots; this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager(); this.scmClient = scmClient; + int limit = (int) ozoneManager.getConfiguration().getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + // always go to 90% of max limit for request as other header will be added + this.ratisByteLimit = (int) (limit * 0.9); } /** @@ -316,7 +324,7 @@ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ * @param keyManager KeyManager of the underlying store. */ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, - int remainNum) throws IOException { + int remainNum, int ratisLimit) throws IOException { String volume = null, bucket = null, snapshotTableKey = null; if (currentSnapshotInfo != null) { volume = currentSnapshotInfo.getVolumeName(); @@ -355,9 +363,9 @@ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyMan remainNum -= renamedTableEntries.size(); // Get pending keys that can be deleted - PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null - ? keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum) - : keyManager.getPendingDeletionKeys(volume, bucket, null, reclaimableKeyFilter, remainNum); + PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null ? + keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum, ratisByteLimit) : + keyManager.getPendingDeletionKeys(volume, bucket, null, reclaimableKeyFilter, remainNum, ratisLimit); List keyBlocksList = pendingKeysDeletion.getKeyBlocksList(); //submit purge requests if there are renamed entries to be purged or keys to be purged. if (!renamedTableEntries.isEmpty() || keyBlocksList != null && !keyBlocksList.isEmpty()) { @@ -449,7 +457,7 @@ public BackgroundTaskResult call() { snapInfo.getName())) { KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager() : omSnapshot.get().getKeyManager(); - processDeletedKeysForStore(snapInfo, keyManager, remainNum); + processDeletedKeysForStore(snapInfo, keyManager, remainNum, ratisByteLimit); } } catch (IOException e) { LOG.error("Error while running delete files background task for store {}. Will retry at next run.", diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index bbb2938b1867..ba6f644c4967 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; @@ -73,6 +74,7 @@ import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.KeyManagerImpl; +import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; @@ -142,6 +144,7 @@ class TestKeyDeletingService extends OzoneTestBase { private KeyDeletingService keyDeletingService; private DirectoryDeletingService directoryDeletingService; private ScmBlockLocationTestingClient scmBlockTestingClient; + private int ratisLimit; @BeforeAll void setup() { @@ -175,6 +178,11 @@ private void createSubject() throws Exception { writeClient = omTestManagers.getWriteClient(); om = omTestManagers.getOzoneManager(); metadataManager = omTestManagers.getMetadataManager(); + int limit = (int) conf.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + ratisLimit = (int) (limit * 0.9); } /** @@ -226,7 +234,7 @@ void checkIfDeleteServiceIsDeletingKeys() assertThat(getRunCount()).isGreaterThan(initialRunCount); assertThat(keyManager.getPendingDeletionKeys(new ReclaimableKeyFilter(om, om.getOmSnapshotManager(), ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), null, - keyManager, om.getMetadataManager().getLock()), Integer.MAX_VALUE).getKeyBlocksList()) + keyManager, om.getMetadataManager().getLock()), Integer.MAX_VALUE, ratisLimit).getKeyBlocksList()) .isEmpty(); } @@ -255,7 +263,7 @@ void checkDeletionForKeysWithMultipleVersions() throws Exception { 1000, 10000); assertThat(getRunCount()) .isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE).getKeyBlocksList()) + assertThat(keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit).getKeyBlocksList()) .isEmpty(); // The 1st version of the key has 1 block and the 2nd version has 2 @@ -300,7 +308,7 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception { assertThat(keyManager.getPendingDeletionKeys(new ReclaimableKeyFilter(om, om.getOmSnapshotManager(), ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), null, keyManager, om.getMetadataManager().getLock()), - Integer.MAX_VALUE).getKeyBlocksList()) + Integer.MAX_VALUE, ratisLimit).getKeyBlocksList()) .isEmpty(); // deletedTable should have deleted key of the snapshot bucket @@ -408,7 +416,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() Assertions.assertNotEquals(deletePathKey[0], group.getGroupID()); } return pendingKeysDeletion; - }).when(km).getPendingDeletionKeys(any(), anyInt()); + }).when(km).getPendingDeletionKeys(any(), anyInt(), anyInt()); service.runPeriodicalTaskNow(); service.runPeriodicalTaskNow(); assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); @@ -1052,7 +1060,7 @@ private long getRunCount() { private int countKeysPendingDeletion() { try { - final int count = keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE) + final int count = keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit) .getKeyBlocksList().size(); LOG.debug("KeyManager keys pending deletion: {}", count); return count; @@ -1063,7 +1071,7 @@ private int countKeysPendingDeletion() { private long countBlocksPendingDeletion() { try { - return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit) .getKeyBlocksList() .stream() .map(BlockGroup::getBlockIDList) From 6ecc9978389d4148387bac6540f8dfe1148db320 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Mon, 7 Jul 2025 22:29:48 +0530 Subject: [PATCH 2/5] Directory Deleting Service shouldn't continue if PurgePaths requests fail. --- .../hadoop/ozone/om/service/DirectoryDeletingService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index f3fdebedd7f8..bd043c7346a2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -272,8 +272,9 @@ void optimizeDirDeletesAndSubmitRequest( break; } } - if (!purgePathRequestList.isEmpty()) { - submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId); + if (purgePathRequestList.isEmpty() || + submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId) == null) { + return; } if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) { From 05540d42da7c547c1f544c920c443fdca51f71b8 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 8 Jul 2025 13:27:51 +0530 Subject: [PATCH 3/5] Handled Snapshot case. --- .../hdds/utils/MapBackedTableIterator.java | 10 ++++---- .../apache/hadoop/ozone/om/KeyManager.java | 4 ++-- .../hadoop/ozone/om/KeyManagerImpl.java | 19 ++++++++++----- .../ozone/om/service/KeyDeletingService.java | 18 +++++++++++---- .../om/service/SnapshotDeletingService.java | 7 +++--- .../hadoop/ozone/om/TestKeyManagerImpl.java | 23 +++++++++++++++---- 6 files changed, 57 insertions(+), 24 deletions(-) diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java index 5af0e671d51b..6474bc8e52de 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -40,8 +41,9 @@ public MapBackedTableIterator(TreeMap values, String prefix) { @Override public void seekToFirst() { this.itr = this.values.entrySet().stream() - .filter(e -> prefix == null || e.getKey().startsWith(prefix)) - .map(e -> Table.newKeyValue(e.getKey(), e.getValue())).iterator(); + .filter(e -> prefix == null || e.getKey().startsWith(prefix)).map( + e -> Table.newKeyValue(e.getKey(), e.getValue(), + e.getValue().toString().getBytes(StandardCharsets.UTF_8).length)).iterator(); } @Override @@ -53,8 +55,8 @@ public void seekToLast() { public Table.KeyValue seek(String s) { this.itr = this.values.entrySet().stream() .filter(e -> prefix == null || e.getKey().startsWith(prefix)) - .filter(e -> e.getKey().compareTo(s) >= 0) - .map(e -> Table.newKeyValue(e.getKey(), e.getValue())).iterator(); + .filter(e -> e.getKey().compareTo(s) >= 0).map(e -> Table.newKeyValue(e.getKey(), e.getValue(), + e.getValue().toString().getBytes(StandardCharsets.UTF_8).length)).iterator(); Map.Entry firstEntry = values.ceilingEntry(s); return firstEntry == null ? null : Table.newKeyValue(firstEntry.getKey(), firstEntry.getValue()); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index ec8520cdc782..d69839a1c8db 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -156,7 +156,7 @@ PendingKeysDeletion getPendingDeletionKeys( */ List> getRenamesKeyEntries( String volume, String bucket, String startKey, - CheckedFunction, Boolean, IOException> filter, int count) + CheckedFunction, Boolean, IOException> filter, int count, int ratisLimit) throws IOException; @@ -190,7 +190,7 @@ CheckedFunction getPreviousSnapshotOzoneKeyI List>> getDeletedKeyEntries( String volume, String bucket, String startKey, CheckedFunction, Boolean, IOException> filter, - int count) throws IOException; + int count, int ratisLimit) throws IOException; /** * Returns the names of up to {@code count} open keys whose age is diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 6415caeb231a..b33fc4d6c485 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -788,8 +788,9 @@ private List> getTableEntries(String startKey, TableIterator> tableIterator, Function valueFunction, CheckedFunction, Boolean, IOException> filter, - int size) throws IOException { + int size, int ratisLimit) throws IOException { List> entries = new ArrayList<>(); + int consumedSize = 0; /* Seek to the start key if it's not null. The next key in queue is ensured to start with the bucket prefix, {@link org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this. */ @@ -801,9 +802,14 @@ private List> getTableEntries(String startKey, int currentCount = 0; while (tableIterator.hasNext() && currentCount < size) { KeyValue kv = tableIterator.next(); + consumedSize += kv.getValueByteSize(); if (kv != null && filter.apply(kv)) { - entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()))); + entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()), kv.getValueByteSize())); currentCount++; + if (consumedSize > ratisLimit) { + LOG.info("Serialized size exceeded the ratis limit, current serailized size : {}", consumedSize); + break; + } } } return entries; @@ -824,11 +830,12 @@ private Optional getBucketPrefix(String volumeName, String bucketName, b @Override public List> getRenamesKeyEntries( String volume, String bucket, String startKey, - CheckedFunction, Boolean, IOException> filter, int size) throws IOException { + CheckedFunction, Boolean, IOException> filter, int size, int ratisLimit) + throws IOException { Optional bucketPrefix = getBucketPrefix(volume, bucket, false); try (TableIterator> renamedKeyIter = metadataManager.getSnapshotRenamedTable().iterator(bucketPrefix.orElse(""))) { - return getTableEntries(startKey, renamedKeyIter, Function.identity(), filter, size); + return getTableEntries(startKey, renamedKeyIter, Function.identity(), filter, size, ratisLimit); } } @@ -874,11 +881,11 @@ private CheckedFunction getPreviousSnapshotOzone public List>> getDeletedKeyEntries( String volume, String bucket, String startKey, CheckedFunction, Boolean, IOException> filter, - int size) throws IOException { + int size, int ratisLimit) throws IOException { Optional bucketPrefix = getBucketPrefix(volume, bucket, false); try (TableIterator> delKeyIter = metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) { - return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size); + return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size, ratisLimit); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 9d963c834cd1..4b226745b8a5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -356,15 +356,23 @@ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyMan ReclaimableRenameEntryFilter renameEntryFilter = new ReclaimableRenameEntryFilter( getOzoneManager(), omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock)) { - List renamedTableEntries = - keyManager.getRenamesKeyEntries(volume, bucket, null, renameEntryFilter, remainNum).stream() - .map(Table.KeyValue::getKey) - .collect(Collectors.toList()); + List> renameKeyEntries = + keyManager.getRenamesKeyEntries(volume, bucket, null, renameEntryFilter, remainNum, ratisLimit); + + List renamedTableEntries = new ArrayList<>(renameKeyEntries.size()); + int serializedSize = 0; + + for (Table.KeyValue kv : renameKeyEntries) { + renamedTableEntries.add(kv.getKey()); + serializedSize += kv.getValueByteSize(); + } + remainNum -= renamedTableEntries.size(); + ratisLimit -= serializedSize; // Get pending keys that can be deleted PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null ? - keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum, ratisByteLimit) : + keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum, ratisLimit) : keyManager.getPendingDeletionKeys(volume, bucket, null, reclaimableKeyFilter, remainNum, ratisLimit); List keyBlocksList = pendingKeysDeletion.getKeyBlocksList(); //submit purge requests if there are renamed entries to be purged or keys to be purged. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 75e9a20cdf12..1b3199ec9877 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -177,15 +177,16 @@ public BackgroundTaskResult call() throws InterruptedException { // Get all entries from deletedKeyTable. List>> deletedKeyEntries = snapshotKeyManager.getDeletedKeyEntries(snapInfo.getVolumeName(), snapInfo.getBucketName(), - null, (kv) -> true, remaining); + null, (kv) -> true, remaining, ratisByteLimit); moveCount += deletedKeyEntries.size(); // Get all entries from deletedDirTable. List> deletedDirEntries = snapshotKeyManager.getDeletedDirEntries( snapInfo.getVolumeName(), snapInfo.getBucketName(), remaining - moveCount); moveCount += deletedDirEntries.size(); // Get all entries from snapshotRenamedTable. - List> renameEntries = snapshotKeyManager.getRenamesKeyEntries( - snapInfo.getVolumeName(), snapInfo.getBucketName(), null, (kv) -> true, remaining - moveCount); + List> renameEntries = + snapshotKeyManager.getRenamesKeyEntries(snapInfo.getVolumeName(), snapInfo.getBucketName(), null, + (kv) -> true, remaining - moveCount, ratisByteLimit); moveCount += renameEntries.size(); if (moveCount > 0) { List deletedKeys = new ArrayList<>(deletedKeyEntries.size()); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index d021cc752507..447be08cafc2 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.MapBackedTableIterator; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -121,6 +122,11 @@ public void testGetDeletedKeyEntries(int numberOfVolumes, int numberOfBucketsPer String bucketNamePrefix = "bucket"; String keyPrefix = "key"; OzoneConfiguration configuration = new OzoneConfiguration(); + int limit = (int) configuration.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + limit = (int) (limit * 0.9); OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class); when(metadataManager.getBucketKeyPrefix(anyString(), anyString())).thenAnswer(i -> "/" + i.getArguments()[0] + "/" + i.getArguments()[1] + "/"); @@ -146,10 +152,12 @@ public void testGetDeletedKeyEntries(int numberOfVolumes, int numberOfBucketsPer : (String.format("/%s%010d/%s%010d/%s%010d", volumeNamePrefix, startVolumeNumber, bucketNamePrefix, startBucketNumber, keyPrefix, startKeyNumber)); if (expectedException != null) { + int finalLimit = limit; assertThrows(expectedException, () -> km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, - numberOfEntries)); + numberOfEntries, finalLimit)); } else { - assertEquals(expectedEntries, km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries)); + assertEquals(expectedEntries, + km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries, limit)); } } @@ -165,6 +173,11 @@ public void testGetRenameKeyEntries(int numberOfVolumes, int numberOfBucketsPerV String bucketNamePrefix = "bucket"; String keyPrefix = ""; OzoneConfiguration configuration = new OzoneConfiguration(); + int limit = (int) configuration.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + limit = (int) (limit * 0.9); OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class); when(metadataManager.getBucketKeyPrefix(anyString(), anyString())).thenAnswer(i -> "/" + i.getArguments()[0] + "/" + i.getArguments()[1] + "/"); @@ -183,10 +196,12 @@ public void testGetRenameKeyEntries(int numberOfVolumes, int numberOfBucketsPerV : (String.format("/%s%010d/%s%010d/%s%010d", volumeNamePrefix, startVolumeNumber, bucketNamePrefix, startBucketNumber, keyPrefix, startKeyNumber)); if (expectedException != null) { + int finalLimit = limit; assertThrows(expectedException, () -> km.getRenamesKeyEntries(volumeName, bucketName, startKey, - filter, numberOfEntries)); + filter, numberOfEntries, finalLimit)); } else { - assertEquals(expectedEntries, km.getRenamesKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries)); + assertEquals(expectedEntries, + km.getRenamesKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries, limit)); } } From 5bc2f4f67bd99820fa8905e988642e70dec4931a Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 8 Jul 2025 13:42:21 +0530 Subject: [PATCH 4/5] Fixed findBug. --- .../main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index b33fc4d6c485..235a308113cd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -802,8 +802,8 @@ private List> getTableEntries(String startKey, int currentCount = 0; while (tableIterator.hasNext() && currentCount < size) { KeyValue kv = tableIterator.next(); - consumedSize += kv.getValueByteSize(); if (kv != null && filter.apply(kv)) { + consumedSize += kv.getValueByteSize(); entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()), kv.getValueByteSize())); currentCount++; if (consumedSize > ratisLimit) { From 6b7f520cf24da3f1cef7699a5627799a46ba1f4f Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Wed, 9 Jul 2025 20:20:40 +0530 Subject: [PATCH 5/5] Addressed comments. --- .../hdds/utils/MapBackedTableIterator.java | 10 +++++++--- .../apache/hadoop/ozone/om/KeyManagerImpl.java | 18 +++++++++++------- .../ozone/om/service/KeyDeletingService.java | 3 ++- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java index 6474bc8e52de..d3fbb3cb9f21 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java @@ -41,9 +41,13 @@ public MapBackedTableIterator(TreeMap values, String prefix) { @Override public void seekToFirst() { this.itr = this.values.entrySet().stream() - .filter(e -> prefix == null || e.getKey().startsWith(prefix)).map( - e -> Table.newKeyValue(e.getKey(), e.getValue(), - e.getValue().toString().getBytes(StandardCharsets.UTF_8).length)).iterator(); + .filter(e -> prefix == null || e.getKey().startsWith(prefix)) + .map(e -> { + V value = e.getValue(); + int size = value != null ? value.toString().getBytes(StandardCharsets.UTF_8).length : 0; + return Table.newKeyValue(e.getKey(), value, size); + }) + .iterator(); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 235a308113cd..bd186df16843 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -735,6 +735,7 @@ public PendingKeysDeletion getPendingDeletionKeys( delKeyIter.seek(startKey); } int currentCount = 0; + boolean maxReqSizeExceeded = false; while (delKeyIter.hasNext() && currentCount < count) { RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo(); KeyValue kv = delKeyIter.next(); @@ -742,7 +743,6 @@ public PendingKeysDeletion getPendingDeletionKeys( List blockGroupList = Lists.newArrayList(); // Multiple keys with the same path can be queued in one DB entry RepeatedOmKeyInfo infoList = kv.getValue(); - boolean flag = false; for (OmKeyInfo info : infoList.getOmKeyInfoList()) { // Skip the key if the filter doesn't allow the file to be deleted. @@ -752,12 +752,16 @@ public PendingKeysDeletion getPendingDeletionKeys( .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))).collect(Collectors.toList()); BlockGroup keyBlocks = BlockGroup.newBuilder().setKeyName(kv.getKey()) .addAllBlockIDs(blockIDS).build(); - serializedSize += keyBlocks.getProto().getSerializedSize(); + int keyBlockSerializedSize = keyBlocks.getProto().getSerializedSize(); + serializedSize += keyBlockSerializedSize; if (serializedSize > ratisByteLimit) { - flag = true; - LOG.info( - "Total size of cumulative keys in a cycle crossed 90% ratis limit, serialized size: {}", - serializedSize); + maxReqSizeExceeded = true; + if (LOG.isDebugEnabled()) { + LOG.debug( + "Total size of cumulative keys and rename entries in the snapshotRenamedTable in a cycle " + + "crossed 90% ratis limit, serialized size of keys: {}", + serializedSize); + } break; } blockGroupList.add(keyBlocks); @@ -766,7 +770,7 @@ public PendingKeysDeletion getPendingDeletionKeys( notReclaimableKeyInfo.addOmKeyInfo(info); } } - if (flag) { + if (maxReqSizeExceeded) { break; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 4b226745b8a5..15a620a8e492 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -90,6 +90,7 @@ public class KeyDeletingService extends AbstractKeyDeletingService { private final boolean deepCleanSnapshots; private final SnapshotChainManager snapshotChainManager; private int ratisByteLimit; + private static final double RATIS_LIMIT_FACTOR = 0.9; public KeyDeletingService(OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient, long serviceInterval, @@ -111,7 +112,7 @@ public KeyDeletingService(OzoneManager ozoneManager, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); // always go to 90% of max limit for request as other header will be added - this.ratisByteLimit = (int) (limit * 0.9); + this.ratisByteLimit = (int) (limit * RATIS_LIMIT_FACTOR); } /**