From 5bf11049f4ec7f890b0a2ce85b240c2b56f5f31f Mon Sep 17 00:00:00 2001 From: smdsbz Date: Mon, 20 Jan 2025 15:12:01 +0800 Subject: [PATCH 1/3] iterative list for oss --- .../paimon/oss/HadoopCompliantFileIO.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java index 67027eabadfb..8faa73d694c5 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.RemoteIterator; import org.apache.paimon.fs.SeekableInputStream; import org.apache.hadoop.fs.FSDataInputStream; @@ -77,6 +78,29 @@ public FileStatus[] listStatus(Path path) throws IOException { return statuses; } + @Override + public RemoteIterator listFilesIterative(Path path, boolean recursive) + throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + org.apache.hadoop.fs.RemoteIterator hadoopIter = + getFileSystem(hadoopPath).listFiles(hadoopPath, recursive); + return new RemoteIterator() { + @Override + public boolean hasNext() throws IOException { + return hadoopIter.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next(); + return new HadoopFileStatus(hadoopStatus); + } + + @Override + public void close() throws IOException {} + }; + } + @Override public boolean exists(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); From 666ef877cb8325aa4305b817eda444bdad285211 Mon Sep 17 00:00:00 2001 From: smdsbz Date: Mon, 20 Jan 2025 19:33:26 +0800 Subject: [PATCH 2/3] iterative list for s3 and hadoop --- .../apache/paimon/fs/hadoop/HadoopFileIO.java | 24 +++++++++++++++++++ .../paimon/s3/HadoopCompliantFileIO.java | 24 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java index 0a8d64a73b00..f17c587fe923 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.RemoteIterator; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.hadoop.SerializableConfiguration; import org.apache.paimon.utils.FunctionWithException; @@ -104,6 +105,29 @@ public FileStatus[] listStatus(Path path) throws IOException { return statuses; } + @Override + public RemoteIterator listFilesIterative(Path path, boolean recursive) + throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + org.apache.hadoop.fs.RemoteIterator hadoopIter = + getFileSystem(hadoopPath).listFiles(hadoopPath, recursive); + return new RemoteIterator() { + @Override + public boolean hasNext() throws IOException { + return hadoopIter.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next(); + return new HadoopFileStatus(hadoopStatus); + } + + @Override + public void close() throws IOException {} + }; + } + @Override public boolean exists(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java index 80f3df582096..1bac9087cb43 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.RemoteIterator; import org.apache.paimon.fs.SeekableInputStream; import org.apache.hadoop.fs.FSDataInputStream; @@ -77,6 +78,29 @@ public FileStatus[] listStatus(Path path) throws IOException { return statuses; } + @Override + public RemoteIterator listFilesIterative(Path path, boolean recursive) + throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + org.apache.hadoop.fs.RemoteIterator hadoopIter = + getFileSystem(hadoopPath).listFiles(hadoopPath, recursive); + return new RemoteIterator() { + @Override + public boolean hasNext() throws IOException { + return hadoopIter.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next(); + return new HadoopFileStatus(hadoopStatus); + } + + @Override + public void close() throws IOException {} + }; + } + @Override public boolean exists(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); From e865171d565297d13e8008f83ff9f4d2f329de5b Mon Sep 17 00:00:00 2001 From: smdsbz Date: Tue, 21 Jan 2025 11:22:56 +0800 Subject: [PATCH 3/3] test for HadoopFileIO's iterative list --- .../paimon/fs/FileIOBehaviorTestBase.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java index 106dee38e3fe..7ca28b937d6a 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import static org.assertj.core.api.Assertions.assertThat; @@ -85,6 +87,43 @@ void testFileDoesNotExist() throws IOException { assertThat(fs.exists(new Path(basePath, randomName()))).isFalse(); } + // --- list files + + @Test + void testListFilesIterativeNonRecursive() throws IOException { + Path fileA = createRandomFileInDirectory(basePath); + Path dirB = new Path(basePath, randomName()); + fs.mkdirs(dirB); + Path fileBC = createRandomFileInDirectory(dirB); + + List allFiles = new ArrayList<>(); + try (RemoteIterator iter = fs.listFilesIterative(basePath, false)) { + while (iter.hasNext()) { + allFiles.add(iter.next()); + } + } + assertThat(allFiles.size()).isEqualTo(1); + assertThat(allFiles.get(0).getPath()).isEqualTo(fileA); + } + + @Test + void testListFilesIterativeRecursive() throws IOException { + Path fileA = createRandomFileInDirectory(basePath); + Path dirB = new Path(basePath, randomName()); + fs.mkdirs(dirB); + Path fileBC = createRandomFileInDirectory(dirB); + + List allFiles = new ArrayList<>(); + try (RemoteIterator iter = fs.listFilesIterative(basePath, true)) { + while (iter.hasNext()) { + allFiles.add(iter.next()); + } + } + assertThat(allFiles.size()).isEqualTo(2); + assertThat(allFiles.stream().filter(f -> f.getPath().equals(fileA)).count()).isEqualTo(1); + assertThat(allFiles.stream().filter(f -> f.getPath().equals(fileBC)).count()).isEqualTo(1); + } + // --- delete @Test