diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java index 5af0e671d51b..d3fbb3cb9f21 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/MapBackedTableIterator.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -41,7 +42,12 @@ public MapBackedTableIterator(TreeMap values, String prefix) { public void seekToFirst() { this.itr = this.values.entrySet().stream() .filter(e -> prefix == null || e.getKey().startsWith(prefix)) - .map(e -> Table.newKeyValue(e.getKey(), e.getValue())).iterator(); + .map(e -> { + V value = e.getValue(); + int size = value != null ? value.toString().getBytes(StandardCharsets.UTF_8).length : 0; + return Table.newKeyValue(e.getKey(), value, size); + }) + .iterator(); } @Override @@ -53,8 +59,8 @@ public void seekToLast() { public Table.KeyValue seek(String s) { this.itr = this.values.entrySet().stream() .filter(e -> prefix == null || e.getKey().startsWith(prefix)) - .filter(e -> e.getKey().compareTo(s) >= 0) - .map(e -> Table.newKeyValue(e.getKey(), e.getValue())).iterator(); + .filter(e -> e.getKey().compareTo(s) >= 0).map(e -> Table.newKeyValue(e.getKey(), e.getValue(), + e.getValue().toString().getBytes(StandardCharsets.UTF_8).length)).iterator(); Map.Entry firstEntry = values.ceilingEntry(s); return firstEntry == null ? null : Table.newKeyValue(firstEntry.getKey(), firstEntry.getValue()); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java index fa59754b67f2..399453d64335 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.TestDataUtil; @@ -56,6 +57,7 @@ public class TestKeyPurging { private static final int NUM_KEYS = 10; private static final int KEY_SIZE = 100; private OzoneClient client; + private int ratisLimit; @BeforeEach public void setup() throws Exception { @@ -74,6 +76,11 @@ public void setup() throws Exception { client = OzoneClientFactory.getRpcClient(conf); store = client.getObjectStore(); om = cluster.getOzoneManager(); + int limit = (int) conf.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + ratisLimit = (int) (limit * 0.9); } @AfterEach @@ -126,7 +133,7 @@ public void testKeysPurgingByKeyDeletingService() throws Exception { GenericTestUtils.waitFor( () -> { try { - return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit) .getKeyBlocksList().isEmpty(); } catch (IOException e) { return false; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 7e76885c49bd..d69839a1c8db 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -124,7 +124,7 @@ ListKeysResult listKeys(String volumeName, String bucketName, String startKey, * @throws IOException if an I/O error occurs while fetching the keys. */ PendingKeysDeletion getPendingDeletionKeys( - CheckedFunction, Boolean, IOException> filter, int count) + CheckedFunction, Boolean, IOException> filter, int count, int ratisByteLimit) throws IOException; /** @@ -142,7 +142,7 @@ PendingKeysDeletion getPendingDeletionKeys( */ PendingKeysDeletion getPendingDeletionKeys( String volume, String bucket, String startKey, - CheckedFunction, Boolean, IOException> filter, int count) + CheckedFunction, Boolean, IOException> filter, int count, int ratisByteLimit) throws IOException; /** @@ -156,7 +156,7 @@ PendingKeysDeletion getPendingDeletionKeys( */ List> getRenamesKeyEntries( String volume, String bucket, String startKey, - CheckedFunction, Boolean, IOException> filter, int count) + CheckedFunction, Boolean, IOException> filter, int count, int ratisLimit) throws IOException; @@ -190,7 +190,7 @@ CheckedFunction getPreviousSnapshotOzoneKeyI List>> getDeletedKeyEntries( String volume, String bucket, String startKey, CheckedFunction, Boolean, IOException> filter, - int count) throws IOException; + int count, int ratisLimit) throws IOException; /** * Returns the names of up to {@code count} open keys whose age is diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 3216948c55f6..bd186df16843 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -710,17 +710,18 @@ public ListKeysResult listKeys(String volumeName, String bucketName, @Override public PendingKeysDeletion getPendingDeletionKeys( - final CheckedFunction, Boolean, IOException> filter, final int count) - throws IOException { - return getPendingDeletionKeys(null, null, null, filter, count); + final CheckedFunction, Boolean, IOException> filter, final int count, + int ratisByteLimit) throws IOException { + return getPendingDeletionKeys(null, null, null, filter, count, ratisByteLimit); } @Override public PendingKeysDeletion getPendingDeletionKeys( String volume, String bucket, String startKey, CheckedFunction, Boolean, IOException> filter, - int count) throws IOException { + int count, int ratisByteLimit) throws IOException { List keyBlocksList = Lists.newArrayList(); + long serializedSize = 0; Map keysToModify = new HashMap<>(); // Bucket prefix would be empty if volume is empty i.e. either null or "". Optional bucketPrefix = getBucketPrefix(volume, bucket, false); @@ -734,6 +735,7 @@ public PendingKeysDeletion getPendingDeletionKeys( delKeyIter.seek(startKey); } int currentCount = 0; + boolean maxReqSizeExceeded = false; while (delKeyIter.hasNext() && currentCount < count) { RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo(); KeyValue kv = delKeyIter.next(); @@ -750,12 +752,27 @@ public PendingKeysDeletion getPendingDeletionKeys( .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))).collect(Collectors.toList()); BlockGroup keyBlocks = BlockGroup.newBuilder().setKeyName(kv.getKey()) .addAllBlockIDs(blockIDS).build(); + int keyBlockSerializedSize = keyBlocks.getProto().getSerializedSize(); + serializedSize += keyBlockSerializedSize; + if (serializedSize > ratisByteLimit) { + maxReqSizeExceeded = true; + if (LOG.isDebugEnabled()) { + LOG.debug( + "Total size of cumulative keys and rename entries in the snapshotRenamedTable in a cycle " + + "crossed 90% ratis limit, serialized size of keys: {}", + serializedSize); + } + break; + } blockGroupList.add(keyBlocks); currentCount++; } else { notReclaimableKeyInfo.addOmKeyInfo(info); } } + if (maxReqSizeExceeded) { + break; + } List notReclaimableKeyInfoList = notReclaimableKeyInfo.getOmKeyInfoList(); @@ -775,8 +792,9 @@ private List> getTableEntries(String startKey, TableIterator> tableIterator, Function valueFunction, CheckedFunction, Boolean, IOException> filter, - int size) throws IOException { + int size, int ratisLimit) throws IOException { List> entries = new ArrayList<>(); + int consumedSize = 0; /* Seek to the start key if it's not null. The next key in queue is ensured to start with the bucket prefix, {@link org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this. */ @@ -789,8 +807,13 @@ private List> getTableEntries(String startKey, while (tableIterator.hasNext() && currentCount < size) { KeyValue kv = tableIterator.next(); if (kv != null && filter.apply(kv)) { - entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()))); + consumedSize += kv.getValueByteSize(); + entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()), kv.getValueByteSize())); currentCount++; + if (consumedSize > ratisLimit) { + LOG.info("Serialized size exceeded the ratis limit, current serailized size : {}", consumedSize); + break; + } } } return entries; @@ -811,11 +834,12 @@ private Optional getBucketPrefix(String volumeName, String bucketName, b @Override public List> getRenamesKeyEntries( String volume, String bucket, String startKey, - CheckedFunction, Boolean, IOException> filter, int size) throws IOException { + CheckedFunction, Boolean, IOException> filter, int size, int ratisLimit) + throws IOException { Optional bucketPrefix = getBucketPrefix(volume, bucket, false); try (TableIterator> renamedKeyIter = metadataManager.getSnapshotRenamedTable().iterator(bucketPrefix.orElse(""))) { - return getTableEntries(startKey, renamedKeyIter, Function.identity(), filter, size); + return getTableEntries(startKey, renamedKeyIter, Function.identity(), filter, size, ratisLimit); } } @@ -861,11 +885,11 @@ private CheckedFunction getPreviousSnapshotOzone public List>> getDeletedKeyEntries( String volume, String bucket, String startKey, CheckedFunction, Boolean, IOException> filter, - int size) throws IOException { + int size, int ratisLimit) throws IOException { Optional bucketPrefix = getBucketPrefix(volume, bucket, false); try (TableIterator> delKeyIter = metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) { - return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size); + return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size, ratisLimit); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index f3fdebedd7f8..bd043c7346a2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -272,8 +272,9 @@ void optimizeDirDeletesAndSubmitRequest( break; } } - if (!purgePathRequestList.isEmpty()) { - submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId); + if (purgePathRequestList.isEmpty() || + submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId) == null) { + return; } if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 781184b86291..15a620a8e492 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -40,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; @@ -88,6 +89,8 @@ public class KeyDeletingService extends AbstractKeyDeletingService { private final AtomicLong deletedKeyCount; private final boolean deepCleanSnapshots; private final SnapshotChainManager snapshotChainManager; + private int ratisByteLimit; + private static final double RATIS_LIMIT_FACTOR = 0.9; public KeyDeletingService(OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient, long serviceInterval, @@ -104,6 +107,12 @@ public KeyDeletingService(OzoneManager ozoneManager, this.deepCleanSnapshots = deepCleanSnapshots; this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager(); this.scmClient = scmClient; + int limit = (int) ozoneManager.getConfiguration().getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + // always go to 90% of max limit for request as other header will be added + this.ratisByteLimit = (int) (limit * RATIS_LIMIT_FACTOR); } /** @@ -316,7 +325,7 @@ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ * @param keyManager KeyManager of the underlying store. */ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, - int remainNum) throws IOException { + int remainNum, int ratisLimit) throws IOException { String volume = null, bucket = null, snapshotTableKey = null; if (currentSnapshotInfo != null) { volume = currentSnapshotInfo.getVolumeName(); @@ -348,16 +357,24 @@ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyMan ReclaimableRenameEntryFilter renameEntryFilter = new ReclaimableRenameEntryFilter( getOzoneManager(), omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock)) { - List renamedTableEntries = - keyManager.getRenamesKeyEntries(volume, bucket, null, renameEntryFilter, remainNum).stream() - .map(Table.KeyValue::getKey) - .collect(Collectors.toList()); + List> renameKeyEntries = + keyManager.getRenamesKeyEntries(volume, bucket, null, renameEntryFilter, remainNum, ratisLimit); + + List renamedTableEntries = new ArrayList<>(renameKeyEntries.size()); + int serializedSize = 0; + + for (Table.KeyValue kv : renameKeyEntries) { + renamedTableEntries.add(kv.getKey()); + serializedSize += kv.getValueByteSize(); + } + remainNum -= renamedTableEntries.size(); + ratisLimit -= serializedSize; // Get pending keys that can be deleted - PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null - ? keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum) - : keyManager.getPendingDeletionKeys(volume, bucket, null, reclaimableKeyFilter, remainNum); + PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null ? + keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum, ratisLimit) : + keyManager.getPendingDeletionKeys(volume, bucket, null, reclaimableKeyFilter, remainNum, ratisLimit); List keyBlocksList = pendingKeysDeletion.getKeyBlocksList(); //submit purge requests if there are renamed entries to be purged or keys to be purged. if (!renamedTableEntries.isEmpty() || keyBlocksList != null && !keyBlocksList.isEmpty()) { @@ -449,7 +466,7 @@ public BackgroundTaskResult call() { snapInfo.getName())) { KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager() : omSnapshot.get().getKeyManager(); - processDeletedKeysForStore(snapInfo, keyManager, remainNum); + processDeletedKeysForStore(snapInfo, keyManager, remainNum, ratisByteLimit); } } catch (IOException e) { LOG.error("Error while running delete files background task for store {}. Will retry at next run.", diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 75e9a20cdf12..1b3199ec9877 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -177,15 +177,16 @@ public BackgroundTaskResult call() throws InterruptedException { // Get all entries from deletedKeyTable. List>> deletedKeyEntries = snapshotKeyManager.getDeletedKeyEntries(snapInfo.getVolumeName(), snapInfo.getBucketName(), - null, (kv) -> true, remaining); + null, (kv) -> true, remaining, ratisByteLimit); moveCount += deletedKeyEntries.size(); // Get all entries from deletedDirTable. List> deletedDirEntries = snapshotKeyManager.getDeletedDirEntries( snapInfo.getVolumeName(), snapInfo.getBucketName(), remaining - moveCount); moveCount += deletedDirEntries.size(); // Get all entries from snapshotRenamedTable. - List> renameEntries = snapshotKeyManager.getRenamesKeyEntries( - snapInfo.getVolumeName(), snapInfo.getBucketName(), null, (kv) -> true, remaining - moveCount); + List> renameEntries = + snapshotKeyManager.getRenamesKeyEntries(snapInfo.getVolumeName(), snapInfo.getBucketName(), null, + (kv) -> true, remaining - moveCount, ratisByteLimit); moveCount += renameEntries.size(); if (moveCount > 0) { List deletedKeys = new ArrayList<>(deletedKeyEntries.size()); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index d021cc752507..447be08cafc2 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.MapBackedTableIterator; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -121,6 +122,11 @@ public void testGetDeletedKeyEntries(int numberOfVolumes, int numberOfBucketsPer String bucketNamePrefix = "bucket"; String keyPrefix = "key"; OzoneConfiguration configuration = new OzoneConfiguration(); + int limit = (int) configuration.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + limit = (int) (limit * 0.9); OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class); when(metadataManager.getBucketKeyPrefix(anyString(), anyString())).thenAnswer(i -> "/" + i.getArguments()[0] + "/" + i.getArguments()[1] + "/"); @@ -146,10 +152,12 @@ public void testGetDeletedKeyEntries(int numberOfVolumes, int numberOfBucketsPer : (String.format("/%s%010d/%s%010d/%s%010d", volumeNamePrefix, startVolumeNumber, bucketNamePrefix, startBucketNumber, keyPrefix, startKeyNumber)); if (expectedException != null) { + int finalLimit = limit; assertThrows(expectedException, () -> km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, - numberOfEntries)); + numberOfEntries, finalLimit)); } else { - assertEquals(expectedEntries, km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries)); + assertEquals(expectedEntries, + km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries, limit)); } } @@ -165,6 +173,11 @@ public void testGetRenameKeyEntries(int numberOfVolumes, int numberOfBucketsPerV String bucketNamePrefix = "bucket"; String keyPrefix = ""; OzoneConfiguration configuration = new OzoneConfiguration(); + int limit = (int) configuration.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + limit = (int) (limit * 0.9); OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class); when(metadataManager.getBucketKeyPrefix(anyString(), anyString())).thenAnswer(i -> "/" + i.getArguments()[0] + "/" + i.getArguments()[1] + "/"); @@ -183,10 +196,12 @@ public void testGetRenameKeyEntries(int numberOfVolumes, int numberOfBucketsPerV : (String.format("/%s%010d/%s%010d/%s%010d", volumeNamePrefix, startVolumeNumber, bucketNamePrefix, startBucketNumber, keyPrefix, startKeyNumber)); if (expectedException != null) { + int finalLimit = limit; assertThrows(expectedException, () -> km.getRenamesKeyEntries(volumeName, bucketName, startKey, - filter, numberOfEntries)); + filter, numberOfEntries, finalLimit)); } else { - assertEquals(expectedEntries, km.getRenamesKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries)); + assertEquals(expectedEntries, + km.getRenamesKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries, limit)); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index bbb2938b1867..ba6f644c4967 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; @@ -73,6 +74,7 @@ import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.KeyManagerImpl; +import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; @@ -142,6 +144,7 @@ class TestKeyDeletingService extends OzoneTestBase { private KeyDeletingService keyDeletingService; private DirectoryDeletingService directoryDeletingService; private ScmBlockLocationTestingClient scmBlockTestingClient; + private int ratisLimit; @BeforeAll void setup() { @@ -175,6 +178,11 @@ private void createSubject() throws Exception { writeClient = omTestManagers.getWriteClient(); om = omTestManagers.getOzoneManager(); metadataManager = omTestManagers.getMetadataManager(); + int limit = (int) conf.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + ratisLimit = (int) (limit * 0.9); } /** @@ -226,7 +234,7 @@ void checkIfDeleteServiceIsDeletingKeys() assertThat(getRunCount()).isGreaterThan(initialRunCount); assertThat(keyManager.getPendingDeletionKeys(new ReclaimableKeyFilter(om, om.getOmSnapshotManager(), ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), null, - keyManager, om.getMetadataManager().getLock()), Integer.MAX_VALUE).getKeyBlocksList()) + keyManager, om.getMetadataManager().getLock()), Integer.MAX_VALUE, ratisLimit).getKeyBlocksList()) .isEmpty(); } @@ -255,7 +263,7 @@ void checkDeletionForKeysWithMultipleVersions() throws Exception { 1000, 10000); assertThat(getRunCount()) .isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE).getKeyBlocksList()) + assertThat(keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit).getKeyBlocksList()) .isEmpty(); // The 1st version of the key has 1 block and the 2nd version has 2 @@ -300,7 +308,7 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception { assertThat(keyManager.getPendingDeletionKeys(new ReclaimableKeyFilter(om, om.getOmSnapshotManager(), ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), null, keyManager, om.getMetadataManager().getLock()), - Integer.MAX_VALUE).getKeyBlocksList()) + Integer.MAX_VALUE, ratisLimit).getKeyBlocksList()) .isEmpty(); // deletedTable should have deleted key of the snapshot bucket @@ -408,7 +416,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() Assertions.assertNotEquals(deletePathKey[0], group.getGroupID()); } return pendingKeysDeletion; - }).when(km).getPendingDeletionKeys(any(), anyInt()); + }).when(km).getPendingDeletionKeys(any(), anyInt(), anyInt()); service.runPeriodicalTaskNow(); service.runPeriodicalTaskNow(); assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); @@ -1052,7 +1060,7 @@ private long getRunCount() { private int countKeysPendingDeletion() { try { - final int count = keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE) + final int count = keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit) .getKeyBlocksList().size(); LOG.debug("KeyManager keys pending deletion: {}", count); return count; @@ -1063,7 +1071,7 @@ private int countKeysPendingDeletion() { private long countBlocksPendingDeletion() { try { - return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE, ratisLimit) .getKeyBlocksList() .stream() .map(BlockGroup::getBlockIDList)