diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java index 118a4117b6be..452564a68582 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java @@ -61,7 +61,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.TestClock; -import org.apache.ozone.test.tag.Flaky; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -1635,7 +1634,6 @@ public void testDeleteRootWithTrash() throws IOException { * 2.Verify that the key gets deleted by the trash emptier. */ @Test - @Flaky("HDDS-6645") public void testTrash() throws Exception { String testKeyName = "testKey2"; Path path = new Path(OZONE_URI_DELIMITER, testKeyName); @@ -1655,12 +1653,10 @@ public void testTrash() throws Exception { // Call moveToTrash. We can't call protected fs.rename() directly trash.moveToTrash(path); - // Added this assertion here and will be tested as part of testTrash - // test case which needs to be tested with separate mini cluster having - // emptier thread started with close match of timings of relevant - // assertion statements and corresponding trash and checkpoint interval. + Assert.assertTrue(o3fs.exists(userTrash)); - Assert.assertTrue(o3fs.exists(userTrashCurrent)); + Assert.assertTrue(o3fs.exists(trashPath) || o3fs.listStatus( + o3fs.listStatus(userTrash)[0].getPath()).length > 0); // Wait until the TrashEmptier purges the key GenericTestUtils.waitFor(() -> { @@ -1673,10 +1669,6 @@ public void testTrash() throws Exception { } }, 100, 120000); - // userTrash path will contain the checkpoint folder - FileStatus[] statusList = fs.listStatus(userTrash); - Assert.assertNotEquals(Arrays.toString(statusList), 0, statusList.length); - // wait for deletion of checkpoint dir GenericTestUtils.waitFor(() -> { try { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java index 6779c3378bcd..115a8c5f8e0a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java @@ -28,8 +28,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OFSPath; @@ -49,7 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; import java.net.URI; @@ -59,6 +56,7 @@ import java.util.Map; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_O3TRASH_URI_SCHEME; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; @@ -75,6 +73,8 @@ public class TrashOzoneFileSystem extends FileSystem { private static final int OZONE_FS_ITERATE_BATCH_SIZE = 100; + private static final int OZONE_MAX_LIST_KEYS_SIZE = 10000; + private final OzoneManager ozoneManager; private final String userName; @@ -167,9 +167,8 @@ public boolean rename(Path src, Path dst) throws IOException { equals(dstPath.getBucketName())); Preconditions.checkArgument(srcPath.getTrashRoot(). toString().equals(dstPath.getTrashRoot().toString())); - try (RenameIterator iterator = new RenameIterator(src, dst)) { - iterator.iterate(); - } + RenameIterator iterator = new RenameIterator(src, dst); + iterator.iterate(); return true; } @@ -198,9 +197,8 @@ public boolean delete(Path path, boolean b) throws IOException { if (bucket.getBucketLayout().isFileSystemOptimized()) { return deleteFSO(srcPath); } - try (DeleteIterator iterator = new DeleteIterator(path, true)) { - iterator.iterate(); - } + DeleteIterator iterator = new DeleteIterator(path, true); + iterator.iterate(); return true; } @@ -347,12 +345,11 @@ public boolean exists(Path f) throws IOException { } } - private abstract class OzoneListingIterator implements Closeable { + private abstract class OzoneListingIterator { private final Path path; private final FileStatus status; private String pathKey; - private TableIterator> - keyIterator; + private Iterator keyIterator; OzoneListingIterator(Path path) throws IOException { @@ -362,10 +359,30 @@ private abstract class OzoneListingIterator implements Closeable { if (status.isDirectory()) { this.pathKey = addTrailingSlashIfNeeded(pathKey); } - keyIterator = ozoneManager.getMetadataManager().getKeyIterator(); + OFSPath fsPath = new OFSPath(pathKey, + OzoneConfiguration.of(getConf())); + keyIterator = + getKeyIterator(fsPath.getVolumeName(), fsPath.getBucketName(), + fsPath.getKeyName()); } - /** + private Iterator getKeyIterator(String volumeName, + String bucketName, String keyName) throws IOException { + List keys = new ArrayList<>( + listKeys(volumeName, bucketName, "", keyName)); + String lastKey = keys.get(keys.size() - 1); + List nextBatchKeys = + listKeys(volumeName, bucketName, lastKey, keyName); + + while (!nextBatchKeys.isEmpty()) { + keys.addAll(nextBatchKeys); + lastKey = nextBatchKeys.get(nextBatchKeys.size() - 1); + nextBatchKeys = listKeys(volumeName, bucketName, lastKey, keyName); + } + return keys.iterator(); + } + + /** * The output of processKey determines if further iteration through the * keys should be done or not. * @@ -395,13 +412,10 @@ boolean iterate() throws IOException { String ofsPathprefix = ofsPath.getNonKeyPathNoPrefixDelim() + OZONE_URI_DELIMITER; while (keyIterator.hasNext()) { - Table.KeyValue< String, OmKeyInfo > kv = keyIterator.next(); - String keyPath = ofsPathprefix + kv.getValue().getKeyName(); + String keyName = keyIterator.next(); + String keyPath = ofsPathprefix + keyName; LOG.trace("iterating key path: {}", keyPath); - if (!kv.getValue().getKeyName().equals("") - && kv.getKey().startsWith("/" + pathKey)) { - keyPathList.add(keyPath); - } + keyPathList.add(keyPath); if (keyPathList.size() >= OZONE_FS_ITERATE_BATCH_SIZE) { if (!processKeyPath(keyPathList)) { return false; @@ -427,9 +441,16 @@ FileStatus getStatus() { return status; } - @Override - public void close() throws IOException { - keyIterator.close(); + /** + * Return a listKeys output with only a list of keyNames. + */ + List listKeys(String volumeName, String bucketName, String startKey, + String keyPrefix) throws IOException { + OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); + return metadataManager.listKeys(volumeName, bucketName, startKey, + keyPrefix, OZONE_MAX_LIST_KEYS_SIZE).getKeys().stream() + .map(OmKeyInfo::getKeyName) + .collect(Collectors.toList()); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java index aa44debd6f59..cb5e36dd62ae 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java @@ -322,17 +322,8 @@ public void run() { continue; } TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf, om); - Runnable task = () -> { - try { - om.getMetrics().incNumTrashRootsProcessed(); - trash.deleteCheckpoint(trashRoot.getPath(), false); - trash.createCheckpoint(trashRoot.getPath(), - new Date(Time.now())); - } catch (Exception e) { - om.getMetrics().incNumTrashFails(); - LOG.error("Unable to checkpoint:" + trashRoot.getPath(), e); - } - }; + Path trashRootPath = trashRoot.getPath(); + Runnable task = getEmptierTask(trashRootPath, trash, false); om.getMetrics().incNumTrashRootsEnqueued(); executor.submit(task); } @@ -357,6 +348,21 @@ public void run() { } } + private Runnable getEmptierTask(Path trashRootPath, TrashPolicyOzone trash, + boolean deleteImmediately) { + Runnable task = () -> { + try { + om.getMetrics().incNumTrashRootsProcessed(); + trash.deleteCheckpoint(trashRootPath, deleteImmediately); + trash.createCheckpoint(trashRootPath, new Date(Time.now())); + } catch (Exception e) { + om.getMetrics().incNumTrashFails(); + LOG.error("Unable to checkpoint:" + trashRootPath, e); + } + }; + return task; + } + private long ceiling(long time, long interval) { return floor(time, interval) + interval; }